Skip to content

Commit

Permalink
Version 510. Added UDP for block-vote messages (not yet active by
Browse files Browse the repository at this point in the history
default).
  • Loading branch information
n-y-z-o committed Apr 17, 2019
1 parent dcf2498 commit 06dfb48
Show file tree
Hide file tree
Showing 29 changed files with 582 additions and 213 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ If you are interested in this project, we recommend reading our white paper: htt

**Please note: the proof-of-diversity system requires that new verifiers be added to the cycle at a controlled rate. The length of time that a verifier must wait is related to the current cycle length and the number of verifiers waiting to join. As the cycle length increases, this may be a considerable amount of time. For instance, with a cycle length of 500, the minimum spacing between new verifiers will be approximately 2 hours. Please consult the Nyzo white paper for further details.**

To start your own verifier, we recommend creating a t3.small AWS instance with the latest Ubuntu LTS version, a 30GB EBS volume, and port 9444 incoming open to the world (TCP only; weird stuff can happen if you open UDP, too). Also, open the SSH port to your IP address so you can access the instance. Then, when you have SSHed into the instance, run the following commands (enter each command separately):
To start your own verifier, we recommend creating a t3.small AWS instance with the latest Ubuntu LTS version, a 30GB EBS volume, port 9444 incoming open to the world for TCP only, and port 9446 incoming open to the world for UDP only. Also, open the SSH port to your IP address so you can access the instance. Then, when you have SSHed into the instance, run the following commands (enter each command separately):

