Skip to content

Commit

Permalink
Merge pull request datahop#33 from datahop/kademlia_callbacks
Browse files Browse the repository at this point in the history
kademlia find operation callbacks
  • Loading branch information
srene authored Apr 24, 2023
2 parents 61de214 + cb1b62b commit fe75908
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 24 deletions.
5 changes: 4 additions & 1 deletion simulator/src/main/java/peersim/kademlia/KademliaEvents.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package peersim.kademlia;

import java.math.BigInteger;
import peersim.kademlia.operations.Operation;

public interface KademliaEvents {

public void nodesFound(BigInteger[] neighbours);
public void nodesFound(Operation op, BigInteger[] neighbours);

public void operationComplete(Operation op);
}
18 changes: 7 additions & 11 deletions simulator/src/main/java/peersim/kademlia/KademliaProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import peersim.edsim.EDSimulator;
import peersim.kademlia.operations.FindOperation;
import peersim.kademlia.operations.GetOperation;
import peersim.kademlia.operations.Operation;
import peersim.kademlia.operations.PutOperation;
import peersim.transport.UnreliableTransport;

Expand Down Expand Up @@ -193,19 +194,19 @@ private void handleResponse(Message m, int myPid) {
// save received neighbour in the closest Set of fin operation

BigInteger[] neighbours = (BigInteger[]) m.body;
if (callback != null) callback.nodesFound(neighbours);
if (callback != null) callback.nodesFound(fop, neighbours);
for (BigInteger neighbour : neighbours) routingTable.addNeighbour(neighbour);

if (!fop.isFinished() && Arrays.asList(neighbours).contains(fop.getDestNode())) {
logger.warning("Found node " + fop.getDestNode());

if (callback != null) callback.operationComplete(fop);
KademliaObserver.find_ok.add(1);
fop.setFinished(true);
}

if (fop instanceof GetOperation && m.value != null && !fop.isFinished()) {
fop.setFinished(true);

if (callback != null) callback.operationComplete(fop);
((GetOperation) fop).setValue(m.value);
logger.warning(
"Getprocess finished found " + ((GetOperation) fop).getValue() + " hops " + fop.nrHops);
Expand Down Expand Up @@ -262,7 +263,7 @@ private void handleResponse(Message m, int myPid) {
} else {
findOp.remove(fop.getId());
}

if (callback != null) callback.operationComplete(fop);
if (fop.getBody().equals("Automatically Generated Traffic")
&& fop.getClosest().containsKey(fop.getDestNode())) {
// update statistics
Expand Down Expand Up @@ -337,7 +338,7 @@ private void handleFind(Message m, int myPid) {
* @param m Message received (contains the node to find)
* @param myPid the sender Pid
*/
private void handleInit(Message m, int myPid) {
public Operation handleInit(Message m, int myPid) {

logger.info("handleInitFind " + (BigInteger) m.body);
KademliaObserver.find_op.add(1);
Expand Down Expand Up @@ -401,6 +402,7 @@ private void handleInit(Message m, int myPid) {
}
}
}
return fop;
}

/**
Expand Down Expand Up @@ -461,13 +463,7 @@ public void processEvent(Node myNode, int myPid, Object event) {
break;

case Message.MSG_INIT_FIND:
m = (Message) event;
handleInit(m, myPid);
break;
case Message.MSG_INIT_GET:
m = (Message) event;
break;

case Message.MSG_INIT_PUT:
m = (Message) event;
handleInit(m, myPid);
Expand Down
27 changes: 15 additions & 12 deletions simulator/src/main/java/peersim/kademlia/das/DASProtocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import peersim.kademlia.Message;
import peersim.kademlia.SimpleEvent;
import peersim.kademlia.das.operations.RandomSamplingOperation;
import peersim.kademlia.operations.Operation;
import peersim.transport.UnreliableTransport;

public class DASProtocol implements Cloneable, EDProtocol, KademliaEvents {
Expand Down Expand Up @@ -353,18 +354,6 @@ public BigInteger getKademliaId() {
return this.getKademliaProtocol().getKademliaNode().getId();
}

/**
* Callback of the kademlia protocol of the nodes found and contacted
*
* @param neihbours array with the ids of the nodes found
*/
@Override
public void nodesFound(BigInteger[] neighbours) {
List<BigInteger> list = new ArrayList<>(Arrays.asList(neighbours));
list.remove(builderAddress);
searchTable.addNode(list.toArray(new BigInteger[0]));
}

/**
* Starts the random sampling operation
*
Expand Down Expand Up @@ -410,4 +399,18 @@ public void setDASProtocolID(int protocolID) {
public int getDASProtocolID() {
return this.dasID;
}

@Override
public void nodesFound(Operation op, BigInteger[] neighbours) {
// TODO Auto-generated method stub
List<BigInteger> list = new ArrayList<>(Arrays.asList(neighbours));
list.remove(builderAddress);
searchTable.addNode(list.toArray(new BigInteger[0]));
}

@Override
public void operationComplete(Operation op) {
// TODO Auto-generated method stub
throw new UnsupportedOperationException("Unimplemented method 'operationComplete'");
}
}

0 comments on commit fe75908

Please sign in to comment.