Skip to content

Commit

Permalink
net: add a flag to indicate when a node's process queue is full
Browse files Browse the repository at this point in the history
Messages are dumped very quickly from the socket handler to the processor, so
it's the depth of the processing queue that's interesting.

The socket handler checks the process queue's size during the brief message
hand-off and pauses if necessary, and the processor possibly unpauses each time
a message is popped off of its queue.
  • Loading branch information
theuni committed Jan 13, 2017
1 parent 4d712e3 commit c6e8a9b
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 12 deletions.
10 changes: 7 additions & 3 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler()
}
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv && (
pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
if (lockRecv && !pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
Expand Down Expand Up @@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler()
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify) {
size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
pnode->nProcessQueueSize += nSizeAdded;
pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
Expand Down Expand Up @@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
fPauseRecv = false;
nProcessQueueSize = 0;

BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
Expand Down
11 changes: 2 additions & 9 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,7 @@ class CNode

CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
size_t nProcessQueueSize;

std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
Expand Down Expand Up @@ -650,6 +651,7 @@ class CNode
const NodeId id;

const uint64_t nKeyedNetGroup;
std::atomic_bool fPauseRecv;
protected:

mapMsgCmdSize mapSendBytesPerMsgCmd;
Expand Down Expand Up @@ -743,15 +745,6 @@ class CNode
return nRefCount;
}

// requires LOCK(cs_vRecvMsg)
unsigned int GetTotalRecvSize()
{
unsigned int total = 0;
BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
total += msg.vRecv.size() + 24;
return total;
}

// requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);

Expand Down
2 changes: 2 additions & 0 deletions src/net_processing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
fMoreWork = !pfrom->vProcessMsg.empty();
}
CNetMessage& msg(msgs.front());
Expand Down

0 comments on commit c6e8a9b

Please sign in to comment.