```
sudo apt update
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/co/nyzo/verifier/BlacklistManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@
public class BlacklistManager {

private static final long blacklistDuration = 1000L * 60L * 10L; // ten minutes
private static final boolean useIpTables = true;
private static final boolean useIpTables = false;

private static final Map<ByteBuffer, Long> blacklistedAddresses = new HashMap<>();

static {
// Always try to flush firewall rules. This is necessary whether the firewall is being used this run or not,
// because it might have been used the previous run.
// `sudo iptables -nvL` to check
runProcess("sudo", "iptables", "-F");
//runProcess("sudo", "iptables", "-F");
}

public static void addToBlacklist(byte[] ipAddress) {
Expand Down Expand Up @@ -70,10 +70,10 @@ public static void performMaintenance() {

private static void setIpTableEntry(String addDrop, byte[] ipAddress) {

if (useIpTables) {
runProcess("sudo", "iptables", addDrop, "INPUT", "-s", IpUtil.addressAsString(ipAddress), "-p", "tcp",
"--destination-port", MeshListener.getPort() + "", "-j", "DROP");
}
//if (useIpTables) {
//runProcess("sudo", "iptables", addDrop, "INPUT", "-s", IpUtil.addressAsString(ipAddress), "-p", "tcp",
// "--destination-port", MeshListener.getPort() + "", "-j", "DROP");
//}
}

private static void runProcess(String... args) {
Expand Down
33 changes: 16 additions & 17 deletions src/main/java/co/nyzo/verifier/BlockVoteManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,23 +263,22 @@ public static synchronized void requestMissingVotes() {

numberOfVotesRequested++;

Message.fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message,
new MessageCallback() {
@Override
public void responseReceived(Message message) {

BlockVote vote = (BlockVote) message.getContent();
if (vote != null) {
registerVote(message);

// Each time a good vote is received, the last-vote-request timestamp is
// updated. If we take some time to request all the votes, this helps to
// avoid starting a new round of requests soon after, or even before, this
// round of requests completes.
lastVoteRequestTimestamp = System.currentTimeMillis();
}
}
});
Message.fetch(node, message, new MessageCallback() {
@Override
public void responseReceived(Message message) {

BlockVote vote = (BlockVote) message.getContent();
if (vote != null) {
registerVote(message);

// Each time a good vote is received, the last-vote-request timestamp is
// updated. If we take some time to request all the votes, this helps to
// avoid starting a new round of requests soon after, or even before, this
// round of requests completes.
lastVoteRequestTimestamp = System.currentTimeMillis();
}
}
});
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/co/nyzo/verifier/FieldByteSize.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@ public class FieldByteSize {
public static int string(String value) {
return stringLength + (value == null ? 0 : value.getBytes(StandardCharsets.UTF_8).length);
}

public static int string(String value, int maximumStringByteLength) {
return stringLength + (value == null ? 0 : Math.min(value.getBytes(StandardCharsets.UTF_8).length,
maximumStringByteLength));
}
}
178 changes: 137 additions & 41 deletions src/main/java/co/nyzo/verifier/MeshListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import co.nyzo.verifier.util.PrintUtil;
import co.nyzo.verifier.util.UpdateUtil;

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
Expand All @@ -24,23 +26,38 @@ public class MeshListener {

private static final int maximumConcurrentConnectionsForIp = 20;

private static final AtomicBoolean aliveTcp = new AtomicBoolean(false);
private static final AtomicBoolean aliveUdp = new AtomicBoolean(false);

// The only message sent via UDP right now is BlockVote19.
private static final int udpBufferSize = FieldByteSize.messageLength + FieldByteSize.timestamp + // message fields
FieldByteSize.messageType + FieldByteSize.identifier + FieldByteSize.signature + // message fields
FieldByteSize.blockHeight + FieldByteSize.hash + FieldByteSize.timestamp; // block vote fields

private static final byte[] packetBuffer = new byte[udpBufferSize];

public static void main(String[] args) {
start();
}

private static final AtomicBoolean alive = new AtomicBoolean(false);

public static boolean isAlive() {
return alive.get();
return aliveTcp.get() || aliveUdp.get();
}

public static final int standardPort = 9444;
public static final int standardPortTcp = 9444;
public static final int standardPortUdp = 9446;

private static ServerSocket serverSocketTcp = null;
private static DatagramSocket datagramSocketUdp = null;
private static int portTcp;
private static int portUdp;

private static ServerSocket serverSocket = null;
private static int port;
public static int getPortTcp() {
return portTcp;
}

public static int getPort() {
return port;
public static int getPortUdp() {
return portUdp;
}

private static final BiFunction<Integer, Integer, Integer> mergeFunction =
Expand All @@ -55,44 +72,95 @@ public Integer apply(Integer integer0, Integer integer1) {

public static void start() {

if (!aliveTcp.getAndSet(true)) {
startSocketThreadTcp();
}

if (!aliveUdp.getAndSet(true)) {
startSocketThreadUdp();
}
}

public static void startSocketThreadTcp() {

Map<ByteBuffer, Integer> connectionsPerIp = new ConcurrentHashMap<>();
AtomicInteger activeReadThreads = new AtomicInteger(0);

if (!alive.getAndSet(true)) {

new Thread(new Runnable() {
@Override
public void run() {
try {
serverSocket = new ServerSocket(standardPort);
port = serverSocket.getLocalPort();

while (!UpdateUtil.shouldTerminate()) {

if (Verifier.isPaused()) {
try {
Thread.sleep(1000L);
} catch (Exception ignored) { }
} else {
try {
Socket clientSocket = serverSocket.accept();
processSocket(clientSocket, activeReadThreads, connectionsPerIp);
} catch (Exception ignored) { }
}
new Thread(new Runnable() {
@Override
public void run() {
try {
serverSocketTcp = new ServerSocket(standardPortTcp);
portTcp = serverSocketTcp.getLocalPort();

while (!UpdateUtil.shouldTerminate()) {

if (Verifier.isPaused()) {
try {
Thread.sleep(1000L);
} catch (Exception ignored) { }
} else {
try {
Socket clientSocket = serverSocketTcp.accept();
processSocket(clientSocket, activeReadThreads, connectionsPerIp);
} catch (Exception ignored) { }
}
}

closeSockets();

} catch (Exception e) {

closeSocket();
System.err.println("Exception trying to open mesh listener. Exiting.");
UpdateUtil.terminate();
}

aliveTcp.set(false);
}
}, "MeshListener-serverSocketTcp").start();
}

} catch (Exception e) {
private static void startSocketThreadUdp() {

System.err.println("Exception trying to open mesh listener. Exiting.");
UpdateUtil.terminate();
new Thread(new Runnable() {
@Override
public void run() {
try {
datagramSocketUdp = new DatagramSocket(standardPortUdp);
portUdp = datagramSocketUdp.getLocalPort();

while (!UpdateUtil.shouldTerminate()) {

if (Verifier.isPaused()) {
try {
Thread.sleep(1000L);
} catch (Exception ignored) { }
} else {
try {
DatagramPacket datagramPacket = new DatagramPacket(packetBuffer, udpBufferSize);
datagramSocketUdp.receive(datagramPacket);
if (BlacklistManager.inBlacklist(datagramPacket.getAddress().getAddress())) {
try {
numberOfMessagesRejected.incrementAndGet();
} catch (Exception ignored) { }
} else {
numberOfMessagesAccepted.incrementAndGet();
readMessage(datagramPacket);
}

} catch (Exception ignored) { }
}
}

alive.set(false);
} catch (Exception e) {

System.err.println("Exception trying to open UDP socket. Exiting.");
UpdateUtil.terminate();
}
}, "MeshListener-serverSocket").start();
}

aliveUdp.set(false);
}
}, "MeshListener-datagramSocketUdp").start();
}

private static void processSocket(Socket clientSocket, AtomicInteger activeReadThreads,
Expand Down Expand Up @@ -145,7 +213,7 @@ public void run() {
connectionsPerIp.clear();
}
}
}, "MeshListener-clientSocket").start();
}, "MeshListener-clientSocketTcp").start();
}
}
}
Expand All @@ -172,14 +240,33 @@ private static void readMessageAndRespond(Socket clientSocket) {
} catch (Exception ignored) { }
}

public static void closeSocket() {
private static void readMessage(DatagramPacket packet) {

try {

Message message = Message.fromBytes(packet.getData(), packet.getAddress().getAddress(), true);
if (message != null) {

// For UDP, we do not send the response.
Message response = response(message);
}

if (serverSocket != null) {
} catch (Exception ignored) { }
}

public static void closeSockets() {

if (serverSocketTcp != null) {
try {
serverSocket.close();
serverSocketTcp.close();
} catch (Exception ignored) {
}
serverSocket = null;
serverSocketTcp = null;
}

if (datagramSocketUdp != null) {
datagramSocketUdp.close();
datagramSocketUdp = null;
}
}

Expand Down Expand Up @@ -304,6 +391,15 @@ public static Message response(Message message) {

response = new Message(MessageType.FullMeshResponse42, new MeshResponse(NodeManager.getMesh()));

} else if (messageType == MessageType.NodeJoinV2_43) {

NodeManager.updateNode(message);

NodeJoinMessageV2 nodeJoinMessage = (NodeJoinMessageV2) message.getContent();
NicknameManager.put(message.getSourceNodeIdentifier(), nodeJoinMessage.getNickname());

response = new Message(MessageType.NodeJoinResponseV2_44, new NodeJoinResponseV2());

} else if (messageType == MessageType.Ping200) {

response = new Message(MessageType.PingResponse201, new PingResponse("hello, " +
Expand Down
Loading

0 comments on commit 06dfb48

Please sign in to comment.