diff --git a/README.md b/README.md index f80436c7..64073acc 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ If you are interested in this project, we recommend reading our white paper: htt **Please note: the proof-of-diversity system requires that new verifiers be added to the cycle at a controlled rate. The length of time that a verifier must wait is related to the current cycle length and the number of verifiers waiting to join. As the cycle length increases, this may be a considerable amount of time. For instance, with a cycle length of 500, the minimum spacing between new verifiers will be approximately 2 hours. Please consult the Nyzo white paper for further details.** -To start your own verifier, we recommend creating a t3.small AWS instance with the latest Ubuntu LTS version, a 30GB EBS volume, and port 9444 incoming open to the world (TCP only; weird stuff can happen if you open UDP, too). Also, open the SSH port to your IP address so you can access the instance. Then, when you have SSHed into the instance, run the following commands (enter each command separately): +To start your own verifier, we recommend creating a t3.small AWS instance with the latest Ubuntu LTS version, a 30GB EBS volume, port 9444 incoming open to the world for TCP only, and port 9446 incoming open to the world for UDP only. Also, open the SSH port to your IP address so you can access the instance. Then, when you have SSHed into the instance, run the following commands (enter each command separately): ``` sudo apt update diff --git a/src/main/java/co/nyzo/verifier/BlacklistManager.java b/src/main/java/co/nyzo/verifier/BlacklistManager.java index a3d626e6..383b6cff 100644 --- a/src/main/java/co/nyzo/verifier/BlacklistManager.java +++ b/src/main/java/co/nyzo/verifier/BlacklistManager.java @@ -11,7 +11,7 @@ public class BlacklistManager { private static final long blacklistDuration = 1000L * 60L * 10L; // ten minutes - private static final boolean useIpTables = true; + private static final boolean useIpTables = false; private static final Map blacklistedAddresses = new HashMap<>(); @@ -19,7 +19,7 @@ public class BlacklistManager { // Always try to flush firewall rules. This is necessary whether the firewall is being used this run or not, // because it might have been used the previous run. // `sudo iptables -nvL` to check - runProcess("sudo", "iptables", "-F"); + //runProcess("sudo", "iptables", "-F"); } public static void addToBlacklist(byte[] ipAddress) { @@ -70,10 +70,10 @@ public static void performMaintenance() { private static void setIpTableEntry(String addDrop, byte[] ipAddress) { - if (useIpTables) { - runProcess("sudo", "iptables", addDrop, "INPUT", "-s", IpUtil.addressAsString(ipAddress), "-p", "tcp", - "--destination-port", MeshListener.getPort() + "", "-j", "DROP"); - } + //if (useIpTables) { + //runProcess("sudo", "iptables", addDrop, "INPUT", "-s", IpUtil.addressAsString(ipAddress), "-p", "tcp", + // "--destination-port", MeshListener.getPort() + "", "-j", "DROP"); + //} } private static void runProcess(String... args) { diff --git a/src/main/java/co/nyzo/verifier/BlockVoteManager.java b/src/main/java/co/nyzo/verifier/BlockVoteManager.java index 690be9c3..fc6f0e31 100644 --- a/src/main/java/co/nyzo/verifier/BlockVoteManager.java +++ b/src/main/java/co/nyzo/verifier/BlockVoteManager.java @@ -263,23 +263,22 @@ public static synchronized void requestMissingVotes() { numberOfVotesRequested++; - Message.fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message, - new MessageCallback() { - @Override - public void responseReceived(Message message) { - - BlockVote vote = (BlockVote) message.getContent(); - if (vote != null) { - registerVote(message); - - // Each time a good vote is received, the last-vote-request timestamp is - // updated. If we take some time to request all the votes, this helps to - // avoid starting a new round of requests soon after, or even before, this - // round of requests completes. - lastVoteRequestTimestamp = System.currentTimeMillis(); - } - } - }); + Message.fetch(node, message, new MessageCallback() { + @Override + public void responseReceived(Message message) { + + BlockVote vote = (BlockVote) message.getContent(); + if (vote != null) { + registerVote(message); + + // Each time a good vote is received, the last-vote-request timestamp is + // updated. If we take some time to request all the votes, this helps to + // avoid starting a new round of requests soon after, or even before, this + // round of requests completes. + lastVoteRequestTimestamp = System.currentTimeMillis(); + } + } + }); } } } diff --git a/src/main/java/co/nyzo/verifier/FieldByteSize.java b/src/main/java/co/nyzo/verifier/FieldByteSize.java index 65dbfa80..6c8d14b4 100644 --- a/src/main/java/co/nyzo/verifier/FieldByteSize.java +++ b/src/main/java/co/nyzo/verifier/FieldByteSize.java @@ -36,4 +36,9 @@ public class FieldByteSize { public static int string(String value) { return stringLength + (value == null ? 0 : value.getBytes(StandardCharsets.UTF_8).length); } + + public static int string(String value, int maximumStringByteLength) { + return stringLength + (value == null ? 0 : Math.min(value.getBytes(StandardCharsets.UTF_8).length, + maximumStringByteLength)); + } } diff --git a/src/main/java/co/nyzo/verifier/MeshListener.java b/src/main/java/co/nyzo/verifier/MeshListener.java index 4b83bc5c..0cfbcabc 100644 --- a/src/main/java/co/nyzo/verifier/MeshListener.java +++ b/src/main/java/co/nyzo/verifier/MeshListener.java @@ -6,6 +6,8 @@ import co.nyzo.verifier.util.PrintUtil; import co.nyzo.verifier.util.UpdateUtil; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; @@ -24,23 +26,38 @@ public class MeshListener { private static final int maximumConcurrentConnectionsForIp = 20; + private static final AtomicBoolean aliveTcp = new AtomicBoolean(false); + private static final AtomicBoolean aliveUdp = new AtomicBoolean(false); + + // The only message sent via UDP right now is BlockVote19. + private static final int udpBufferSize = FieldByteSize.messageLength + FieldByteSize.timestamp + // message fields + FieldByteSize.messageType + FieldByteSize.identifier + FieldByteSize.signature + // message fields + FieldByteSize.blockHeight + FieldByteSize.hash + FieldByteSize.timestamp; // block vote fields + + private static final byte[] packetBuffer = new byte[udpBufferSize]; + public static void main(String[] args) { start(); } - private static final AtomicBoolean alive = new AtomicBoolean(false); - public static boolean isAlive() { - return alive.get(); + return aliveTcp.get() || aliveUdp.get(); } - public static final int standardPort = 9444; + public static final int standardPortTcp = 9444; + public static final int standardPortUdp = 9446; + + private static ServerSocket serverSocketTcp = null; + private static DatagramSocket datagramSocketUdp = null; + private static int portTcp; + private static int portUdp; - private static ServerSocket serverSocket = null; - private static int port; + public static int getPortTcp() { + return portTcp; + } - public static int getPort() { - return port; + public static int getPortUdp() { + return portUdp; } private static final BiFunction mergeFunction = @@ -55,44 +72,95 @@ public Integer apply(Integer integer0, Integer integer1) { public static void start() { + if (!aliveTcp.getAndSet(true)) { + startSocketThreadTcp(); + } + + if (!aliveUdp.getAndSet(true)) { + startSocketThreadUdp(); + } + } + + public static void startSocketThreadTcp() { + Map connectionsPerIp = new ConcurrentHashMap<>(); AtomicInteger activeReadThreads = new AtomicInteger(0); - if (!alive.getAndSet(true)) { - - new Thread(new Runnable() { - @Override - public void run() { - try { - serverSocket = new ServerSocket(standardPort); - port = serverSocket.getLocalPort(); - - while (!UpdateUtil.shouldTerminate()) { - - if (Verifier.isPaused()) { - try { - Thread.sleep(1000L); - } catch (Exception ignored) { } - } else { - try { - Socket clientSocket = serverSocket.accept(); - processSocket(clientSocket, activeReadThreads, connectionsPerIp); - } catch (Exception ignored) { } - } + new Thread(new Runnable() { + @Override + public void run() { + try { + serverSocketTcp = new ServerSocket(standardPortTcp); + portTcp = serverSocketTcp.getLocalPort(); + + while (!UpdateUtil.shouldTerminate()) { + + if (Verifier.isPaused()) { + try { + Thread.sleep(1000L); + } catch (Exception ignored) { } + } else { + try { + Socket clientSocket = serverSocketTcp.accept(); + processSocket(clientSocket, activeReadThreads, connectionsPerIp); + } catch (Exception ignored) { } } + } + + closeSockets(); + + } catch (Exception e) { - closeSocket(); + System.err.println("Exception trying to open mesh listener. Exiting."); + UpdateUtil.terminate(); + } + + aliveTcp.set(false); + } + }, "MeshListener-serverSocketTcp").start(); + } - } catch (Exception e) { + private static void startSocketThreadUdp() { - System.err.println("Exception trying to open mesh listener. Exiting."); - UpdateUtil.terminate(); + new Thread(new Runnable() { + @Override + public void run() { + try { + datagramSocketUdp = new DatagramSocket(standardPortUdp); + portUdp = datagramSocketUdp.getLocalPort(); + + while (!UpdateUtil.shouldTerminate()) { + + if (Verifier.isPaused()) { + try { + Thread.sleep(1000L); + } catch (Exception ignored) { } + } else { + try { + DatagramPacket datagramPacket = new DatagramPacket(packetBuffer, udpBufferSize); + datagramSocketUdp.receive(datagramPacket); + if (BlacklistManager.inBlacklist(datagramPacket.getAddress().getAddress())) { + try { + numberOfMessagesRejected.incrementAndGet(); + } catch (Exception ignored) { } + } else { + numberOfMessagesAccepted.incrementAndGet(); + readMessage(datagramPacket); + } + + } catch (Exception ignored) { } + } } - alive.set(false); + } catch (Exception e) { + + System.err.println("Exception trying to open UDP socket. Exiting."); + UpdateUtil.terminate(); } - }, "MeshListener-serverSocket").start(); - } + + aliveUdp.set(false); + } + }, "MeshListener-datagramSocketUdp").start(); } private static void processSocket(Socket clientSocket, AtomicInteger activeReadThreads, @@ -145,7 +213,7 @@ public void run() { connectionsPerIp.clear(); } } - }, "MeshListener-clientSocket").start(); + }, "MeshListener-clientSocketTcp").start(); } } } @@ -172,14 +240,33 @@ private static void readMessageAndRespond(Socket clientSocket) { } catch (Exception ignored) { } } - public static void closeSocket() { + private static void readMessage(DatagramPacket packet) { + + try { + + Message message = Message.fromBytes(packet.getData(), packet.getAddress().getAddress(), true); + if (message != null) { + + // For UDP, we do not send the response. + Message response = response(message); + } - if (serverSocket != null) { + } catch (Exception ignored) { } + } + + public static void closeSockets() { + + if (serverSocketTcp != null) { try { - serverSocket.close(); + serverSocketTcp.close(); } catch (Exception ignored) { } - serverSocket = null; + serverSocketTcp = null; + } + + if (datagramSocketUdp != null) { + datagramSocketUdp.close(); + datagramSocketUdp = null; } } @@ -304,6 +391,15 @@ public static Message response(Message message) { response = new Message(MessageType.FullMeshResponse42, new MeshResponse(NodeManager.getMesh())); + } else if (messageType == MessageType.NodeJoinV2_43) { + + NodeManager.updateNode(message); + + NodeJoinMessageV2 nodeJoinMessage = (NodeJoinMessageV2) message.getContent(); + NicknameManager.put(message.getSourceNodeIdentifier(), nodeJoinMessage.getNickname()); + + response = new Message(MessageType.NodeJoinResponseV2_44, new NodeJoinResponseV2()); + } else if (messageType == MessageType.Ping200) { response = new Message(MessageType.PingResponse201, new PingResponse("hello, " + diff --git a/src/main/java/co/nyzo/verifier/Message.java b/src/main/java/co/nyzo/verifier/Message.java index d3e65ce4..8b12ac06 100644 --- a/src/main/java/co/nyzo/verifier/Message.java +++ b/src/main/java/co/nyzo/verifier/Message.java @@ -28,6 +28,13 @@ public class Message { MessageType.MissingBlockRequest25)); public static final long replayProtectionInterval = 5000L; + private static DatagramSocket datagramSocket; + static { + try { + datagramSocket = new DatagramSocket(); + } catch (Exception ignored) { } + } + // We do not broadcast any messages to the full mesh from the broadcast method. We do, however, use the full mesh // as a potential pool for random requests for the following types. This reduces strain on in-cycle verifiers. private static final Set fullMeshMessageTypes = new HashSet<>(Arrays.asList(MessageType.BlockRequest11, @@ -118,8 +125,7 @@ public static void broadcast(Message message) { for (Node node : mesh) { if (node.isActive() && !ByteUtil.arraysAreEqual(node.getIdentifier(), Verifier.getIdentifier()) && BlockManager.verifierInOrNearCurrentCycle(ByteBuffer.wrap(node.getIdentifier()))) { - String ipAddress = IpUtil.addressAsString(node.getIpAddress()); - fetch(ipAddress, node.getPort(), message, null); + fetch(node, message, null); } } } @@ -145,11 +151,20 @@ public static void fetchFromRandomNode(Message message, MessageCallback messageC } else { System.out.println("trying to fetch " + message.getType() + " from " + NicknameManager.get(node.getIdentifier())); - fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message, messageCallback); + fetch(node, message, messageCallback); } } - public static void fetch(String hostNameOrIp, int port, Message message, MessageCallback messageCallback) { + public static void fetch(Node node, Message message, MessageCallback messageCallback) { + + if (message.getType() == MessageType.BlockVote19 && node.getPortUdp() > 0) { + sendUdp(node.getIpAddress(), node.getPortUdp(), message); + } else { + fetchTcp(IpUtil.addressAsString(node.getIpAddress()), node.getPortTcp(), message, messageCallback); + } + } + + public static void fetchTcp(String hostNameOrIp, int port, Message message, MessageCallback messageCallback) { byte[] identifier = NodeManager.identifierForIpAddress(hostNameOrIp); @@ -213,16 +228,34 @@ public void run() { } } + public static void sendUdp(byte[] ipAddress, int port, Message message) { + + byte[] identifier = NodeManager.identifierForIpAddress(ipAddress); + + // Do not send the message to this verifier, and do not send a message that will get this verifier blacklisted + // if it is not in the cycle. + if (!ByteUtil.arraysAreEqual(identifier, Verifier.getIdentifier()) && + (BlockManager.verifierInOrNearCurrentCycle(ByteBuffer.wrap(Verifier.getIdentifier())) || + BlockManager.inGenesisCycle() || + !disallowedNonCycleTypes.contains(message.getType()))) { + + try { + byte[] messageBytes = message.getBytesForTransmission(); + InetAddress address = Inet4Address.getByAddress(ipAddress); + DatagramPacket packet = new DatagramPacket(messageBytes, messageBytes.length, address, port); + datagramSocket.send(packet); + } catch (Exception ignored) { } + } + } + public static Message readFromStream(InputStream inputStream, byte[] sourceIpAddress, MessageType sourceType) { byte[] response = getResponse(inputStream); Message message; if (response.length == 0) { - System.out.println("empty response from " + IpUtil.addressAsString(sourceIpAddress) + " for message of " + - "type " + sourceType); message = null; } else { - message = fromBytes(response, sourceIpAddress); + message = fromBytes(response, sourceIpAddress, false); } return message; @@ -321,7 +354,7 @@ public byte[] getBytesForTransmission() { return result; } - public static Message fromBytes(byte[] bytes, byte[] sourceIpAddress) { + public static Message fromBytes(byte[] bytes, byte[] sourceIpAddress, boolean discardMessageLength) { Message message = null; int typeValue = 0; @@ -329,7 +362,9 @@ public static Message fromBytes(byte[] bytes, byte[] sourceIpAddress) { try { ByteBuffer buffer = ByteBuffer.wrap(bytes); - // The size is discarded before this method, so it is not read here. + if (discardMessageLength) { + buffer.getInt(); + } long timestamp = buffer.getLong(); typeValue = buffer.getShort() & 0xffff; @@ -427,6 +462,10 @@ private static MessageObject processContent(MessageType type, ByteBuffer buffer) content = VerifierRemovalVote.fromByteBuffer(buffer); } else if (type == MessageType.FullMeshResponse42) { content = MeshResponse.fromByteBuffer(buffer); + } else if (type == MessageType.NodeJoinV2_43) { + content = NodeJoinMessageV2.fromByteBuffer(buffer); + } else if (type == MessageType.NodeJoinResponseV2_44) { + content = NodeJoinResponseV2.fromByteBuffer(buffer); } else if (type == MessageType.PingResponse201) { content = PingResponse.fromByteBuffer(buffer); } else if (type == MessageType.UpdateResponse301) { @@ -467,6 +506,20 @@ public static void putString(String value, ByteBuffer buffer) { } } + public static void putString(String value, ByteBuffer buffer, int maximumStringByteLength) { + + if (value == null) { + buffer.putShort((short) 0); + } else { + byte[] lineBytes = value.getBytes(StandardCharsets.UTF_8); + if (lineBytes.length > maximumStringByteLength) { + lineBytes = Arrays.copyOf(lineBytes, maximumStringByteLength); + } + buffer.putShort((short) lineBytes.length); + buffer.put(lineBytes); + } + } + public static String getString(ByteBuffer buffer) { int lineByteLength = buffer.getShort() & 0xffff; @@ -476,6 +529,15 @@ public static String getString(ByteBuffer buffer) { return new String(lineBytes, StandardCharsets.UTF_8); } + public static String getString(ByteBuffer buffer, int maximumStringByteLength) { + + int lineByteLength = Math.min(buffer.getShort() & 0xffff, maximumStringByteLength); + byte[] lineBytes = new byte[lineByteLength]; + buffer.get(lineBytes); + + return new String(lineBytes, StandardCharsets.UTF_8); + } + public static byte[] getByteArray(ByteBuffer buffer, int size) { byte[] array = new byte[size]; diff --git a/src/main/java/co/nyzo/verifier/MessageType.java b/src/main/java/co/nyzo/verifier/MessageType.java index 995a9098..7659dc3b 100644 --- a/src/main/java/co/nyzo/verifier/MessageType.java +++ b/src/main/java/co/nyzo/verifier/MessageType.java @@ -48,6 +48,8 @@ public enum MessageType { VerifierRemovalVoteResponse40(40), FullMeshRequest41(41), FullMeshResponse42(42), + NodeJoinV2_43(43), + NodeJoinResponseV2_44(44), // test messages Ping200(200), diff --git a/src/main/java/co/nyzo/verifier/NewVerifierQueueManager.java b/src/main/java/co/nyzo/verifier/NewVerifierQueueManager.java index 15bb5cc4..d548838c 100644 --- a/src/main/java/co/nyzo/verifier/NewVerifierQueueManager.java +++ b/src/main/java/co/nyzo/verifier/NewVerifierQueueManager.java @@ -50,7 +50,7 @@ public static synchronized void updateVote() { for (Node node : mesh) { if (ByteUtil.arraysAreEqual(node.getIdentifier(), newVote) || ByteUtil.arraysAreEqual(node.getIdentifier(), previousVote)) { - Message.fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message, null); + Message.fetch(node, message, null); } } } diff --git a/src/main/java/co/nyzo/verifier/Node.java b/src/main/java/co/nyzo/verifier/Node.java index 79da1e27..27919770 100644 --- a/src/main/java/co/nyzo/verifier/Node.java +++ b/src/main/java/co/nyzo/verifier/Node.java @@ -1,6 +1,5 @@ package co.nyzo.verifier; -import co.nyzo.verifier.messages.TransactionPoolResponse; import co.nyzo.verifier.util.IpUtil; import java.nio.ByteBuffer; @@ -10,18 +9,20 @@ public class Node implements MessageObject { private byte[] identifier; // wallet public key (32 bytes) private byte[] ipAddress; // IPv4 address, stored as bytes to keep memory predictable (4 bytes) - private int port; // port number + private int portTcp; // TCP port number + private int portUdp; // UDP port number, if available private long queueTimestamp; // this is the timestamp that determines queue placement -- it is // when the verifier joined the mesh or when the verifier was last // updated private long identifierChangeTimestamp; // when the identifier at this IP was last changed private long inactiveTimestamp; // when the verifier was marked as inactive; -1 for active verifiers - public Node(byte[] identifier, byte[] ipAddress, int port) { + public Node(byte[] identifier, byte[] ipAddress, int portTcp, int portUdp) { this.identifier = Arrays.copyOf(identifier, FieldByteSize.identifier); this.ipAddress = Arrays.copyOf(ipAddress, FieldByteSize.ipAddress); - this.port = port; + this.portTcp = portTcp; + this.portUdp = portUdp; this.queueTimestamp = System.currentTimeMillis(); this.identifierChangeTimestamp = System.currentTimeMillis(); this.inactiveTimestamp = -1L; @@ -39,12 +40,20 @@ public byte[] getIpAddress() { return ipAddress; } - public int getPort() { - return port; + public int getPortTcp() { + return portTcp; } - public void setPort(int port) { - this.port = port; + public void setPortTcp(int portTcp) { + this.portTcp = portTcp; + } + + public int getPortUdp() { + return portUdp; + } + + public void setPortUdp(int portUdp) { + this.portUdp = portUdp; } public long getQueueTimestamp() { @@ -93,7 +102,7 @@ public byte[] getBytes() { ByteBuffer buffer = ByteBuffer.wrap(result); buffer.put(identifier); buffer.put(ipAddress); - buffer.putInt(port); + buffer.putInt(portTcp); buffer.putLong(queueTimestamp); return result; @@ -105,10 +114,10 @@ public static Node fromByteBuffer(ByteBuffer buffer) { buffer.get(identifier); byte[] ipAddress = new byte[FieldByteSize.ipAddress]; buffer.get(ipAddress); - int port = buffer.getInt(); + int portTcp = buffer.getInt(); long queueTimestamp = buffer.getLong(); - Node node = new Node(identifier, ipAddress, port); + Node node = new Node(identifier, ipAddress, portTcp, -1); node.setQueueTimestamp(queueTimestamp); return node; @@ -116,6 +125,6 @@ public static Node fromByteBuffer(ByteBuffer buffer) { @Override public String toString() { - return "[Node: " + IpUtil.addressAsString(getIpAddress()) + ":" + port + "]"; + return "[Node: " + IpUtil.addressAsString(getIpAddress()) + ":TCP=" + portTcp + ",UDP=" + portUdp + "]"; } } diff --git a/src/main/java/co/nyzo/verifier/NodeManager.java b/src/main/java/co/nyzo/verifier/NodeManager.java index f730218d..0d969ddc 100644 --- a/src/main/java/co/nyzo/verifier/NodeManager.java +++ b/src/main/java/co/nyzo/verifier/NodeManager.java @@ -42,13 +42,31 @@ public static void updateNode(Message message) { // In previous versions, more types of requests were registered to increase mesh density. However, to make the // system more flexible, we have changed this to only update a node when explicitly requested to do so through // a node join. - if (message.getType() == MessageType.NodeJoin3 || message.getType() == MessageType.NodeJoinResponse4) { + if (message.getType() == MessageType.NodeJoin3 || message.getType() == MessageType.NodeJoinResponse4 || + message.getType() == MessageType.NodeJoinV2_43 || + message.getType() == MessageType.NodeJoinResponseV2_44) { + + int portTcp; + int portUdp; + if (message.getType() == MessageType.NodeJoin3 || message.getType() == MessageType.NodeJoinResponse4) { + portTcp = ((PortMessage) message.getContent()).getPort(); + portUdp = -1; + } else { // NodeJoinV2_43 || NodeJoinResponseV2_44 + portTcp = ((PortMessageV2) message.getContent()).getPortTcp(); + portUdp = ((PortMessageV2) message.getContent()).getPortUdp(); + } - int port = ((PortMessage) message.getContent()).getPort(); - boolean isNewNode = updateNode(message.getSourceNodeIdentifier(), message.getSourceIpAddress(), port); + boolean isNewNode = updateNode(message.getSourceNodeIdentifier(), message.getSourceIpAddress(), portTcp, + portUdp); if (isNewNode) { - Message.fetch(IpUtil.addressAsString(message.getSourceIpAddress()), port, - new Message(MessageType.NodeJoin3, new NodeJoinMessage()), null); + // Send the same kind of node-join message that this node just sent. + if (message.getType() == MessageType.NodeJoin3) { + Message.fetchTcp(IpUtil.addressAsString(message.getSourceIpAddress()), portTcp, + new Message(MessageType.NodeJoin3, new NodeJoinMessage()), null); + } else { + Message.fetchTcp(IpUtil.addressAsString(message.getSourceIpAddress()), portTcp, + new Message(MessageType.NodeJoinV2_43, new NodeJoinMessageV2()), null); + } } } else if (message.getType() == MessageType.MissingBlockVoteRequest23 || @@ -70,10 +88,10 @@ public static void updateNode(Message message) { public static void addTemporaryLocalVerifierEntry() { - updateNode(Verifier.getIdentifier(), new byte[4], 0); + updateNode(Verifier.getIdentifier(), new byte[4], 0, 0); } - private static synchronized boolean updateNode(byte[] identifier, byte[] ipAddress, int port) { + private static synchronized boolean updateNode(byte[] identifier, byte[] ipAddress, int portTcp, int portUdp) { boolean isNewNode = false; if (identifier != null && identifier.length == FieldByteSize.identifier && ipAddress != null && @@ -85,7 +103,7 @@ private static synchronized boolean updateNode(byte[] identifier, byte[] ipAddre if (existingNode == null) { // This is the case when no other node is at the IP. We create a new node and add it to the map. - Node node = new Node(identifier, ipAddress, port); + Node node = new Node(identifier, ipAddress, portTcp, portUdp); long persistedTimestamp = persistedQueueTimestamps.getOrDefault(ByteBuffer.wrap(identifier), 0L); if (persistedTimestamp > 0L && persistedTimestamp < node.getQueueTimestamp()) { node.setQueueTimestamp(persistedTimestamp); @@ -100,11 +118,14 @@ private static synchronized boolean updateNode(byte[] identifier, byte[] ipAddre } } else { - // This is the case when there is already a node at the IP. We always update the port and mark the node + // This is the case when there is already a node at the IP. We always update the ports and mark the node // as active. Then, if the verifier has changed and a verifier change is allowed, we update the // verifier. - existingNode.setPort(port); + existingNode.setPortTcp(portTcp); + if (portUdp > 0) { + existingNode.setPortUdp(portUdp); + } existingNode.setInactiveTimestamp(-1L); if (!ByteUtil.arraysAreEqual(existingNode.getIdentifier(), identifier) && @@ -182,10 +203,9 @@ public static boolean connectedToMesh() { return ipAddressToNodeMap.size() > 1; } - public static byte[] identifierForIpAddress(String addressString) { + public static byte[] identifierForIpAddress(byte[] address) { byte[] identifier = null; - byte[] address = IpUtil.addressFromString(addressString); if (address != null) { ByteBuffer addressBuffer = ByteBuffer.wrap(address); Node node = ipAddressToNodeMap.get(addressBuffer); @@ -197,6 +217,11 @@ public static byte[] identifierForIpAddress(String addressString) { return identifier; } + public static byte[] identifierForIpAddress(String addressString) { + + return identifierForIpAddress(IpUtil.addressFromString(addressString)); + } + public static void markFailedConnection(String addressString) { byte[] address = IpUtil.addressFromString(addressString); @@ -314,26 +339,41 @@ public static void sendNodeJoinRequests(int count) { if (port != null && port > 0) { nodeJoinRequestsSent.incrementAndGet(); - Message nodeJoinMessage = new Message(MessageType.NodeJoin3, new NodeJoinMessage()); - Message.fetch(IpUtil.addressAsString(ipAddressBuffer.array()), port, nodeJoinMessage, + + // This is the V2 node-join message. This will be activated in a later version. + /* + Message nodeJoinMessage = new Message(MessageType.NodeJoinV2_43, new NodeJoinMessageV2()); + Message.fetchTcp(IpUtil.addressAsString(ipAddressBuffer.array()), port, nodeJoinMessage, new MessageCallback() { @Override public void responseReceived(Message message) { - if (message != null) { + + if (message != null && message.getContent() instanceof NodeJoinResponseV2) { + + System.out.println("UDP: got V2 node-join response from " + + IpUtil.addressAsString(ipAddressBuffer.array())); updateNode(message); - NodeJoinResponse response = (NodeJoinResponse) message.getContent(); - if (response != null) { + NodeJoinResponseV2 response = (NodeJoinResponseV2) message.getContent(); + NicknameManager.put(message.getSourceNodeIdentifier(), response.getNickname()); + } + } + });*/ - NicknameManager.put(message.getSourceNodeIdentifier(), - response.getNickname()); + // This is the legacy message. This will be removed in a later version. + Message legacyMessage = new Message(MessageType.NodeJoin3, new NodeJoinMessage()); + Message.fetchTcp(IpUtil.addressAsString(ipAddressBuffer.array()), port, legacyMessage, + new MessageCallback() { + @Override + public void responseReceived(Message message) { + + if (message != null && message.getContent() instanceof NodeJoinResponse) { - if (!ByteUtil.isAllZeros(response.getNewVerifierVote().getIdentifier())) { - NewVerifierVoteManager.registerVote(message.getSourceNodeIdentifier(), - response.getNewVerifierVote(), false); - } - } + updateNode(message); + + NodeJoinResponse response = (NodeJoinResponse) message.getContent(); + NicknameManager.put(message.getSourceNodeIdentifier(), response.getNickname()); } } }); @@ -366,7 +406,7 @@ public void responseReceived(Message message) { // Enqueue node-join requests to all nodes in the response. MeshResponse response = (MeshResponse) message.getContent(); for (Node node : response.getMesh()) { - NodeManager.enqueueNodeJoinMessage(node.getIpAddress(), node.getPort()); + NodeManager.enqueueNodeJoinMessage(node.getIpAddress(), node.getPortTcp()); } System.out.println("reloaded node-join request queue, size is now " + diff --git a/src/main/java/co/nyzo/verifier/Verifier.java b/src/main/java/co/nyzo/verifier/Verifier.java index 348376fd..b7e49758 100644 --- a/src/main/java/co/nyzo/verifier/Verifier.java +++ b/src/main/java/co/nyzo/verifier/Verifier.java @@ -200,12 +200,11 @@ public static void start() { AtomicInteger numberOfResponsesReceived = new AtomicInteger(0); // Send bootstrap requests to all trusted entry points. - Message bootstrapRequest = new Message(MessageType.BootstrapRequestV2_35, - new BootstrapRequest(MeshListener.getPort())); + Message bootstrapRequest = new Message(MessageType.BootstrapRequestV2_35, new BootstrapRequest()); for (TrustedEntryPoint entryPoint : trustedEntryPoints) { System.out.println("sending Bootstrap request to " + entryPoint); - Message.fetch(entryPoint.getHost(), entryPoint.getPort(), bootstrapRequest, + Message.fetchTcp(entryPoint.getHost(), entryPoint.getPort(), bootstrapRequest, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -310,14 +309,14 @@ public void run() { private static void fetchMesh(TrustedEntryPoint entryPoint, AtomicInteger numberOfMeshResponsesPending) { Message meshRequest = new Message(MessageType.MeshRequest15, null); - Message.fetch(entryPoint.getHost(), entryPoint.getPort(), meshRequest, new MessageCallback() { + Message.fetchTcp(entryPoint.getHost(), entryPoint.getPort(), meshRequest, new MessageCallback() { @Override public void responseReceived(Message message) { // Enqueue node-join requests for all nodes in the response. MeshResponse response = (MeshResponse) message.getContent(); for (Node node : response.getMesh()) { - NodeManager.enqueueNodeJoinMessage(node.getIpAddress(), node.getPort()); + NodeManager.enqueueNodeJoinMessage(node.getIpAddress(), node.getPortTcp()); } numberOfMeshResponsesPending.decrementAndGet(); @@ -330,7 +329,7 @@ private static void sendNodeJoinMessage(TrustedEntryPoint trustedEntryPoint) { System.out.println("sending node-join messages to trusted entry point: " + trustedEntryPoint); Message message = new Message(MessageType.NodeJoin3, new NodeJoinMessage()); - Message.fetch(trustedEntryPoint.getHost(), trustedEntryPoint.getPort(), message, + Message.fetchTcp(trustedEntryPoint.getHost(), trustedEntryPoint.getPort(), message, new MessageCallback() { @Override public void responseReceived(Message message) { diff --git a/src/main/java/co/nyzo/verifier/VerifierPerformanceManager.java b/src/main/java/co/nyzo/verifier/VerifierPerformanceManager.java index ef100d60..3e27f665 100644 --- a/src/main/java/co/nyzo/verifier/VerifierPerformanceManager.java +++ b/src/main/java/co/nyzo/verifier/VerifierPerformanceManager.java @@ -32,7 +32,7 @@ public class VerifierPerformanceManager { private static final int messagesPerIteration = 10; private static final Map voteMessageIpToTimestampMap = new ConcurrentHashMap<>(); - public static final File scoreFile = new File(Verifier.dataRootDirectory, "performance_scores_v1"); + public static final File scoreFile = new File(Verifier.dataRootDirectory, "performance_scores_v2"); private static final BiFunction mergeFunction = new BiFunction() { @@ -223,7 +223,7 @@ public static void sendVotes() { voteMessageIpToTimestampMap.put(ipAddress, System.currentTimeMillis()); numberOfMessages++; - Message.fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message, null); + Message.fetch(node, message, null); } } } diff --git a/src/main/java/co/nyzo/verifier/Version.java b/src/main/java/co/nyzo/verifier/Version.java index ab3e4d88..e7a895eb 100644 --- a/src/main/java/co/nyzo/verifier/Version.java +++ b/src/main/java/co/nyzo/verifier/Version.java @@ -2,7 +2,7 @@ public class Version { - private static final int version = 509; + private static final int version = 510; public static int getVersion() { diff --git a/src/main/java/co/nyzo/verifier/messages/BootstrapRequest.java b/src/main/java/co/nyzo/verifier/messages/BootstrapRequest.java index 32c7ee8b..70db854b 100644 --- a/src/main/java/co/nyzo/verifier/messages/BootstrapRequest.java +++ b/src/main/java/co/nyzo/verifier/messages/BootstrapRequest.java @@ -5,12 +5,15 @@ import java.nio.ByteBuffer; -public class BootstrapRequest implements MessageObject, PortMessage { +public class BootstrapRequest implements MessageObject { private int port; - public BootstrapRequest(int port) { + public BootstrapRequest() { + this.port = -1; + } + private BootstrapRequest(int port) { this.port = port; } @@ -38,12 +41,11 @@ public static BootstrapRequest fromByteBuffer(ByteBuffer buffer) { BootstrapRequest result = null; try { + // The port is no longer used. It is stored to ensure signature integrity. int port = buffer.getInt(); result = new BootstrapRequest(port); - } catch (Exception ignored) { - ignored.printStackTrace(); - } + } catch (Exception ignored) { } return result; } diff --git a/src/main/java/co/nyzo/verifier/messages/NewBlockMessage.java b/src/main/java/co/nyzo/verifier/messages/NewBlockMessage.java index 4aeaea7f..a41dde96 100644 --- a/src/main/java/co/nyzo/verifier/messages/NewBlockMessage.java +++ b/src/main/java/co/nyzo/verifier/messages/NewBlockMessage.java @@ -4,19 +4,17 @@ import java.nio.ByteBuffer; -public class NewBlockMessage implements MessageObject, PortMessage { +public class NewBlockMessage implements MessageObject { private Block block; private int port; public NewBlockMessage(Block block) { - this.block = block; - this.port = MeshListener.getPort(); + this.port = -1; } - public NewBlockMessage(Block block, int port) { - + private NewBlockMessage(Block block, int port) { this.block = block; this.port = port; } @@ -25,10 +23,6 @@ public Block getBlock() { return block; } - public int getPort() { - return port; - } - @Override public int getByteSize() { @@ -41,7 +35,7 @@ public byte[] getBytes() { byte[] array = new byte[getByteSize()]; ByteBuffer buffer = ByteBuffer.wrap(array); buffer.put(block.getBytes()); - buffer.putInt(port); + buffer.putInt(port); // The port is no longer used. It is included to ensure signature integrity. return array; } @@ -52,12 +46,10 @@ public static NewBlockMessage fromByteBuffer(ByteBuffer buffer) { try { Block block = Block.fromByteBuffer(buffer); - int port = buffer.getInt(); + int port = buffer.getInt(); // The port is no longer used. It is stored to ensure signature integrity. result = new NewBlockMessage(block, port); - } catch (Exception ignored) { - ignored.printStackTrace(); - } + } catch (Exception ignored) { } return result; } diff --git a/src/main/java/co/nyzo/verifier/messages/NodeJoinMessage.java b/src/main/java/co/nyzo/verifier/messages/NodeJoinMessage.java index f313a3dc..b7426647 100644 --- a/src/main/java/co/nyzo/verifier/messages/NodeJoinMessage.java +++ b/src/main/java/co/nyzo/verifier/messages/NodeJoinMessage.java @@ -11,7 +11,7 @@ public class NodeJoinMessage implements MessageObject, PortMessage { public NodeJoinMessage() { - this.port = MeshListener.getPort(); + this.port = MeshListener.getPortTcp(); this.nickname = Verifier.getNickname(); } diff --git a/src/main/java/co/nyzo/verifier/messages/NodeJoinMessageV2.java b/src/main/java/co/nyzo/verifier/messages/NodeJoinMessageV2.java new file mode 100644 index 00000000..98877a22 --- /dev/null +++ b/src/main/java/co/nyzo/verifier/messages/NodeJoinMessageV2.java @@ -0,0 +1,74 @@ +package co.nyzo.verifier.messages; + +import co.nyzo.verifier.*; + +import java.nio.ByteBuffer; + +public class NodeJoinMessageV2 implements MessageObject, PortMessageV2 { + + public static final int maximumNicknameLengthBytes = 50; + + private int portTcp; + private int portUdp; + private String nickname; + + public NodeJoinMessageV2() { + + this.portTcp = MeshListener.getPortTcp(); + this.portUdp = MeshListener.getPortUdp(); + this.nickname = Verifier.getNickname(); + } + + public NodeJoinMessageV2(int portTcp, int portUdp, String nickname) { + + this.portTcp = portTcp; + this.portUdp = portUdp; + this.nickname = nickname; + } + + public int getPortTcp() { + return portTcp; + } + + @Override + public int getPortUdp() { + return portUdp; + } + + public String getNickname() { + return nickname; + } + + @Override + public int getByteSize() { + + return FieldByteSize.port * 2 + FieldByteSize.string(nickname, maximumNicknameLengthBytes); + } + + @Override + public byte[] getBytes() { + + byte[] array = new byte[getByteSize()]; + ByteBuffer buffer = ByteBuffer.wrap(array); + buffer.putInt(portTcp); + buffer.putInt(portUdp); + Message.putString(nickname, buffer, maximumNicknameLengthBytes); + + return array; + } + + public static NodeJoinMessageV2 fromByteBuffer(ByteBuffer buffer) { + + NodeJoinMessageV2 result = null; + + try { + int portTcp = buffer.getInt(); + int portUdp = buffer.getInt(); + String nickname = Message.getString(buffer, maximumNicknameLengthBytes); + + result = new NodeJoinMessageV2(portTcp, portUdp, nickname); + } catch (Exception ignored) { } + + return result; + } +} diff --git a/src/main/java/co/nyzo/verifier/messages/NodeJoinResponse.java b/src/main/java/co/nyzo/verifier/messages/NodeJoinResponse.java index fc8489d1..87676538 100644 --- a/src/main/java/co/nyzo/verifier/messages/NodeJoinResponse.java +++ b/src/main/java/co/nyzo/verifier/messages/NodeJoinResponse.java @@ -3,9 +3,6 @@ import co.nyzo.verifier.*; import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; public class NodeJoinResponse implements MessageObject, PortMessage { @@ -18,7 +15,7 @@ public class NodeJoinResponse implements MessageObject, PortMessage { public NodeJoinResponse() { this.nickname = Verifier.getNickname(); - this.port = MeshListener.getPort(); + this.port = MeshListener.getPortTcp(); this.newVerifierVote = NewVerifierVoteManager.getLocalVote(); } @@ -89,9 +86,7 @@ public static NodeJoinResponse fromByteBuffer(ByteBuffer buffer) { NewVerifierVote newVerifierVote = new NewVerifierVote(newVerifierVoteIdentifier); result = new NodeJoinResponse(nickname, port, newVerifierVote); - } catch (Exception ignored) { - ignored.printStackTrace(); - } + } catch (Exception ignored) { } return result; } diff --git a/src/main/java/co/nyzo/verifier/messages/NodeJoinResponseV2.java b/src/main/java/co/nyzo/verifier/messages/NodeJoinResponseV2.java new file mode 100644 index 00000000..5ab8ec97 --- /dev/null +++ b/src/main/java/co/nyzo/verifier/messages/NodeJoinResponseV2.java @@ -0,0 +1,74 @@ +package co.nyzo.verifier.messages; + +import co.nyzo.verifier.*; + +import java.nio.ByteBuffer; + +public class NodeJoinResponseV2 implements MessageObject, PortMessageV2 { + + private String nickname; + private int portTcp; + private int portUdp; + + public NodeJoinResponseV2() { + + this.nickname = Verifier.getNickname(); + this.portTcp = MeshListener.getPortTcp(); + this.portUdp = MeshListener.getPortUdp(); + } + + public NodeJoinResponseV2(String nickname, int portTcp, int portUdp) { + + this.nickname = nickname == null ? "" : nickname; + this.portTcp = portTcp; + this.portUdp = portUdp; + } + + public String getNickname() { + + return nickname; + } + + @Override + public int getPortTcp() { + return portTcp; + } + + @Override + public int getPortUdp() { + return portUdp; + } + + @Override + public int getByteSize() { + return FieldByteSize.string(nickname, NodeJoinMessageV2.maximumNicknameLengthBytes) + FieldByteSize.port * 2; + } + + @Override + public byte[] getBytes() { + + byte[] array = new byte[getByteSize()]; + ByteBuffer buffer = ByteBuffer.wrap(array); + + Message.putString(nickname, buffer, NodeJoinMessageV2.maximumNicknameLengthBytes); + buffer.putInt(portTcp); + buffer.putInt(portUdp); + + return array; + } + + public static NodeJoinResponseV2 fromByteBuffer(ByteBuffer buffer) { + + NodeJoinResponseV2 result = null; + + try { + String nickname = Message.getString(buffer, NodeJoinMessageV2.maximumNicknameLengthBytes); + int portTcp = buffer.getInt(); + int portUdp = buffer.getInt(); + + result = new NodeJoinResponseV2(nickname, portTcp, portUdp); + } catch (Exception ignored) { } + + return result; + } +} diff --git a/src/main/java/co/nyzo/verifier/messages/PortMessageV2.java b/src/main/java/co/nyzo/verifier/messages/PortMessageV2.java new file mode 100644 index 00000000..e5742bf8 --- /dev/null +++ b/src/main/java/co/nyzo/verifier/messages/PortMessageV2.java @@ -0,0 +1,7 @@ +package co.nyzo.verifier.messages; + +public interface PortMessageV2 { + + int getPortTcp(); + int getPortUdp(); +} diff --git a/src/main/java/co/nyzo/verifier/messages/UpdateResponse.java b/src/main/java/co/nyzo/verifier/messages/UpdateResponse.java index b82b09e2..ada1af11 100644 --- a/src/main/java/co/nyzo/verifier/messages/UpdateResponse.java +++ b/src/main/java/co/nyzo/verifier/messages/UpdateResponse.java @@ -111,7 +111,7 @@ private static void update() { public void run() { // Flag that the system should terminate and close the MeshListener socket. UpdateUtil.terminate(); - MeshListener.closeSocket(); + MeshListener.closeSockets(); // Wait for the verifier and mesh listener to terminate. while (Verifier.isAlive() || MeshListener.isAlive()) { diff --git a/src/main/java/co/nyzo/verifier/messages/debug/VerifierRemovalTallyStatusResponse.java b/src/main/java/co/nyzo/verifier/messages/debug/VerifierRemovalTallyStatusResponse.java index 85fbe037..ebde657c 100644 --- a/src/main/java/co/nyzo/verifier/messages/debug/VerifierRemovalTallyStatusResponse.java +++ b/src/main/java/co/nyzo/verifier/messages/debug/VerifierRemovalTallyStatusResponse.java @@ -23,6 +23,16 @@ public VerifierRemovalTallyStatusResponse(Message request) { NicknameManager.get(identifier.array()) + "): " + voteTotals.get(identifier)); } + // Sort descending on vote totals. + Collections.sort(lines, new Comparator() { + @Override + public int compare(String string1, String string2) { + Integer value1 = Integer.parseInt(string1.split(":")[1].trim()); + Integer value2 = Integer.parseInt(string2.split(":")[1].trim()); + return value2.compareTo(value1); + } + }); + this.lines = lines; } else { this.lines = Collections.singletonList("*** unauthorized ***"); diff --git a/src/main/java/co/nyzo/verifier/recovery/UnfreezeMe.java b/src/main/java/co/nyzo/verifier/recovery/UnfreezeMe.java index 5631f906..4b3034df 100644 --- a/src/main/java/co/nyzo/verifier/recovery/UnfreezeMe.java +++ b/src/main/java/co/nyzo/verifier/recovery/UnfreezeMe.java @@ -26,7 +26,7 @@ public static void main(String[] args) { byte[] ipAddress = new byte[FieldByteSize.ipAddress]; Message meshRequest = new Message(MessageType.MeshRequest15, null); AtomicBoolean receivedResponse = new AtomicBoolean(false); - Message.fetch("verifier0.nyzo.co", MeshListener.standardPort, meshRequest, new MessageCallback() { + Message.fetchTcp("verifier0.nyzo.co", MeshListener.standardPortTcp, meshRequest, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -60,13 +60,14 @@ public void responseReceived(Message message) { receivedResponse.set(false); Message message = new Message(MessageType.HashVoteOverrideRequest29, new HashVoteOverrideRequest(height, hash)); message.sign(privateSeed); - Message.fetch(IpUtil.addressAsString(ipAddress), MeshListener.standardPort, message, new MessageCallback() { - @Override - public void responseReceived(Message message) { - System.out.println("response is " + message); - receivedResponse.set(true); - } - }); + Message.fetchTcp(IpUtil.addressAsString(ipAddress), MeshListener.standardPortTcp, message, + new MessageCallback() { + @Override + public void responseReceived(Message message) { + System.out.println("response is " + message); + receivedResponse.set(true); + } + }); // Wait for the response to return. while (!receivedResponse.get()) { diff --git a/src/main/java/co/nyzo/verifier/scripts/HashVoteOverrideRequestScript.java b/src/main/java/co/nyzo/verifier/scripts/HashVoteOverrideRequestScript.java index 378e7b16..8ed3a8f8 100644 --- a/src/main/java/co/nyzo/verifier/scripts/HashVoteOverrideRequestScript.java +++ b/src/main/java/co/nyzo/verifier/scripts/HashVoteOverrideRequestScript.java @@ -47,14 +47,15 @@ public static void main(String[] args) { Message message = new Message(MessageType.HashVoteOverrideRequest29, request); message.sign(privateSeed); for (byte[] ipAddress : ipAddresses) { - Message.fetch(IpUtil.addressAsString(ipAddress), MeshListener.standardPort, message, new MessageCallback() { - @Override - public void responseReceived(Message message) { + Message.fetchTcp(IpUtil.addressAsString(ipAddress), MeshListener.standardPortTcp, message, + new MessageCallback() { + @Override + public void responseReceived(Message message) { - System.out.println("response is " + message); - numberOfResponsesNotYetReceived.decrementAndGet(); - } - }); + System.out.println("response is " + message); + numberOfResponsesNotYetReceived.decrementAndGet(); + } + }); } // Wait for the responses to return. diff --git a/src/main/java/co/nyzo/verifier/scripts/NewVerifierVoteOverrideRequestScript.java b/src/main/java/co/nyzo/verifier/scripts/NewVerifierVoteOverrideRequestScript.java index 39258afb..651aa0c9 100644 --- a/src/main/java/co/nyzo/verifier/scripts/NewVerifierVoteOverrideRequestScript.java +++ b/src/main/java/co/nyzo/verifier/scripts/NewVerifierVoteOverrideRequestScript.java @@ -45,13 +45,14 @@ public static void main(String[] args) { Message message = new Message(MessageType.NewVerifierVoteOverrideRequest33, request); message.sign(privateSeed); for (byte[] ipAddress : ipAddresses) { - Message.fetch(IpUtil.addressAsString(ipAddress), MeshListener.standardPort, message, new MessageCallback() { - @Override - public void responseReceived(Message message) { - System.out.println("response is " + message); - numberOfResponsesNotYetReceived.decrementAndGet(); - } - }); + Message.fetchTcp(IpUtil.addressAsString(ipAddress), MeshListener.standardPortTcp, message, + new MessageCallback() { + @Override + public void responseReceived(Message message) { + System.out.println("response is " + message); + numberOfResponsesNotYetReceived.decrementAndGet(); + } + }); } // Wait for the responses to return. diff --git a/src/main/java/co/nyzo/verifier/scripts/ScriptUtil.java b/src/main/java/co/nyzo/verifier/scripts/ScriptUtil.java index e8055897..60b3123f 100644 --- a/src/main/java/co/nyzo/verifier/scripts/ScriptUtil.java +++ b/src/main/java/co/nyzo/verifier/scripts/ScriptUtil.java @@ -17,11 +17,11 @@ public class ScriptUtil { public static List ipAddressesForVerifier(byte[] identifier) { - // Ask Nyzo verifier 0 for the mesh. Get the IP addresses of the verifier. + // Ask Nyzo verifier 0 for the mesh. Get the IP addresses of the verifier. List ipAddresses = new ArrayList<>(); Message meshRequest = new Message(MessageType.MeshRequest15, null); AtomicBoolean receivedResponse = new AtomicBoolean(false); - Message.fetch("verifier0.nyzo.co", MeshListener.standardPort, meshRequest, new MessageCallback() { + Message.fetchTcp("verifier0.nyzo.co", MeshListener.standardPortTcp, meshRequest, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -86,25 +86,26 @@ public static void fetchMultilineStatus(MessageType messageType, String[] args) message.sign(privateSeed); } for (byte[] ipAddress : ipAddresses) { - Message.fetch(IpUtil.addressAsString(ipAddress), MeshListener.standardPort, message, new MessageCallback() { - @Override - public void responseReceived(Message message) { + Message.fetchTcp(IpUtil.addressAsString(ipAddress), MeshListener.standardPortTcp, message, + new MessageCallback() { + @Override + public void responseReceived(Message message) { - System.out.println("response message: " + message); - if (message != null) { - if (message.getContent() instanceof MultilineTextResponse) { - MultilineTextResponse response = (MultilineTextResponse) message.getContent(); - System.out.println("response number of lines: " + response.getLines().size()); - for (String line : response.getLines()) { - System.out.println(line); + System.out.println("response message: " + message); + if (message != null) { + if (message.getContent() instanceof MultilineTextResponse) { + MultilineTextResponse response = (MultilineTextResponse) message.getContent(); + System.out.println("response number of lines: " + response.getLines().size()); + for (String line : response.getLines()) { + System.out.println(line); + } + } else { + System.out.println("content is incorrect type: " + message.getContent()); + } } - } else { - System.out.println("content is incorrect type: " + message.getContent()); - } - } - numberOfResponsesNotYetReceived.decrementAndGet(); - } + numberOfResponsesNotYetReceived.decrementAndGet(); + } }); } diff --git a/src/main/java/co/nyzo/verifier/scripts/StatusRequestScript.java b/src/main/java/co/nyzo/verifier/scripts/StatusRequestScript.java index 3ff3f80a..50fe19b3 100644 --- a/src/main/java/co/nyzo/verifier/scripts/StatusRequestScript.java +++ b/src/main/java/co/nyzo/verifier/scripts/StatusRequestScript.java @@ -61,25 +61,26 @@ public static void main(String[] args) { message.sign(privateSeed); } for (byte[] ipAddress : ipAddresses) { - Message.fetch(IpUtil.addressAsString(ipAddress), MeshListener.standardPort, message, new MessageCallback() { - @Override - public void responseReceived(Message message) { + Message.fetchTcp(IpUtil.addressAsString(ipAddress), MeshListener.standardPortTcp, message, + new MessageCallback() { + @Override + public void responseReceived(Message message) { - if (message == null) { - System.out.println("response message is null"); - } else { + if (message == null) { + System.out.println("response message is null"); + } else { - // Get the response object from the message. - StatusResponse response = (StatusResponse) message.getContent(); + // Get the response object from the message. + StatusResponse response = (StatusResponse) message.getContent(); - // Print the response. - for (String line : response.getLines()) { - System.out.println(line); - } - } + // Print the response. + for (String line : response.getLines()) { + System.out.println(line); + } + } - numberOfResponsesNotYetReceived.decrementAndGet(); - } + numberOfResponsesNotYetReceived.decrementAndGet(); + } }); } diff --git a/src/main/java/co/nyzo/verifier/sentinel/Sentinel.java b/src/main/java/co/nyzo/verifier/sentinel/Sentinel.java index f114a11c..a5f80d34 100644 --- a/src/main/java/co/nyzo/verifier/sentinel/Sentinel.java +++ b/src/main/java/co/nyzo/verifier/sentinel/Sentinel.java @@ -193,9 +193,8 @@ private static Set fetchBootstrapResponses() { AtomicInteger numberOfResponsesPending = new AtomicInteger(verifiers.size()); for (ManagedVerifier verifier : verifiers.values()) { - Message bootstrapRequest = new Message(MessageType.BootstrapRequestV2_35, - new BootstrapRequest(MeshListener.getPort())); - Message.fetch(verifier.getHost(), verifier.getPort(), bootstrapRequest, new MessageCallback() { + Message bootstrapRequest = new Message(MessageType.BootstrapRequestV2_35, new BootstrapRequest()); + Message.fetchTcp(verifier.getHost(), verifier.getPort(), bootstrapRequest, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -255,7 +254,7 @@ private static boolean processBootstrapResponses(Set bootst AtomicInteger numberOfResponsesPending = new AtomicInteger(verifiers.size()); for (ManagedVerifier verifier : verifiers.values()) { - Message.fetch(verifier.getHost(), verifier.getPort(), message, new MessageCallback() { + Message.fetchTcp(verifier.getHost(), verifier.getPort(), message, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -315,7 +314,7 @@ private static void updateMesh(ManagedVerifier verifier) { // Get the mesh. Message message = new Message(MessageType.MeshRequest15, null); - Message.fetch(verifier.getHost(), 9444, message, new MessageCallback() { + Message.fetchTcp(verifier.getHost(), verifier.getPort(), message, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -344,7 +343,7 @@ private static void updateBlocks(ManagedVerifier verifier) { false)); AtomicBoolean processedResponse = new AtomicBoolean(false); - Message.fetch(verifier.getHost(), verifier.getPort(), message, new MessageCallback() { + Message.fetchTcp(verifier.getHost(), verifier.getPort(), message, new MessageCallback() { @Override public void responseReceived(Message message) { @@ -464,8 +463,7 @@ private static void transmitBlockIfNecessary() { message.sign(verifier.getSeed()); for (Node node : combinedCycle()) { - Message.fetch(IpUtil.addressAsString(node.getIpAddress()), node.getPort(), message, - null); + Message.fetch(node, message, null); } blockTransmittedForManagedVerifier = true; System.out.println("sent block for " + diff --git a/src/main/java/co/nyzo/verifier/util/UpdateUtil.java b/src/main/java/co/nyzo/verifier/util/UpdateUtil.java index 22b27bf0..9f890a28 100644 --- a/src/main/java/co/nyzo/verifier/util/UpdateUtil.java +++ b/src/main/java/co/nyzo/verifier/util/UpdateUtil.java @@ -36,7 +36,7 @@ public static void reset() { public void run() { // Flag that the system should terminate and close the MeshListener socket. terminate(); - MeshListener.closeSocket(); + MeshListener.closeSockets(); // Wait for the verifier, the mesh listener, and the seed transaction generator to terminate. while (Verifier.isAlive() || MeshListener.isAlive() || SeedTransactionManager.isAlive()) {