Skip to content

Commit

Permalink
fix: Improve quorum caching (again) (dashpay#5761)
Browse files Browse the repository at this point in the history
## Issue being fixed or feature implemented
1. `scanQuorumsCache` is a special one and we use it incorrectly.
2. Platform doesn't really use anything that calls `ScanQuorums()`
directly, they specify the exact quorum hash in RPCs so it's
`GetQuorum()` that is used instead. The only place `ScanQuorums()` is
used for Platform related stuff is `StartCleanupOldQuorumDataThread()`
because we want to preserve quorum data used by `GetQuorum()`. But this
can be optimised with its own (much more compact) cache.
3. RPCs that use `ScanQuorums()` should in most cases be ok with smaller
cache, for other use cases there is a note in help text now.

## What was done?
pls see individual commits

## How Has This Been Tested?
run tests, run a node (~in progress~ looks stable)

## Breaking Changes
n/a

## Checklist:
- [x] I have performed a self-review of my own code
- [x] I have commented my code, particularly in hard-to-understand areas
- [ ] I have added or updated relevant unit/integration/functional/e2e
tests
- [ ] I have made corresponding changes to the documentation
- [x] I have assigned this pull request to a milestone
  • Loading branch information
UdjinM6 authored and PastaPastaPasta committed Dec 24, 2023
1 parent f0feb36 commit ee4c27d
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 21 deletions.
6 changes: 1 addition & 5 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,6 @@ void CDKGSessionManager::CleanupOldContributions() const
const auto prefixes = {DB_VVEC, DB_SKCONTRIB, DB_ENC_CONTRIB};

for (const auto& params : Params().GetConsensus().llmqs) {
// For how many blocks recent DKG info should be kept
const int MAX_CYCLES = params.useRotation ? params.keepOldKeys / params.signingActiveQuorumCount : params.keepOldKeys;
const int MAX_STORE_DEPTH = MAX_CYCLES * params.dkgInterval;

LogPrint(BCLog::LLMQ, "CDKGSessionManager::%s -- looking for old entries for llmq type %d\n", __func__, ToUnderlying(params.type));

CDBBatch batch(*db);
Expand All @@ -486,7 +482,7 @@ void CDKGSessionManager::CleanupOldContributions() const
}
cnt_all++;
const CBlockIndex* pindexQuorum = m_chainstate.m_blockman.LookupBlockIndex(std::get<2>(k));
if (pindexQuorum == nullptr || m_chainstate.m_chain.Tip()->nHeight - pindexQuorum->nHeight > MAX_STORE_DEPTH) {
if (pindexQuorum == nullptr || m_chainstate.m_chain.Tip()->nHeight - pindexQuorum->nHeight > utils::max_store_depth(params)) {
// not found or too old
batch.Erase(k);
cnt_old++;
Expand Down
75 changes: 62 additions & 13 deletions src/llmq/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,6 @@ CQuorumManager::CQuorumManager(CBLSWorker& _blsWorker, CChainState& chainstate,
m_peerman(peerman)
{
utils::InitQuorumsCache(mapQuorumsCache, false);
utils::InitQuorumsCache(scanQuorumsCache, false);

quorumThreadInterrupt.reset();
}

Expand Down Expand Up @@ -503,14 +501,45 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
return {};
}

const CBlockIndex* pIndexScanCommitments{pindexStart};
gsl::not_null<const CBlockIndex*> pindexStore{pindexStart};
const auto& llmq_params_opt = GetLLMQParams(llmqType);
assert(llmq_params_opt.has_value());

// Quorum sets can only change during the mining phase of DKG.
// Find the closest known block index.
const int quorumCycleStartHeight = pindexStart->nHeight - (pindexStart->nHeight % llmq_params_opt->dkgInterval);
const int quorumCycleMiningStartHeight = quorumCycleStartHeight + llmq_params_opt->dkgMiningWindowStart;
const int quorumCycleMiningEndHeight = quorumCycleStartHeight + llmq_params_opt->dkgMiningWindowEnd;

if (pindexStart->nHeight < quorumCycleMiningStartHeight) {
// too early for this cycle, use the previous one
// bail out if it's below genesis block
if (quorumCycleMiningEndHeight < llmq_params_opt->dkgInterval) return {};
pindexStore = pindexStart->GetAncestor(quorumCycleMiningEndHeight - llmq_params_opt->dkgInterval);
} else if (pindexStart->nHeight > quorumCycleMiningEndHeight) {
// we are past the mining phase of this cycle, use it
pindexStore = pindexStart->GetAncestor(quorumCycleMiningEndHeight);
}
// everything else is inside the mining phase of this cycle, no pindexStore adjustment needed

gsl::not_null<const CBlockIndex*> pIndexScanCommitments{pindexStore};
size_t nScanCommitments{nCountRequested};
std::vector<CQuorumCPtr> vecResultQuorums;

{
LOCK(cs_scan_quorums);
if (scanQuorumsCache.empty()) {
for (const auto& llmq : Params().GetConsensus().llmqs) {
// NOTE: We store it for each block hash in the DKG mining phase here
// and not for a single quorum hash per quorum like we do for other caches.
// And we only do this for max_cycles() of the most recent quorums
// because signing by old quorums requires the exact quorum hash to be specified
// and quorum scanning isn't needed there.
scanQuorumsCache.try_emplace(llmq.type, utils::max_cycles(llmq, llmq.keepOldConnections) * (llmq.dkgMiningWindowEnd - llmq.dkgMiningWindowStart));
}
}
auto& cache = scanQuorumsCache[llmqType];
bool fCacheExists = cache.get(pindexStart->GetBlockHash(), vecResultQuorums);
bool fCacheExists = cache.get(pindexStore->GetBlockHash(), vecResultQuorums);
if (fCacheExists) {
// We have exactly what requested so just return it
if (vecResultQuorums.size() == nCountRequested) {
Expand All @@ -524,17 +553,17 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
// scanning for the rests
if (!vecResultQuorums.empty()) {
nScanCommitments -= vecResultQuorums.size();
// bail out if it's below genesis block
if (vecResultQuorums.back()->m_quorum_base_block_index->pprev == nullptr) return {};
pIndexScanCommitments = vecResultQuorums.back()->m_quorum_base_block_index->pprev;
}
} else {
// If there is nothing in cache request at least cache.max_size() because this gets cached then later
nScanCommitments = std::max(nCountRequested, cache.max_size());
// If there is nothing in cache request at least keepOldConnections because this gets cached then later
nScanCommitments = std::max(nCountRequested, static_cast<size_t>(llmq_params_opt->keepOldConnections));
}
}

// Get the block indexes of the mined commitments to build the required quorums from
const auto& llmq_params_opt = GetLLMQParams(llmqType);
assert(llmq_params_opt.has_value());
std::vector<const CBlockIndex*> pQuorumBaseBlockIndexes{ llmq_params_opt->useRotation ?
quorumBlockProcessor.GetMinedCommitmentsIndexedUntilBlock(llmqType, pIndexScanCommitments, nScanCommitments) :
quorumBlockProcessor.GetMinedCommitmentsUntilBlock(llmqType, pIndexScanCommitments, nScanCommitments)
Expand All @@ -551,10 +580,12 @@ std::vector<CQuorumCPtr> CQuorumManager::ScanQuorums(Consensus::LLMQType llmqTyp
const size_t nCountResult{vecResultQuorums.size()};
if (nCountResult > 0) {
LOCK(cs_scan_quorums);
// Don't cache more than cache.max_size() elements
// Don't cache more than keepOldConnections elements
// because signing by old quorums requires the exact quorum hash
// to be specified and quorum scanning isn't needed there.
auto& cache = scanQuorumsCache[llmqType];
const size_t nCacheEndIndex = std::min(nCountResult, cache.max_size());
cache.emplace(pindexStart->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex});
const size_t nCacheEndIndex = std::min(nCountResult, static_cast<size_t>(llmq_params_opt->keepOldConnections));
cache.emplace(pindexStore->GetBlockHash(), {vecResultQuorums.begin(), vecResultQuorums.begin() + nCacheEndIndex});
}
// Don't return more than nCountRequested elements
const size_t nResultEndIndex = std::min(nCountResult, nCountRequested);
Expand Down Expand Up @@ -1023,13 +1054,31 @@ void CQuorumManager::StartCleanupOldQuorumDataThread(const CBlockIndex* pIndex)
workerPool.push([pIndex, t, this](int threadId) {
std::set<uint256> dbKeysToSkip;

if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) {
utils::InitQuorumsCache(cleanupQuorumsCache, false);
}
for (const auto& params : Params().GetConsensus().llmqs) {
if (quorumThreadInterrupt) {
break;
}
for (const auto& pQuorum : ScanQuorums(params.type, pIndex, params.keepOldKeys)) {
dbKeysToSkip.insert(MakeQuorumKey(*pQuorum));
LOCK(cs_cleanup);
auto& cache = cleanupQuorumsCache[params.type];
const CBlockIndex* pindex_loop{pIndex};
std::set<uint256> quorum_keys;
while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < utils::max_store_depth(params)) {
uint256 quorum_key;
if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) {
quorum_keys.insert(quorum_key);
if (quorum_keys.size() >= params.keepOldKeys) break; // extra safety belt
}
pindex_loop = pindex_loop->pprev;
}
for (const auto& pQuorum : ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) {
const uint256 quorum_key = MakeQuorumKey(*pQuorum);
quorum_keys.insert(quorum_key);
cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key);
}
dbKeysToSkip.merge(quorum_keys);
}

if (!quorumThreadInterrupt) {
Expand Down
2 changes: 2 additions & 0 deletions src/llmq/quorums.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class CQuorumManager
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, CQuorumPtr, StaticSaltedHasher>> mapQuorumsCache GUARDED_BY(cs_map_quorums);
mutable RecursiveMutex cs_scan_quorums;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>> scanQuorumsCache GUARDED_BY(cs_scan_quorums);
mutable Mutex cs_cleanup;
mutable std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>> cleanupQuorumsCache GUARDED_BY(cs_cleanup);

mutable ctpl::thread_pool workerPool;
mutable CThreadInterrupt quorumThreadInterrupt;
Expand Down
1 change: 1 addition & 0 deletions src/llmq/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1115,6 +1115,7 @@ template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::vector<CQuorumCPtr>, StaticSaltedHasher>>& cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>, std::less<Consensus::LLMQType>, std::allocator<std::pair<Consensus::LLMQType const, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>>>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>, std::less<Consensus::LLMQType>, std::allocator<std::pair<Consensus::LLMQType const, unordered_lru_cache<uint256, std::shared_ptr<llmq::CQuorum>, StaticSaltedHasher, 0ul, 0ul>>>>&cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, int, StaticSaltedHasher>>& cache, bool limit_by_connections);
template void InitQuorumsCache<std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>>>(std::map<Consensus::LLMQType, unordered_lru_cache<uint256, uint256, StaticSaltedHasher>>& cache, bool limit_by_connections);

} // namespace utils

Expand Down
11 changes: 11 additions & 0 deletions src/llmq/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,17 @@ void IterateNodesRandom(NodesContainer& nodeStates, Continue&& cont, Callback&&
template <typename CacheType>
void InitQuorumsCache(CacheType& cache, bool limit_by_connections = true);

[[ nodiscard ]] static constexpr int max_cycles(const Consensus::LLMQParams& llmqParams, int quorums_count)
{
return llmqParams.useRotation ? quorums_count / llmqParams.signingActiveQuorumCount : quorums_count;
}

[[ nodiscard ]] static constexpr int max_store_depth(const Consensus::LLMQParams& llmqParams)
{
// For how many blocks recent DKG info should be kept
return max_cycles(llmqParams, llmqParams.keepOldKeys) * llmqParams.dkgInterval;
}

} // namespace utils

[[ nodiscard ]] const std::optional<Consensus::LLMQParams> GetLLMQParams(Consensus::LLMQType llmqType);
Expand Down
11 changes: 8 additions & 3 deletions src/rpc/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ static void quorum_list_help(const JSONRPCRequest& request)
RPCHelpMan{"quorum list",
"List of on-chain quorums\n",
{
{"count", RPCArg::Type::NUM, /* default */ "", "Number of quorums to list. Will list active quorums if \"count\" is not specified."},
{"count", RPCArg::Type::NUM, /* default */ "",
"Number of quorums to list. Will list active quorums if \"count\" is not specified.\n"
"Can be CPU/disk heavy when the value is larger than the number of active quorums."
},
},
RPCResult{
RPCResult::Type::OBJ, "", "",
Expand Down Expand Up @@ -365,8 +368,10 @@ static void quorum_memberof_help(const JSONRPCRequest& request)
{
{"proTxHash", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "ProTxHash of the masternode."},
{"scanQuorumsCount", RPCArg::Type::NUM, /* default */ "",
"Number of quorums to scan for. If not specified,\n"
"the active quorum count for each specific quorum type is used."},
"Number of quorums to scan for.\n"
"If not specified, the active quorum count for each specific quorum type is used.\n"
"Can be CPU/disk heavy when the value is larger than the number of active quorums."
},
},
RPCResults{},
RPCExamples{""},
Expand Down

0 comments on commit ee4c27d

Please sign in to comment.