Skip to content

Commit

Permalink
merge bitcoin#17997: Remove mempool global from net
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Apr 19, 2022
1 parent f0263a3 commit 41252a1
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 45 deletions.
1 change: 1 addition & 0 deletions src/governance/governance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <protocol.h>
#include <shutdown.h>
#include <spork.h>
#include <validation.h>

CGovernanceManager governance;

Expand Down
12 changes: 6 additions & 6 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1767,7 +1767,12 @@ bool AppInitMain(NodeContext& node)
assert(!node.connman);
node.connman = std::make_unique<CConnman>(GetRand(std::numeric_limits<uint64_t>::max()), GetRand(std::numeric_limits<uint64_t>::max()));

node.peer_logic.reset(new PeerLogicValidation(node.connman.get(), node.banman.get(), *node.scheduler, gArgs.GetBoolArg("-enablebip61", DEFAULT_ENABLE_BIP61)));
// Make mempool generally available in the node context. For example the connection manager, wallet, or RPC threads,
// which are all started after this, may use it from the node context.
assert(!node.mempool);
node.mempool = &::mempool;

node.peer_logic.reset(new PeerLogicValidation(node.connman.get(), node.banman.get(), *node.scheduler, *node.mempool, gArgs.GetBoolArg("-enablebip61", DEFAULT_ENABLE_BIP61)));
RegisterValidationInterface(node.peer_logic.get());

// sanitize comments per BIP-0014, format user agent and check total size
Expand Down Expand Up @@ -2174,11 +2179,6 @@ bool AppInitMain(NodeContext& node)
return false;
}

// Now that the chain state is loaded, make mempool generally available in the node context. For example the
// connection manager, wallet, or RPC threads, which are all started after this, may use it from the node context.
assert(!node.mempool);
node.mempool = &::mempool;

fs::path est_path = GetDataDir() / FEE_ESTIMATES_FILENAME;
CAutoFile est_filein(fsbridge::fopen(est_path, "rb"), SER_DISK, CLIENT_VERSION);
// Allowed to fail as this file IS missing on first startup.
Expand Down
1 change: 1 addition & 0 deletions src/llmq/chainlocks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <txmempool.h>
#include <ui_interface.h>
#include <util/validation.h>
#include <validation.h>

