Skip to content

Commit

Permalink
Add handling for missing proposal data
Browse files Browse the repository at this point in the history
  • Loading branch information
ManfredKarrer committed Oct 10, 2018
1 parent 5141806 commit 9b40306
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 54 deletions.
4 changes: 2 additions & 2 deletions common/src/main/proto/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ message NetworkEnvelope {

AddPersistableNetworkPayloadMessage add_persistable_network_payload_message = 31;
AckMessage ack_message = 32;
RepublishBlindVotesRequest republish_blind_votes_request = 33;
RepublishGovernanceDataRequest republish_governance_data_request = 33;
}
}

Expand Down Expand Up @@ -320,7 +320,7 @@ message NewBlockBroadcastMessage {
BaseBlock raw_block = 1;
}

message RepublishBlindVotesRequest {
message RepublishGovernanceDataRequest {
}

///////////////////////////////////////////////////////////////////////////////////////////
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/bisq/core/dao/DaoModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import bisq.core.dao.governance.blindvote.BlindVoteListService;
import bisq.core.dao.governance.blindvote.BlindVoteValidator;
import bisq.core.dao.governance.blindvote.MyBlindVoteListService;
import bisq.core.dao.governance.blindvote.network.RepublishBlindVotesHandler;
import bisq.core.dao.governance.blindvote.network.RepublishGovernanceDataHandler;
import bisq.core.dao.governance.blindvote.storage.BlindVoteStorageService;
import bisq.core.dao.governance.blindvote.storage.BlindVoteStore;
import bisq.core.dao.governance.myvote.MyVoteListService;
Expand Down Expand Up @@ -169,7 +169,7 @@ protected void configure() {
bind(VoteResultService.class).in(Singleton.class);
bind(MissingDataRequestService.class).in(Singleton.class);
bind(IssuanceService.class).in(Singleton.class);
bind(RepublishBlindVotesHandler.class).in(Singleton.class);
bind(RepublishGovernanceDataHandler.class).in(Singleton.class);

// Genesis
String genesisTxId = environment.getProperty(DaoOptionKeys.GENESIS_TX_ID, String.class, "");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class BlindVoteListService implements AppendOnlyDataStoreListener, BsqSta
private final P2PService p2PService;
private final BlindVoteValidator blindVoteValidator;
@Getter
private final ObservableList<BlindVotePayload> appendOnlyStoreList = FXCollections.observableArrayList();
private final ObservableList<BlindVotePayload> blindVotePayloads = FXCollections.observableArrayList();


///////////////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -123,7 +123,7 @@ public void onAdded(PersistableNetworkPayload payload) {
///////////////////////////////////////////////////////////////////////////////////////////

public List<BlindVote> getBlindVotesInPhaseAndCycle() {
return appendOnlyStoreList.stream()
return blindVotePayloads.stream()
.filter(blindVotePayload -> blindVoteValidator.isTxInPhaseAndCycle(blindVotePayload.getBlindVote()))
.map(BlindVotePayload::getBlindVote)
.collect(Collectors.toList());
Expand All @@ -141,14 +141,14 @@ private void fillListFromAppendOnlyDataStore() {
private void onAppendOnlyDataAdded(PersistableNetworkPayload persistableNetworkPayload) {
if (persistableNetworkPayload instanceof BlindVotePayload) {
BlindVotePayload blindVotePayload = (BlindVotePayload) persistableNetworkPayload;
if (!appendOnlyStoreList.contains(blindVotePayload)) {
if (!blindVotePayloads.contains(blindVotePayload)) {
BlindVote blindVote = blindVotePayload.getBlindVote();
String txId = blindVote.getTxId();
// We don't check the phase and the cycle as we want to add all object independently when we receive it
// (or when we start the app to fill our list from the data we gor from the seed node).
if (blindVoteValidator.areDataFieldsValid(blindVote)) {
// We don't validate as we might receive blindVotes from other cycles or phases at startup.
appendOnlyStoreList.add(blindVotePayload);
blindVotePayloads.add(blindVotePayload);
log.info("We received a blindVotePayload. blindVoteTxId={}", txId);
} else {
log.warn("We received an invalid blindVotePayload. blindVoteTxId={}", txId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package bisq.core.dao.governance.blindvote.network;

import bisq.core.dao.governance.blindvote.network.messages.RepublishBlindVotesRequest;
import bisq.core.dao.governance.blindvote.network.messages.RepublishGovernanceDataRequest;

import bisq.network.p2p.NodeAddress;
import bisq.network.p2p.network.Connection;
Expand Down Expand Up @@ -49,11 +49,11 @@
import org.jetbrains.annotations.NotNull;

/**
* Responsible for sending a RepublishBlindVotesRequest to full nodes.
* Responsible for sending a RepublishGovernanceDataRequest to full nodes.
* Processing of RepublishBlindVotesRequests at full nodes is done in the FullNodeNetworkService.
*/
@Slf4j
public final class RepublishBlindVotesHandler {
public final class RepublishGovernanceDataHandler {
private static final long TIMEOUT = 120;

private final Collection<NodeAddress> seedNodeAddresses;
Expand All @@ -64,9 +64,9 @@ public final class RepublishBlindVotesHandler {
private Timer timeoutTimer;

@Inject
public RepublishBlindVotesHandler(NetworkNode networkNode,
PeerManager peerManager,
SeedNodeRepository seedNodesRepository) {
public RepublishGovernanceDataHandler(NetworkNode networkNode,
PeerManager peerManager,
SeedNodeRepository seedNodesRepository) {
this.networkNode = networkNode;
this.peerManager = peerManager;
this.seedNodeAddresses = new HashSet<>(seedNodesRepository.getSeedNodeAddresses());
Expand All @@ -77,7 +77,7 @@ public RepublishBlindVotesHandler(NetworkNode networkNode,
// API
///////////////////////////////////////////////////////////////////////////////////////////

public void requestBlindVotePayload() {
public void sendRepublishRequest() {
// First try if we have a seed node in our connections. All seed nodes are full nodes.
if (!stopped)
connectToNextNode();
Expand All @@ -88,12 +88,12 @@ public void requestBlindVotePayload() {
// Private
///////////////////////////////////////////////////////////////////////////////////////////

private void sendRepublishBlindVotesRequest(NodeAddress nodeAddress) {
RepublishBlindVotesRequest republishBlindVotesRequest = new RepublishBlindVotesRequest();
private void sendRepublishRequest(NodeAddress nodeAddress) {
RepublishGovernanceDataRequest republishGovernanceDataRequest = new RepublishGovernanceDataRequest();
if (timeoutTimer == null) {
timeoutTimer = UserThread.runAfter(() -> { // setup before sending to avoid race conditions
if (!stopped) {
String errorMessage = "A timeout occurred at sending republishBlindVotesRequest:" +
String errorMessage = "A timeout occurred at sending republishGovernanceDataRequest:" +
" to nodeAddress:" + nodeAddress;
log.warn(errorMessage);
connectToNextNode();
Expand All @@ -105,13 +105,13 @@ private void sendRepublishBlindVotesRequest(NodeAddress nodeAddress) {
TIMEOUT);
}

log.info("We send to peer {} a republishBlindVotesRequest.", nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, republishBlindVotesRequest);
log.info("We send to peer {} a republishGovernanceDataRequest.", nodeAddress);
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, republishGovernanceDataRequest);
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
if (!stopped) {
log.info("Sending of RepublishBlindVotesRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
log.info("Sending of RepublishGovernanceDataRequest message to peer {} succeeded.", nodeAddress.getFullAddress());
stop();
} else {
log.trace("We have stopped already. We ignore that networkNode.sendMessage.onSuccess call." +
Expand All @@ -122,7 +122,7 @@ public void onSuccess(Connection connection) {
@Override
public void onFailure(@NotNull Throwable throwable) {
if (!stopped) {
String errorMessage = "Sending republishBlindVotesRequest to " + nodeAddress +
String errorMessage = "Sending republishGovernanceDataRequest to " + nodeAddress +
" failed. That is expected if the peer is offline.\n\t" +
"\n\tException=" + throwable.getMessage();
log.info(errorMessage);
Expand All @@ -144,7 +144,7 @@ private void connectToNextNode() {
if (connectionToSeedNodeOptional.isPresent() &&
connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().isPresent()) {
NodeAddress nodeAddress = connectionToSeedNodeOptional.get().getPeersNodeAddressOptional().get();
sendRepublishBlindVotesRequest(nodeAddress);
sendRepublishRequest(nodeAddress);
} else {
// If connected seed nodes did not confirm receipt of message we try next seed node from seedNodeAddresses
List<NodeAddress> list = seedNodeAddresses.stream()
Expand All @@ -155,7 +155,7 @@ private void connectToNextNode() {
if (!list.isEmpty()) {
NodeAddress nodeAddress = list.get(0);
seedNodeAddresses.remove(nodeAddress);
sendRepublishBlindVotesRequest(nodeAddress);
sendRepublishRequest(nodeAddress);
} else {
log.warn("No more seed nodes available. We try any of our other peers.");
connectToAnyFullNode();
Expand Down Expand Up @@ -189,7 +189,7 @@ private void connectToAnyFullNode() {
list = new ArrayList<>(list.subList(0, Math.min(list.size(), 4)));
list.stream()
.map(Peer::getNodeAddress)
.forEach(this::sendRepublishBlindVotesRequest);
.forEach(this::sendRepublishRequest);
} else {
log.warn("No other nodes found. We try again in 60 seconds.");
UserThread.runAfter(this::connectToNextNode, 60);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@

@EqualsAndHashCode(callSuper = true)
@Getter
public final class RepublishBlindVotesRequest extends NetworkEnvelope implements DirectMessage, CapabilityRequiringPayload {
public final class RepublishGovernanceDataRequest extends NetworkEnvelope implements DirectMessage, CapabilityRequiringPayload {

public RepublishBlindVotesRequest() {
public RepublishGovernanceDataRequest() {
this(Version.getP2PMessageVersion());
}

Expand All @@ -46,19 +46,19 @@ public RepublishBlindVotesRequest() {
// PROTO BUFFER
///////////////////////////////////////////////////////////////////////////////////////////

private RepublishBlindVotesRequest(int messageVersion) {
private RepublishGovernanceDataRequest(int messageVersion) {
super(messageVersion);
}

@Override
public PB.NetworkEnvelope toProtoNetworkEnvelope() {
return getNetworkEnvelopeBuilder()
.setRepublishBlindVotesRequest(PB.RepublishBlindVotesRequest.newBuilder())
.setRepublishGovernanceDataRequest(PB.RepublishGovernanceDataRequest.newBuilder())
.build();
}

public static NetworkEnvelope fromProto(PB.RepublishBlindVotesRequest proto, int messageVersion) {
return new RepublishBlindVotesRequest(messageVersion);
public static NetworkEnvelope fromProto(PB.RepublishGovernanceDataRequest proto, int messageVersion) {
return new RepublishGovernanceDataRequest(messageVersion);
}

@Override
Expand All @@ -70,7 +70,7 @@ public List<Integer> getRequiredCapabilities() {

@Override
public String toString() {
return "RepublishBlindVotesRequest{" +
return "RepublishGovernanceDataRequest{" +
"\n} " + super.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package bisq.core.dao.governance.voteresult;

import bisq.core.dao.DaoSetupService;
import bisq.core.dao.governance.blindvote.network.RepublishBlindVotesHandler;
import bisq.core.dao.governance.blindvote.network.RepublishGovernanceDataHandler;

import javax.inject.Inject;

Expand All @@ -29,14 +29,14 @@
import lombok.Getter;

public class MissingDataRequestService implements DaoSetupService {
private final RepublishBlindVotesHandler republishBlindVotesHandler;
private final RepublishGovernanceDataHandler republishGovernanceDataHandler;

@Getter
private final ObservableList<VoteResultException> voteResultExceptions = FXCollections.observableArrayList();

@Inject
public MissingDataRequestService(RepublishBlindVotesHandler republishBlindVotesHandler) {
this.republishBlindVotesHandler = republishBlindVotesHandler;
public MissingDataRequestService(RepublishGovernanceDataHandler republishGovernanceDataHandler) {
this.republishGovernanceDataHandler = republishGovernanceDataHandler;
}


Expand All @@ -49,9 +49,7 @@ public void addListeners() {
voteResultExceptions.addListener((ListChangeListener<VoteResultException>) c -> {
c.next();
if (c.wasAdded()) {
c.getAddedSubList().stream().filter(e -> e instanceof VoteResultException.MissingBlindVoteDataException)
.map(e -> (VoteResultException.MissingBlindVoteDataException) e)
.forEach(e -> republishBlindVotesHandler.requestBlindVotePayload());
republishGovernanceDataHandler.sendRepublishRequest();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,16 @@ public String toString() {
}
}

@EqualsAndHashCode(callSuper = true)
public static abstract class MissingDataException extends VoteResultException {
private MissingDataException(String message) {
super(message);
}
}

@EqualsAndHashCode(callSuper = true)
@Value
public static class MissingBlindVoteDataException extends VoteResultException {
public static class MissingBlindVoteDataException extends MissingDataException {
private String blindVoteTxId;

MissingBlindVoteDataException(String blindVoteTxId) {
Expand All @@ -93,7 +100,7 @@ public String toString() {

@EqualsAndHashCode(callSuper = true)
@Value
public static class MissingBallotException extends VoteResultException {
public static class MissingBallotException extends MissingDataException {
private List<Ballot> existingBallots;
private List<String> proposalTxIdsOfMissingBallots;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ private Set<DecryptedBallotsWithMerits> getDecryptedBallotsWithMeritsSet(int cha
//TODO handle case that we are missing proposals
log.warn("We are missing proposals to create the vote result: " + missingBallotException.toString());
missingDataRequestService.addVoteResultException(missingBallotException);
voteResultExceptions.add(missingBallotException);
return null;
} catch (VoteResultException.DecryptionException decryptionException) {
log.error("Could not decrypt data: " + decryptionException.toString());
Expand All @@ -336,7 +337,9 @@ private Set<DecryptedBallotsWithMerits> getDecryptedBallotsWithMeritsSet(int cha
"and see if that blindVote was part of the majority data view. If so we should " +
"recover the missing blind vote by a request to our peers. blindVoteTxId={}", blindVoteTxId);

missingDataRequestService.addVoteResultException(new VoteResultException.MissingBlindVoteDataException(blindVoteTxId));
VoteResultException.MissingBlindVoteDataException voteResultException = new VoteResultException.MissingBlindVoteDataException(blindVoteTxId);
missingDataRequestService.addVoteResultException(voteResultException);
voteResultExceptions.add(voteResultException);
return null;
}
} catch (VoteResultException.ValidationException e) {
Expand Down
Loading

0 comments on commit 9b40306

Please sign in to comment.