Skip to content

Commit

Permalink
Add Node Stake Consensus (hashgraph#4344)
Browse files Browse the repository at this point in the history
* Add Node Stake Consensus

Signed-off-by: Edwin Greene <[email protected]>
  • Loading branch information
edwin-greene authored Sep 2, 2022
1 parent ea7f5a1 commit 1ab8143
Show file tree
Hide file tree
Showing 15 changed files with 860 additions and 188 deletions.
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ value, it is recommended to only populate overridden properties in the custom `a
| `hedera.mirror.importer.downloader.balance.writeSignatures` | false | Whether to write verified signature files to the filesystem. |
| `hedera.mirror.importer.downloader.bucketName` | | The cloud storage bucket name to download streamed files. This value takes priority over network hardcoded bucket names regardless of `hedera.mirror.importer.network` value. |
| `hedera.mirror.importer.downloader.cloudProvider` | S3 | The cloud provider to download files from. Either `S3` or `GCP` |
| `hedera.mirror.importer.downloader.consensusRatio` | 0.333 | The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to total number of nodes available |
| `hedera.mirror.importer.downloader.consensusRatio` | 0.33333333333 | The ratio of verified nodes (nodes used to come to consensus on the signature file hash) to total number of nodes available |
| `hedera.mirror.importer.downloader.endpointOverride` | | Can be specified to download streams from a source other than S3 and GCP. Should be S3 compatible |
| `hedera.mirror.importer.downloader.event.batchSize` | 100 | The number of signature files to download per node before downloading the signed files |
| `hedera.mirror.importer.downloader.event.enabled` | false | Whether to enable event file downloads |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public class AddressBook {
.collect(Collectors
.toMap(e -> e.getNodeAccountId().toString(), AddressBookEntry::getPublicKeyAsObject));

@ToString.Exclude
@Transient
@Getter(lazy = true)
private final Map<Long, EntityId> nodeIdNodeAccountIdMap = this.getEntries()
.stream()
.collect(Collectors
.toMap(e -> e.getNodeId(), AddressBookEntry::getNodeAccountId));

public Set<EntityId> getNodeSet() {
return entries.stream()
.map(AddressBookEntry::getNodeAccountId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
public class CacheConfiguration {

public static final String EXPIRE_AFTER_5M = "cacheManagerExpireAfter5m";
public static final String EXPIRE_AFTER_24H = "cacheManagerExpireAfter24h";
public static final String CACHE_MANAGER_ALIAS = "cacheManagerAlias";

@Bean(EXPIRE_AFTER_5M)
Expand All @@ -53,4 +54,14 @@ CacheManager cacheManagerAlias() {
caffeineCacheManager.setCacheSpecification("maximumSize=100000,expireAfterWrite=30m");
return caffeineCacheManager;
}

/*
* Note the size of this cache should be adjusted according to the maximum number of Nodes.
*/
@Bean(EXPIRE_AFTER_24H)
CacheManager cacheManagerNodeStake() {
CaffeineCacheManager caffeineCacheManager = new CaffeineCacheManager();
caffeineCacheManager.setCacheSpecification("maximumSize=100,expireAfterWrite=24h");
return new TransactionAwareCacheManagerProxy(caffeineCacheManager);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
* ‍
*/

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -48,9 +51,12 @@ public class CommonDownloaderProperties {
@NotNull
private CloudProvider cloudProvider = CloudProvider.S3;

private MathContext consensusRatioMathContext = new MathContext(19, RoundingMode.DOWN);

@NotNull
@Max(1)
@Min(0)
private float consensusRatio = (float) 1 / 3;
private BigDecimal consensusRatio = BigDecimal.ONE.divide(BigDecimal.valueOf(3), consensusRatioMathContext);

private String endpointOverride;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.hedera.mirror.importer.downloader;

/*-
* ‌
* Hedera Mirror Node
* ​
* Copyright (C) 2019 - 2022 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import java.util.Collection;

import com.hedera.mirror.importer.domain.FileStreamSignature;

public interface ConsensusValidator {
void validate(Collection<FileStreamSignature> signatures);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package com.hedera.mirror.importer.downloader;

/*-
* ‌
* Hedera Mirror Node
* ​
* Copyright (C) 2019 - 2022 Hedera Hashgraph, LLC
* ​
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ‍
*/

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Collection;
import java.util.HashMap;
import javax.inject.Named;
import lombok.CustomLog;
import lombok.RequiredArgsConstructor;

import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.importer.addressbook.AddressBookService;
import com.hedera.mirror.importer.domain.FileStreamSignature;
import com.hedera.mirror.importer.exception.SignatureVerificationException;
import com.hedera.mirror.importer.repository.NodeStakeRepository;

@CustomLog
@Named
@RequiredArgsConstructor
public class ConsensusValidatorImpl implements ConsensusValidator {
private final AddressBookService addressBookService;
private final CommonDownloaderProperties commonDownloaderProperties;
private final NodeStakeRepository nodeStakeRepository;

/**
* Validates that the signature files satisfy the consensus requirement:
* <ol>
* <li>If NodeStakes are within the NodeStakeRepository, at least 1/3 of the total node stake amount has been
* signature verified.</li>
* <li>If no NodeStakes are in the NodeStakeRepository, At least 1/3 signature files are present</li>
* </ol>
*
* @param signatures a list of signature files which have the same filename
* @throws SignatureVerificationException
*/
@Override
public void validate(Collection<FileStreamSignature> signatures) throws SignatureVerificationException {
Multimap<String, FileStreamSignature> signatureHashMap = HashMultimap.create();
for (var signature : signatures) {
if (signature.getStatus() == FileStreamSignature.SignatureStatus.VERIFIED) {
signatureHashMap.put(signature.getFileHashAsHex(), signature);
}
}

var filename = signatures.stream().map(FileStreamSignature::getFilename).findFirst().orElse("unknown");
if (BigDecimal.ZERO.equals(commonDownloaderProperties.getConsensusRatio()) && signatureHashMap.size() > 0) {
log.debug("Signature file {} does not require consensus, skipping consensus check", filename);
return;
}

long totalStake = 0;
var nodeAccountIdToStakeMap = new HashMap<EntityId, Long>();
var addressBook = addressBookService.getCurrent();
var nodeStakes = nodeStakeRepository.findLatest();
if (nodeStakes.isEmpty()) {
var nodeAccountIDPubKeyMap = addressBook.getNodeAccountIDPubKeyMap();
totalStake = nodeAccountIDPubKeyMap.size();
} else {
var nodeIdToNodeAccountIdMap = addressBook.getNodeIdNodeAccountIdMap();
for (var nodeStake : nodeStakes) {
totalStake += nodeStake.getStake();
var nodeAccountId = nodeIdToNodeAccountIdMap.get(nodeStake.getNodeId());
if (nodeAccountId == null) {
log.warn("Node Stake found for Node Id {} but no Node Account Id found", nodeStake.getNodeId());
continue;
}

nodeAccountIdToStakeMap.put(nodeAccountId, nodeStake.getStake());
}
}

long consensusCount = 0;
for (String key : signatureHashMap.keySet()) {
var validatedSignatures = signatureHashMap.get(key);
long stake = 0L;
for (var signature : validatedSignatures) {
// If the map has no entry for the node account id, a default value of 1 is used to count a signature.
stake += nodeAccountIdToStakeMap.getOrDefault(signature.getNodeAccountId(), 1L);
}

if (canReachConsensus(stake, totalStake)) {
consensusCount += validatedSignatures.size();
validatedSignatures.forEach(s -> s.setStatus(FileStreamSignature.SignatureStatus.CONSENSUS_REACHED));
}
}

if (consensusCount > 0) {
return;
}

throw new SignatureVerificationException(String.format("Consensus not reached for file %s", filename));
}

private boolean canReachConsensus(long stake, long totalStake) {
log.info("Stake: {}, Total Stake: {}", stake, totalStake);
log.info("Consensus Ratio: {}", commonDownloaderProperties.getConsensusRatio());

var stakeRequiredForConsensus = BigDecimal.valueOf(totalStake)
.multiply(commonDownloaderProperties.getConsensusRatio())
.setScale(0, RoundingMode.CEILING);

log.info("Stake Required For Consensus: {}", stakeRequiredForConsensus);
log.info("Result: {}", BigDecimal.valueOf(stake).compareTo(stakeRequiredForConsensus) >= 0);

return BigDecimal.valueOf(stake).compareTo(stakeRequiredForConsensus) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,26 @@
import com.google.common.base.Stopwatch;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.nio.file.Path;
import java.security.PublicKey;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -62,6 +68,7 @@
import com.hedera.mirror.common.domain.StreamType;
import com.hedera.mirror.common.domain.addressbook.AddressBook;
import com.hedera.mirror.common.domain.entity.EntityId;
import com.hedera.mirror.common.domain.entity.EntityType;
import com.hedera.mirror.importer.MirrorProperties;
import com.hedera.mirror.importer.addressbook.AddressBookService;
import com.hedera.mirror.importer.config.MirrorDateRangePropertiesProcessor;
Expand Down Expand Up @@ -99,6 +106,7 @@ public abstract class Downloader<T extends StreamFile> {
private final StreamType streamType;
// Metrics
private final MeterRegistry meterRegistry;
private final Map<String, Counter> nodeSignatureStatusMetricMap = new ConcurrentHashMap<>();
private final Timer cloudStorageLatencyMetric;
private final Timer downloadLatencyMetric;
private final Timer streamCloseMetric;
Expand Down Expand Up @@ -394,16 +402,30 @@ private void verifySigsAndDownloadDataFiles(Multimap<String, FileStreamSignature
String sigFilename = sigFilenameIter.next();
Collection<FileStreamSignature> signatures = sigFilesMap.get(sigFilename);
boolean valid = false;

var nodeAccountIDPubKeyMap = addressBookService.getCurrent().getNodeAccountIDPubKeyMap();
try {
nodeSignatureVerifier.verify(signatures);

var consensusCount = signatures.stream()
.filter(s -> s.getStatus() == FileStreamSignature.SignatureStatus.CONSENSUS_REACHED)
.count();

if (consensusCount == nodeAccountIDPubKeyMap.size()) {
log.debug("Verified signature file {} reached consensus", sigFilename);
} else if (consensusCount > 0) {
log.warn("Verified signature file {} reached consensus but with some errors: {}", sigFilename,
statusMap(signatures, nodeAccountIDPubKeyMap));
}
} catch (SignatureVerificationException ex) {
var statusMapMessage = statusMap(signatures, nodeAccountIDPubKeyMap);
if (sigFilenameIter.hasNext()) {
log.warn("Signature verification failed but still have files in the batch, try to process the " +
"next group: {}", ex.getMessage());
"next group. File {}: {}", sigFilename, statusMapMessage);
continue;
}
throw ex;

throw new SignatureVerificationException("Signature verification failed for file " + sigFilename + ":" +
" " + statusMapMessage);
}

for (FileStreamSignature signature : signatures) {
Expand Down Expand Up @@ -562,4 +584,50 @@ boolean verifyHashChain(StreamFile streamFile, String expectedPreviousHash) {

return streamFile.getPreviousHash().contentEquals(expectedPreviousHash);
}

private Map<String, Collection<String>> statusMap(Collection<FileStreamSignature> signatures, Map<String,
PublicKey> nodeAccountIDPubKeyMap) {
Map<String, Collection<String>> statusMap = signatures.stream()
.collect(Collectors.groupingBy(fss -> fss.getStatus().toString(),
Collectors.mapping(FileStreamSignature::getNodeAccountIdString, Collectors
.toCollection(TreeSet::new))));

Set<String> seenNodes = new HashSet<>();
signatures.forEach(signature -> seenNodes.add(signature.getNodeAccountId().toString()));

Set<String> missingNodes = new TreeSet<>(Sets.difference(
nodeAccountIDPubKeyMap.keySet().stream().collect(Collectors.toSet()),
seenNodes));
statusMap.put(FileStreamSignature.SignatureStatus.NOT_FOUND.toString(), missingNodes);

String signatureStreamType = signatures.stream()
.map(FileStreamSignature::getStreamType)
.map(StreamType::toString)
.findFirst()
.orElse("UNKNOWN");
for (Map.Entry<String, Collection<String>> entry : statusMap.entrySet()) {
entry.getValue().forEach(nodeAccountId -> {
Counter counter = nodeSignatureStatusMetricMap.computeIfAbsent(
nodeAccountId,
n -> newStatusMetric(nodeAccountId, signatureStreamType, entry.getKey()));
counter.increment();
});
}

// remove CONSENSUS_REACHED for logging purposes
statusMap.remove(FileStreamSignature.SignatureStatus.CONSENSUS_REACHED.toString());
return statusMap;
}

private Counter newStatusMetric(String entityIdString, String streamType, String status) {
EntityId entityId = EntityId.of(entityIdString, EntityType.ACCOUNT);
return Counter.builder("hedera.mirror.download.signature.verification")
.description("The number of signatures verified from a particular node")
.tag("nodeAccount", entityId.getEntityNum().toString())
.tag("realm", entityId.getRealmNum().toString())
.tag("shard", entityId.getShardNum().toString())
.tag("type", streamType)
.tag("status", status)
.register(meterRegistry);
}
}
Loading

0 comments on commit 1ab8143

Please sign in to comment.