Skip to content

Commit

Permalink
Version 515. Performance and recovery improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
n-y-z-o committed May 4, 2019
1 parent d2be298 commit f112745
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 90 deletions.
29 changes: 23 additions & 6 deletions src/main/java/co/nyzo/verifier/BlockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public class BlockManager {
private static List<ByteBuffer> currentCycleList = new ArrayList<>();
private static Set<ByteBuffer> currentCycleSet = ConcurrentHashMap.newKeySet();
private static Set<ByteBuffer> currentAndNearCycleSet = ConcurrentHashMap.newKeySet();
private static Set<Node> currentAndNearCycleNodes = ConcurrentHashMap.newKeySet();
private static long genesisBlockStartTimestamp = -1L;
private static boolean initialized = false;
private static boolean cycleComplete = false;
Expand Down Expand Up @@ -536,17 +537,17 @@ public static int currentCycleLength() {

public static List<ByteBuffer> verifiersInCurrentCycleList() {

return new ArrayList<>(currentCycleList);
return currentCycleList;
}

public static Set<ByteBuffer> verifiersInCurrentCycleSet() {

return new HashSet<>(currentCycleSet);
return currentCycleSet;
}

public static Set<ByteBuffer> verifiersInCurrentAndNearCycleSet() {
public static Set<Node> getCurrentAndNearCycleNodes() {

return new HashSet<>(currentAndNearCycleSet);
return currentAndNearCycleNodes;
}

public static boolean verifierInCurrentCycle(ByteBuffer identifier) {
Expand Down Expand Up @@ -635,14 +636,30 @@ private static synchronized void updateVerifiersInCurrentCycle(Block block,
PersistentData.put(lastVerifierRemovalHeightKey, lastVerifierRemovalHeight);
}

// Store the edge height, cycle list, and indication of Genesis cycle.
BlockManager.currentCycleEndHeight = edgeHeight;
BlockManager.currentCycleList = currentCycleList;
BlockManager.currentCycleSet = new HashSet<>(currentCycleList);
BlockManager.inGenesisCycle = inGenesisCycle;

Set<ByteBuffer> currentAndNearCycleSet = new HashSet<>(currentCycleList);
// Build the cycle set.
Set<ByteBuffer> currentCycleSet = ConcurrentHashMap.newKeySet();
currentCycleSet.addAll(currentCycleList);
BlockManager.currentCycleSet = currentCycleSet;

// Build the cycle-and-near set.
Set<ByteBuffer> currentAndNearCycleSet = ConcurrentHashMap.newKeySet();
currentAndNearCycleSet.addAll(currentCycleList);
currentAndNearCycleSet.addAll(NewVerifierVoteManager.topVerifiers());
BlockManager.currentAndNearCycleSet = currentAndNearCycleSet;

// Build the cycle-and-near node set.
Set<Node> currentAndNearCycleNodes = ConcurrentHashMap.newKeySet();
for (Node node : NodeManager.getMesh()) {
if (currentAndNearCycleSet.contains(ByteBuffer.wrap(node.getIdentifier()))) {
currentAndNearCycleNodes.add(node);
}
}
BlockManager.currentAndNearCycleNodes = currentAndNearCycleNodes;
}
}

Expand Down
47 changes: 5 additions & 42 deletions src/main/java/co/nyzo/verifier/BlockVoteManager.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package co.nyzo.verifier;

import co.nyzo.verifier.messages.BlockVote;
import co.nyzo.verifier.messages.MissingBlockVoteRequest;
import co.nyzo.verifier.util.IpUtil;
import co.nyzo.verifier.util.NotificationUtil;

import java.nio.ByteBuffer;
import java.util.*;
Expand Down Expand Up @@ -220,10 +217,10 @@ public static synchronized void requestMissingVotes() {
// from the mesh again but had an outage that caused it to miss earlier votes.

// Also, to conserve local CPU and mesh bandwidth, limit vote requests to no more frequently than once every
// two seconds.
// second.
long frozenEdgeHeight = BlockManager.getFrozenEdgeHeight();
long currentTimestamp = System.currentTimeMillis();
if (!BlockManager.inGenesisCycle() && currentTimestamp - lastVoteRequestTimestamp > 2000L) {
if (!BlockManager.inGenesisCycle() && currentTimestamp - lastVoteRequestTimestamp > 1000L) {

// Look through all heights in the vote map. If the vote is greater than 50% and the height is greater than
// one more than the frozen edge, a vote request should be performed.
Expand All @@ -241,46 +238,12 @@ public static synchronized void requestMissingVotes() {
currentTimestamp - lastVoteRequestTimestamp > 15000L;

if (shouldRequest) {
// We will request votes from all verifiers in the current cycle, even those we already have. Some
// votes may have changed.
Set<ByteBuffer> verifiersInCurrentCycle = BlockManager.verifiersInCurrentCycleSet();

// Set the last-vote-request timestamp now. We will also set it in the response to ensure a minimum gap.
// Set the last-vote-request timestamp to ensure a minimum gap between requests.
lastVoteRequestTimestamp = System.currentTimeMillis();

// We only work to freeze one past the frozen edge.
long heightToRequest = frozenEdgeHeight + 1;

NotificationUtil.send("Need to request " + verifiersInCurrentCycle.size() + " votes for height " +
heightToRequest + " on " + Verifier.getNickname(), frozenEdgeHeight);

// Finally, request the votes.
Message message = new Message(MessageType.MissingBlockVoteRequest23,
new MissingBlockVoteRequest(heightToRequest));
for (Node node : NodeManager.getMesh()) {

if (verifiersInCurrentCycle.contains(ByteBuffer.wrap(node.getIdentifier()))) {

numberOfVotesRequested++;

Message.fetch(node, message, new MessageCallback() {
@Override
public void responseReceived(Message message) {

BlockVote vote = (BlockVote) message.getContent();
if (vote != null) {
registerVote(message);

// Each time a good vote is received, the last-vote-request timestamp is
// updated. If we take some time to request all the votes, this helps to
// avoid starting a new round of requests soon after, or even before, this
// round of requests completes.
lastVoteRequestTimestamp = System.currentTimeMillis();
}
}
});
}
}
// Request the votes.
Verifier.requestBlockWithVotes();
}
}
}
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/co/nyzo/verifier/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,11 @@ public void sign(byte[] privateSeed) {

public static void broadcast(Message message) {

System.out.println("broadcasting message: " + message.getType());

// Send the message to all nodes in the current cycle and the top in the new-verifier queue.
List<Node> mesh = NodeManager.getMesh();
for (Node node : mesh) {
if (node.isActive() && !ByteUtil.arraysAreEqual(node.getIdentifier(), Verifier.getIdentifier()) &&
BlockManager.verifierInOrNearCurrentCycle(ByteBuffer.wrap(node.getIdentifier()))) {
Set<Node> nodes = BlockManager.getCurrentAndNearCycleNodes();
System.out.println("broadcasting message: " + message.getType() + " to " + nodes.size());
for (Node node : nodes) {
if (node.isActive() && !ByteUtil.arraysAreEqual(node.getIdentifier(), Verifier.getIdentifier())) {
fetch(node, message, null);
}
}
Expand All @@ -143,7 +141,7 @@ public static void fetchFromRandomNode(Message message, MessageCallback messageC
}

if (node == null) {
System.out.println("unable to find suitable node");
System.out.println("unable to find suitable node for random fetch");
} else {
System.out.println("trying to fetch " + message.getType() + " from " +
NicknameManager.get(node.getIdentifier()));
Expand Down
58 changes: 28 additions & 30 deletions src/main/java/co/nyzo/verifier/UnfrozenBlockManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@

import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class UnfrozenBlockManager {

private static Map<Long, Map<ByteBuffer, Block>> unfrozenBlocks = new HashMap<>();
private static Map<Long, Integer> thresholdOverrides = new HashMap<>();
private static Map<Long, byte[]> hashOverrides = new HashMap<>();
private static Map<Long, Map<ByteBuffer, Block>> unfrozenBlocks = new ConcurrentHashMap<>();
private static Map<Long, Integer> thresholdOverrides = new ConcurrentHashMap<>();
private static Map<Long, byte[]> hashOverrides = new ConcurrentHashMap<>();

private static String voteDescription = "*** not yet voted ***";

private static Map<Long, Map<ByteBuffer, Block>> disconnectedBlocks = new HashMap<>();
private static Map<Long, Map<ByteBuffer, Block>> disconnectedBlocks = new ConcurrentHashMap<>();

private static long lastBlockVoteTimestamp = 0L;

public static synchronized void attemptToRegisterDisconnectedBlocks() {
public static void attemptToRegisterDisconnectedBlocks() {

// Remove the disconnected blocks one past the frozen edge from the disconnected map. Attempt to register them.
long frozenEdgeHeight = BlockManager.getFrozenEdgeHeight();
Expand All @@ -41,7 +42,7 @@ public static synchronized void attemptToRegisterDisconnectedBlocks() {
}
}

public static synchronized boolean registerBlock(Block block) {
public static boolean registerBlock(Block block) {

boolean registeredBlock = false;

Expand Down Expand Up @@ -74,17 +75,15 @@ public static synchronized boolean registerBlock(Block block) {
}

// Check that the verification timestamp is not unreasonably far into the future.
// TODO: This code will be activated in a later version. Activating it immediately could jeopardize
// TODO: verifiers that have updated when less than 25% of the cycle has updated.
//if (block.getVerificationTimestamp() > System.currentTimeMillis() + 5000L) {
// verificationTimestampValid = false;
//}
if (block.getVerificationTimestamp() > System.currentTimeMillis() + 5000L) {
verificationTimestampValid = false;
}
}

if (!alreadyContainsBlock && verificationTimestampValid) {

// At this point, it is prudent to independently calculate the balance list. We only register the block
// if we can calculate the balance list and if the has matches what we expect. This will ensure that no
// if we can calculate the balance list and if the hash matches what we expect. This will ensure that no
// blocks with invalid transactions are registered (they will be removed in the balance-list
// calculation, and the hash will not match).
BalanceList balanceList = BalanceListManager.balanceListForBlock(block, new StringBuilder());
Expand Down Expand Up @@ -128,7 +127,7 @@ public static synchronized boolean registerBlock(Block block) {
return registeredBlock;
}

public static synchronized void updateVote() {
public static void updateVote() {

// Only vote for the first height past the frozen edge, and only continue if we have blocks and have not voted
// for this height in less than the minimum interval time (the additional 200ms is to account for network
Expand Down Expand Up @@ -245,7 +244,7 @@ public static synchronized void updateVote() {
}
}

public static synchronized void castVote(long height, byte[] hash) {
public static void castVote(long height, byte[] hash) {

System.out.println("^^^^^^^^^^^^^^^^^^^^^ casting vote for height " + height);
lastBlockVoteTimestamp = System.currentTimeMillis();
Expand All @@ -261,7 +260,7 @@ public static synchronized void castVote(long height, byte[] hash) {
}
}

public static synchronized void attemptToFreezeBlock() {
public static void attemptToFreezeBlock() {

long frozenEdgeHeight = BlockManager.getFrozenEdgeHeight();
long heightToFreeze = frozenEdgeHeight + 1;
Expand Down Expand Up @@ -339,7 +338,7 @@ public void responseReceived(Message message) {
});
}

public static synchronized Set<Long> unfrozenBlockHeights() {
public static Set<Long> unfrozenBlockHeights() {

return new HashSet<>(unfrozenBlocks.keySet());
}
Expand All @@ -355,7 +354,7 @@ public static int numberOfBlocksAtHeight(long height) {
return number;
}

public static synchronized List<Block> allUnfrozenBlocks() {
public static List<Block> allUnfrozenBlocks() {

List<Block> allBlocks = new ArrayList<>();
for (Map<ByteBuffer, Block> blocks : unfrozenBlocks.values()) {
Expand All @@ -365,13 +364,13 @@ public static synchronized List<Block> allUnfrozenBlocks() {
return allBlocks;
}

public static synchronized List<Block> unfrozenBlocksAtHeight(long height) {
public static List<Block> unfrozenBlocksAtHeight(long height) {

return unfrozenBlocks.containsKey(height) ? new ArrayList<>(unfrozenBlocks.get(height).values()) :
new ArrayList<>();
Map<ByteBuffer, Block> mapForHeight = unfrozenBlocks.get(height);
return mapForHeight == null ? new ArrayList<>() : new ArrayList<>(mapForHeight.values());
}

public static synchronized Block unfrozenBlockAtHeight(long height, byte[] hash) {
public static Block unfrozenBlockAtHeight(long height, byte[] hash) {

Block block = null;
if (hash != null) {
Expand All @@ -384,17 +383,16 @@ public static synchronized Block unfrozenBlockAtHeight(long height, byte[] hash)
return block;
}

public static synchronized void purge() {
public static void purge() {

unfrozenBlocks.clear();
}

public static synchronized void requestMissingBlocks() {
public static void requestMissingBlocks() {

long frozenEdgeHeight = BlockManager.getFrozenEdgeHeight();
for (long height : BlockVoteManager.getHeights()) {
if (height == frozenEdgeHeight + 1) {

for (ByteBuffer hash : BlockVoteManager.getHashesForHeight(height)) {
Block block = UnfrozenBlockManager.unfrozenBlockAtHeight(height, hash.array());
if (block == null) {
Expand All @@ -405,7 +403,7 @@ public static synchronized void requestMissingBlocks() {
}
}

public static synchronized void setThresholdOverride(long height, int threshold) {
public static void setThresholdOverride(long height, int threshold) {

if (threshold == 0) {
thresholdOverrides.remove(height);
Expand All @@ -414,7 +412,7 @@ public static synchronized void setThresholdOverride(long height, int threshold)
}
}

public static synchronized void setHashOverride(long height, byte[] hash) {
public static void setHashOverride(long height, byte[] hash) {

if (ByteUtil.isAllZeros(hash)) {
hashOverrides.remove(height);
Expand All @@ -423,14 +421,14 @@ public static synchronized void setHashOverride(long height, byte[] hash) {
}
}

public static synchronized Map<Long, Integer> getThresholdOverrides() {
public static Map<Long, Integer> getThresholdOverrides() {

return new HashMap<>(thresholdOverrides);
return thresholdOverrides;
}

public static synchronized Map<Long, byte[]> getHashOverrides() {
public static Map<Long, byte[]> getHashOverrides() {

return new HashMap<>(hashOverrides);
return hashOverrides;
}

public static String getVoteDescription() {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/co/nyzo/verifier/Verifier.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ public static int getRejoinCount() {
return rejoinCount;
}

private static void requestBlockWithVotes() {
public static void requestBlockWithVotes() {

long frozenEdgeHeight = BlockManager.getFrozenEdgeHeight();
if (BlockManager.openEdgeHeight(false) > frozenEdgeHeight + 2) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,9 @@ public Integer apply(Integer integer0, Integer integer1) {

public static void updateScoresForFrozenBlock(Block block, Map<ByteBuffer, BlockVote> votes) {

// Only proceed if the block is not null. It is rare or maybe impossible for the block to be null, but it is
// still a reasonable precaution in an environment such as this.
if (block != null) {
// Only proceed if the block is not null and the vote map is not null. It is rare or maybe impossible for the
// block to be null, but it is still a reasonable precaution in an environment such as this.
if (block != null && votes != null) {

// Add for each in-cycle verifier. Each time a block is frozen, a verifier's score increases, but it then
// decreases for each vote received.
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/co/nyzo/verifier/Version.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

public class Version {

private static final int version = 514;
private static final int version = 515;

public static int getVersion() {

Expand Down

0 comments on commit f112745

Please sign in to comment.