namespace llmq
{
Expand Down
1 change: 1 addition & 0 deletions src/llmq/dkgsessionhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <masternode/node.h>
#include <chainparams.h>
#include <net_processing.h>
#include <validation.h>

namespace llmq
{
Expand Down
1 change: 1 addition & 0 deletions src/llmq/dkgsessionmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <chainparams.h>
#include <net_processing.h>
#include <spork.h>
#include <validation.h>

namespace llmq
{
Expand Down
54 changes: 27 additions & 27 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs

// returns false, still setting pit, if the block was already in flight from the same peer
// pit will only be valid as long as the same cs_main lock is being held
static bool MarkBlockAsInFlight(NodeId nodeid, const uint256& hash, const CBlockIndex *pindex = nullptr, std::list<QueuedBlock>::iterator **pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main) {
static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint256& hash, const CBlockIndex *pindex = nullptr, std::list<QueuedBlock>::iterator **pit = nullptr) EXCLUSIVE_LOCKS_REQUIRED(cs_main) {
CNodeState *state = State(nodeid);
assert(state != nullptr);

Expand Down Expand Up @@ -1099,7 +1099,7 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphansSize)
return nEvicted;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);
void static ProcessOrphanTx(CConnman* connman, CTxMemPool& mempool, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans);

/**
* Mark a misbehaving peer to be banned depending upon the value of `-banscore`.
Expand Down Expand Up @@ -1161,8 +1161,8 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para
(GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT);
}

PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, bool enable_bip61)
: connman(connmanIn), m_banman(banman), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) {
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, CTxMemPool& pool, bool enable_bip61)
: connman(connmanIn), m_banman(banman), m_mempool(pool), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) {
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));

Expand Down Expand Up @@ -1220,7 +1220,7 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pb

while (!orphanWorkSet.empty()) {
LogPrint(BCLog::MEMPOOL, "Trying to process %d orphans\n", orphanWorkSet.size());
ProcessOrphanTx(connman, orphanWorkSet);
ProcessOrphanTx(connman, m_mempool, orphanWorkSet);
}

g_last_tip_update = GetTime();
Expand Down Expand Up @@ -1356,7 +1356,7 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationSta
//


bool static AlreadyHave(const CInv& inv) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
bool static AlreadyHave(const CInv& inv, const CTxMemPool& mempool) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
switch (inv.type)
{
Expand Down Expand Up @@ -1640,7 +1640,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c
}
}

void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
void static ProcessGetData(CNode* pfrom, const CChainParams& chainparams, CConnman* connman, const CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main)
{
AssertLockNotHeld(cs_main);

Expand Down Expand Up @@ -1847,7 +1847,7 @@ inline void static SendBlockTransactions(const CBlock& block, const BlockTransac
connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::BLOCKTXN, resp));
}

bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::vector<CBlockHeader>& headers, const CChainParams& chainparams, bool punish_duplicate_invalid)
bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, CTxMemPool& mempool, const std::vector<CBlockHeader>& headers, const CChainParams& chainparams, bool punish_duplicate_invalid)
{
const CNetMsgMaker msgMaker(pfrom->GetSendVersion());
size_t nCount = headers.size();
Expand Down Expand Up @@ -2017,7 +2017,7 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
break;
}
vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash()));
MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), pindex);
MarkBlockAsInFlight(mempool, pfrom->GetId(), pindex->GetBlockHash(), pindex);
LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n",
pindex->GetBlockHash().ToString(), pfrom->GetId());
}
Expand Down Expand Up @@ -2069,7 +2069,7 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}

void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
void static ProcessOrphanTx(CConnman* connman, CTxMemPool& mempool, std::set<uint256>& orphan_work_set) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
{
AssertLockHeld(cs_main);
AssertLockHeld(g_cs_orphans);
Expand Down Expand Up @@ -2411,7 +2411,7 @@ std::pair<bool /*ret*/, bool /*do_return*/> static ValidateDSTX(CCoinJoinBroadca
return {true, false};
}

bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, BanMan* banman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CTxMemPool& mempool, CConnman* connman, BanMan* banman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom->GetId());
statsClient.inc("message.received." + SanitizeString(msg_type), 1.0f);
Expand Down Expand Up @@ -2878,7 +2878,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
if (interruptMsgProc)
return true;

bool fAlreadyHave = AlreadyHave(inv);
bool fAlreadyHave = AlreadyHave(inv, mempool);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom->GetId());
statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);

Expand Down Expand Up @@ -2950,7 +2950,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
}

pfrom->vRecvGetData.insert(pfrom->vRecvGetData.end(), vInv.begin(), vInv.end());
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
ProcessGetData(pfrom, chainparams, connman, mempool, interruptMsgProc);
return true;
}

Expand Down Expand Up @@ -3205,7 +3205,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
bool fMissingInputs = false;
CValidationState state;

if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */,
if (!AlreadyHave(inv, mempool) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */,
false /* bypass_limits */, 0 /* nAbsurdFee */)) {
// Process custom txes, this changes AlreadyHave to "true"
if (nInvType == MSG_DSTX) {
Expand Down Expand Up @@ -3234,7 +3234,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
mempool.size(), mempool.DynamicMemoryUsage() / 1000);

// Recursively process any orphan transactions that depended on this one
ProcessOrphanTx(connman, pfrom->orphan_work_set);
ProcessOrphanTx(connman, mempool, pfrom->orphan_work_set);
}
else if (fMissingInputs)
{
Expand All @@ -3251,11 +3251,11 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
for (const CTxIn& txin : tx.vin) {
CInv _inv(MSG_TX, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv);
if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, current_time);
if (!AlreadyHave(_inv, mempool)) RequestObject(State(pfrom->GetId()), _inv, current_time);
// We don't know if the previous tx was a regular or a mixing one, try both
CInv _inv2(MSG_DSTX, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv2);
if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, current_time);
if (!AlreadyHave(_inv2, mempool)) RequestObject(State(pfrom->GetId()), _inv2, current_time);
}
AddOrphanTx(ptx, pfrom->GetId());

Expand Down Expand Up @@ -3436,7 +3436,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) ||
(fAlreadyInFlight && blockInFlightIt->second.first == pfrom->GetId())) {
std::list<QueuedBlock>::iterator *queuedBlockIt = nullptr;
if (!MarkBlockAsInFlight(pfrom->GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) {
if (!MarkBlockAsInFlight(mempool, pfrom->GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) {
if (!(*queuedBlockIt)->partialBlock)
(*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&mempool));
else {
Expand Down Expand Up @@ -3509,15 +3509,15 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
} // cs_main

if (fProcessBLOCKTXN)
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, connman, banman, interruptMsgProc, enable_bip61);
return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, nTimeReceived, chainparams, mempool, connman, banman, interruptMsgProc, enable_bip61);

if (fRevertToHeaderProcessing) {
// Headers received from HB compact block peers are permitted to be
// relayed before full validation (see BIP 152), so we don't want to disconnect
// the peer if the header turns out to be for an invalid block.
// Note that if a peer tries to build on an invalid chain, that
// will be detected and the peer will be banned.
return ProcessHeadersMessage(pfrom, connman, {cmpctblock.header}, chainparams, /*punish_duplicate_invalid=*/false);
return ProcessHeadersMessage(pfrom, connman, mempool, {cmpctblock.header}, chainparams, /*punish_duplicate_invalid=*/false);
}

if (fBlockReconstructed) {
Expand Down Expand Up @@ -3675,7 +3675,7 @@ bool ProcessMessage(CNode* pfrom, const std::string& msg_type, CDataStream& vRec
// disconnect the peer if it is using one of our outbound connection
// slots.
bool should_punish = !pfrom->fInbound && !pfrom->m_manual_connection;
return ProcessHeadersMessage(pfrom, connman, headers, chainparams, should_punish);
return ProcessHeadersMessage(pfrom, connman, mempool, headers, chainparams, should_punish);
}

if (msg_type == NetMsgType::BLOCK)
Expand Down Expand Up @@ -4073,11 +4073,11 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
bool fMoreWork = false;

if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
ProcessGetData(pfrom, chainparams, connman, m_mempool, interruptMsgProc);

if (!pfrom->orphan_work_set.empty()) {
LOCK2(cs_main, g_cs_orphans);
ProcessOrphanTx(connman, pfrom->orphan_work_set);
ProcessOrphanTx(connman, m_mempool, pfrom->orphan_work_set);
}

if (pfrom->fDisconnect)
Expand Down Expand Up @@ -4137,7 +4137,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
bool fRet = false;
try
{
fRet = ProcessMessage(pfrom, msg_type, vRecv, msg.m_time, chainparams, connman, m_banman, interruptMsgProc, m_enable_bip61);
fRet = ProcessMessage(pfrom, msg_type, vRecv, msg.m_time, chainparams, m_mempool, connman, m_banman, interruptMsgProc, m_enable_bip61);
if (interruptMsgProc)
return false;
if (!pfrom->vRecvGetData.empty())
Expand Down Expand Up @@ -4667,7 +4667,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)

// Respond to BIP35 mempool requests
if (fSendTrickle && pto->fSendMempool) {
auto vtxinfo = mempool.infoAll();
auto vtxinfo = m_mempool.infoAll();
pto->fSendMempool = false;

LOCK(pto->cs_filter);
Expand Down Expand Up @@ -4837,7 +4837,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
FindNextBlocksToDownload(pto->GetId(), MAX_BLOCKS_IN_TRANSIT_PER_PEER - state.nBlocksInFlight, vToDownload, staller, consensusParams);
for (const CBlockIndex *pindex : vToDownload) {
vGetData.push_back(CInv(MSG_BLOCK, pindex->GetBlockHash()));
MarkBlockAsInFlight(pto->GetId(), pindex->GetBlockHash(), pindex);
MarkBlockAsInFlight(m_mempool, pto->GetId(), pindex->GetBlockHash(), pindex);
LogPrint(BCLog::NET, "Requesting block %s (%d) peer=%d\n", pindex->GetBlockHash().ToString(),
pindex->nHeight, pto->GetId());
}
Expand Down Expand Up @@ -4886,7 +4886,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
state.m_object_download.m_object_in_flight.erase(inv);
continue;
}
if (!AlreadyHave(inv)) {
if (!AlreadyHave(inv, m_mempool)) {
// If this object was last requested more than GetObjectInterval ago,
// then request.
const auto last_request_time = GetObjectRequestTime(inv.hash);
Expand Down
10 changes: 6 additions & 4 deletions src/net_processing.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#ifndef BITCOIN_NET_PROCESSING_H
#define BITCOIN_NET_PROCESSING_H

#include <net.h>
#include <validation.h>
#include <validationinterface.h>
#include <consensus/params.h>
#include <net.h>
#include <sync.h>
#include <validationinterface.h>

class CTxMemPool;

extern CCriticalSection cs_main;

Expand All @@ -27,10 +28,11 @@ class PeerLogicValidation final : public CValidationInterface, public NetEventsI
private:
CConnman* const connman;
BanMan* const m_banman;
CTxMemPool& m_mempool;

bool SendRejectsAndCheckIfBanned(CNode* pnode, bool enable_bip61) EXCLUSIVE_LOCKS_REQUIRED(cs_main);
public:
PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, bool enable_bip61);
PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, CTxMemPool& pool, bool enable_bip61);

/**
* Overridden from CValidationInterface.
Expand Down
1 change: 1 addition & 0 deletions src/spork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <timedata.h>
#include <util/ranges.h>
#include <util/validation.h> // for strMessageMagic
#include <validation.h>

#include <string>

Expand Down
10 changes: 5 additions & 5 deletions src/test/denialofservice_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ BOOST_FIXTURE_TEST_SUITE(denialofservice_tests, TestingSetup)
BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)
{
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), nullptr, *m_node.scheduler, false);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), nullptr, *m_node.scheduler, *m_node.mempool, false);

// Mock an outbound peer
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
Expand Down Expand Up @@ -148,7 +148,7 @@ static void AddRandomOutboundPeer(std::vector<CNode *> &vNodes, PeerLogicValidat
BOOST_AUTO_TEST_CASE(stale_tip_peer_management)
{
auto connman = MakeUnique<CConnmanTest>(0x1337, 0x1337);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), nullptr, *m_node.scheduler, false);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), nullptr, *m_node.scheduler, *m_node.mempool, false);

const Consensus::Params& consensusParams = Params().GetConsensus();
constexpr int nMaxOutbound = 8;
Expand Down Expand Up @@ -221,7 +221,7 @@ BOOST_AUTO_TEST_CASE(DoS_banning)
{
auto banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, false);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool, false);

banman->ClearBanned();
CAddress addr1(ip(0xa0b0c001), NODE_NONE);
Expand Down Expand Up @@ -276,7 +276,7 @@ BOOST_AUTO_TEST_CASE(DoS_banscore)
{
auto banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, false);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool, false);

banman->ClearBanned();
gArgs.ForceSetArg("-banscore", "111"); // because 11 is my favorite number
Expand Down Expand Up @@ -323,7 +323,7 @@ BOOST_AUTO_TEST_CASE(DoS_bantime)
{
auto banman = MakeUnique<BanMan>(GetDataDir() / "banlist.dat", nullptr, DEFAULT_MISBEHAVING_BANTIME);
auto connman = MakeUnique<CConnman>(0x1337, 0x1337);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, false);
auto peerLogic = MakeUnique<PeerLogicValidation>(connman.get(), banman.get(), *m_node.scheduler, *m_node.mempool, false);

banman->ClearBanned();
int64_t nStartTime = GetTime();
Expand Down
Loading

0 comments on commit 41252a1

Please sign in to comment.