From 8bba030f6601b3e4899d0af4b8171d750f395523 Mon Sep 17 00:00:00 2001 From: Lazy Nina <81658138+lazynina@users.noreply.github.com> Date: Fri, 28 Jun 2024 13:20:38 -0400 Subject: [PATCH] Add disconnect reason to peer disconnect logic (#1378) --- integration_testing/connection_bridge.go | 8 ++--- lib/connection_manager.go | 8 ++--- lib/network_manager.go | 46 ++++++++++++------------ lib/peer.go | 35 +++++++++++------- lib/remote_node.go | 2 +- lib/server.go | 42 +++++++++++----------- scripts/mempool/mempool_dumper.go | 8 ++--- 7 files changed, 79 insertions(+), 70 deletions(-) diff --git a/integration_testing/connection_bridge.go b/integration_testing/connection_bridge.go index 764cfe61e..cc79f19b3 100644 --- a/integration_testing/connection_bridge.go +++ b/integration_testing/connection_bridge.go @@ -403,10 +403,10 @@ func (bridge *ConnectionBridge) Disconnect() { } bridge.disabled = true - bridge.connectionInboundA.Disconnect() - bridge.connectionInboundB.Disconnect() - bridge.connectionOutboundA.Disconnect() - bridge.connectionOutboundB.Disconnect() + bridge.connectionInboundA.Disconnect("ConnectionBridge.Disconnect") + bridge.connectionInboundB.Disconnect("ConnectionBridge.Disconnect") + bridge.connectionOutboundA.Disconnect("ConnectionBridge.Disconnect") + bridge.connectionOutboundB.Disconnect("ConnectionBridge.Disconnect") bridge.outboundListenerA.Close() bridge.outboundListenerB.Close() diff --git a/lib/connection_manager.go b/lib/connection_manager.go index 1a8448521..c730a6210 100644 --- a/lib/connection_manager.go +++ b/lib/connection_manager.go @@ -472,7 +472,7 @@ func (cmgr *ConnectionManager) CloseConnection(peerId uint64) { if !ok { return } - peer.Disconnect() + peer.Disconnect("ConnectionManager.CloseConnection: Closing connection to peer") } // Update our data structures to remove this peer. @@ -550,15 +550,15 @@ func (cmgr *ConnectionManager) Stop() { len(cmgr.persistentPeers)) for _, peer := range cmgr.inboundPeers { glog.V(1).Infof(CLog(Red, fmt.Sprintf("ConnectionManager.Stop: Inbound peer (%v)", peer))) - peer.Disconnect() + peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager") } for _, peer := range cmgr.outboundPeers { glog.V(1).Infof("ConnectionManager.Stop: Outbound peer (%v)", peer) - peer.Disconnect() + peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager") } for _, peer := range cmgr.persistentPeers { glog.V(1).Infof("ConnectionManager.Stop: Persistent peer (%v)", peer) - peer.Disconnect() + peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager") } // Close all of the listeners. diff --git a/lib/network_manager.go b/lib/network_manager.go index 750f85390..3ab6a29ea 100644 --- a/lib/network_manager.go +++ b/lib/network_manager.go @@ -254,7 +254,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag if verMsg, ok = desoMsg.(*MsgDeSoVersion); !ok { glog.Errorf("NetworkManager.handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+ "error casting version message", origin.ID) - nm.Disconnect(rn) + nm.Disconnect(rn, "error casting version message") return } @@ -264,7 +264,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag nm.usedNonces.Delete(msgNonce) glog.Errorf("NetworkManager.handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+ "nonce collision, nonce (%v)", origin.ID, msgNonce) - nm.Disconnect(rn) + nm.Disconnect(rn, "nonce collision") return } @@ -273,7 +273,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag if err := rn.HandleVersionMessage(verMsg, responseNonce); err != nil { glog.Errorf("NetworkManager.handleVersionMessage: Requesting PeerDisconnect for id: (%v) "+ "error handling version message: %v", origin.ID, err) - nm.Disconnect(rn) + nm.Disconnect(rn, fmt.Sprintf("error handling version message: %v", err)) return } @@ -297,7 +297,7 @@ func (nm *NetworkManager) _handleVerackMessage(origin *Peer, desoMsg DeSoMessage if vrkMsg, ok = desoMsg.(*MsgDeSoVerack); !ok { glog.Errorf("NetworkManager.handleVerackMessage: Disconnecting RemoteNode with id: (%v) "+ "error casting verack message", origin.ID) - nm.Disconnect(rn) + nm.Disconnect(rn, "error casting verack message") return } @@ -305,7 +305,7 @@ func (nm *NetworkManager) _handleVerackMessage(origin *Peer, desoMsg DeSoMessage if err := rn.HandleVerackMessage(vrkMsg); err != nil { glog.Errorf("NetworkManager.handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+ "error handling verack message: %v", origin.ID, err) - nm.Disconnect(rn) + nm.Disconnect(rn, fmt.Sprintf("error handling verack message: %v", err)) return } @@ -321,7 +321,7 @@ func (nm *NetworkManager) _handleDisconnectedPeerMessage(origin *Peer, desoMsg D glog.V(2).Infof("NetworkManager._handleDisconnectedPeerMessage: Handling disconnected peer message for "+ "id=%v", origin.ID) - nm.DisconnectById(NewRemoteNodeId(origin.ID)) + nm.DisconnectById(NewRemoteNodeId(origin.ID), "peer disconnected") // Update the persistentIpToRemoteNodeIdsMap, in case the disconnected peer was a persistent peer. ipRemoteNodeIdMap := nm.persistentIpToRemoteNodeIdsMap.ToMap() for ip, id := range ipRemoteNodeIdMap { @@ -459,7 +459,7 @@ func (nm *NetworkManager) processOutboundConnection(conn Connection) (*RemoteNod func (nm *NetworkManager) cleanupFailedInboundConnection(remoteNode *RemoteNode, connection Connection) { glog.V(2).Infof("NetworkManager.cleanupFailedInboundConnection: Cleaning up failed inbound connection") if remoteNode != nil { - nm.Disconnect(remoteNode) + nm.Disconnect(remoteNode, "cleaning up failed inbound connection") } connection.Close() } @@ -478,7 +478,7 @@ func (nm *NetworkManager) cleanupFailedOutboundConnection(connection Connection) id := NewRemoteNodeId(oc.attemptId) rn := nm.GetRemoteNodeById(id) if rn != nil { - nm.Disconnect(rn) + nm.Disconnect(rn, "cleaning up failed outbound connection") } oc.Close() nm.cmgr.RemoveAttemptedOutboundAddrs(oc.address) @@ -578,14 +578,14 @@ func (nm *NetworkManager) refreshValidatorIndices() { if _, ok := nm.GetValidatorOutboundIndex().Get(pk.Serialize()); ok { glog.V(2).Infof("NetworkManager.refreshValidatorIndices: Disconnecting Validator RemoteNode "+ "(%v) has validator public key (%v) that is already present in validator index", rn, pk) - nm.Disconnect(rn) + nm.Disconnect(rn, "outbound - validator public key already present in validator index") continue } } else { if _, ok := nm.GetValidatorInboundIndex().Get(pk.Serialize()); ok { glog.V(2).Infof("NetworkManager.refreshValidatorIndices: Disconnecting Validator RemoteNode "+ "(%v) has validator public key (%v) that is already present in validator index", rn, pk) - nm.Disconnect(rn) + nm.Disconnect(rn, "inbound - validator public key already present in validator index") continue } } @@ -704,7 +704,7 @@ func (nm *NetworkManager) refreshNonValidatorOutboundIndex() { } glog.V(2).Infof("NetworkManager.refreshNonValidatorOutboundIndex: Disconnecting attempted remote "+ "node (id=%v) due to excess outbound RemoteNodes", rn.GetId()) - nm.Disconnect(rn) + nm.Disconnect(rn, "excess attempted outbound RemoteNodes") excessiveOutboundRemoteNodes-- } // Now disconnect the connected remote nodes, if we still have too many remote nodes. @@ -714,7 +714,7 @@ func (nm *NetworkManager) refreshNonValidatorOutboundIndex() { } glog.V(2).Infof("NetworkManager.refreshNonValidatorOutboundIndex: Disconnecting connected remote "+ "node (id=%v) due to excess outbound RemoteNodes", rn.GetId()) - nm.Disconnect(rn) + nm.Disconnect(rn, "excess connected outbound RemoteNodes") excessiveOutboundRemoteNodes-- } } @@ -754,7 +754,7 @@ func (nm *NetworkManager) refreshNonValidatorInboundIndex() { } glog.V(2).Infof("NetworkManager.refreshNonValidatorInboundIndex: Disconnecting inbound remote "+ "node (id=%v) due to excess inbound RemoteNodes", rn.GetId()) - nm.Disconnect(rn) + nm.Disconnect(rn, "excess inbound RemoteNodes") excessiveInboundRemoteNodes-- } } @@ -943,7 +943,7 @@ func (nm *NetworkManager) AttachOutboundConnection( } if err := remoteNode.AttachOutboundConnection(conn, na, isPersistent); err != nil { - nm.Disconnect(remoteNode) + nm.Disconnect(remoteNode, fmt.Sprintf("error attaching outbound connection: %v", err)) return nil, errors.Wrapf(err, "NetworkManager.AttachOutboundConnection: Problem calling AttachOutboundConnection "+ "for addr: (%s). Disconnecting remote node (id=%v)", conn.RemoteAddr().String(), remoteNode.GetId()) } @@ -959,7 +959,7 @@ func (nm *NetworkManager) DisconnectAll() { allRemoteNodes := nm.GetAllRemoteNodes().GetAll() for _, rn := range allRemoteNodes { glog.V(2).Infof("NetworkManager.DisconnectAll: Disconnecting from remote node (id=%v)", rn.GetId()) - nm.Disconnect(rn) + nm.Disconnect(rn, "disconnecting all remote nodes") } } @@ -1001,22 +1001,22 @@ func (nm *NetworkManager) ProcessCompletedHandshake(remoteNode *RemoteNode) { nm.srv.maybeRequestAddresses(remoteNode) } -func (nm *NetworkManager) Disconnect(rn *RemoteNode) { +func (nm *NetworkManager) Disconnect(rn *RemoteNode, disconnectReason string) { if rn == nil { return } glog.V(2).Infof("NetworkManager.Disconnect: Disconnecting from remote node id=%v", rn.GetId()) - rn.Disconnect() + rn.Disconnect(disconnectReason) nm.removeRemoteNodeFromIndexer(rn) } -func (nm *NetworkManager) DisconnectById(id RemoteNodeId) { +func (nm *NetworkManager) DisconnectById(id RemoteNodeId, disconnectReason string) { rn := nm.GetRemoteNodeById(id) if rn == nil { return } - nm.Disconnect(rn) + nm.Disconnect(rn, disconnectReason) } func (nm *NetworkManager) SendMessage(rn *RemoteNode, desoMessage DeSoMessage) error { @@ -1067,7 +1067,7 @@ func (nm *NetworkManager) Cleanup() { for _, rn := range allRemoteNodes { if rn.IsTimedOut() { glog.V(2).Infof("NetworkManager.Cleanup: Disconnecting from remote node (id=%v)", rn.GetId()) - nm.Disconnect(rn) + nm.Disconnect(rn, "cleanup") } } } @@ -1261,7 +1261,7 @@ func (nm *NetworkManager) InitiateHandshake(rn *RemoteNode) { nonce := uint64(RandInt64(math.MaxInt64)) if err := rn.InitiateHandshake(nonce); err != nil { glog.Errorf("NetworkManager.InitiateHandshake: Error initiating handshake: %v", err) - nm.Disconnect(rn) + nm.Disconnect(rn, fmt.Sprintf("error initiating handshake: %v", err)) } nm.usedNonces.Add(nonce) } @@ -1285,7 +1285,7 @@ func (nm *NetworkManager) handleHandshakeComplete(remoteNode *RemoteNode) { if err := nm.handleHandshakeCompletePoSMessage(remoteNode); err != nil { glog.Errorf("NetworkManager.handleHandshakeComplete: Error handling PoS handshake peer message: %v, "+ "remoteNodePk (%s)", err, remoteNode.GetValidatorPublicKey().Serialize()) - nm.Disconnect(remoteNode) + nm.Disconnect(remoteNode, fmt.Sprintf("error handling PoS handshake peer message: %v", err)) return } nm.ProcessCompletedHandshake(remoteNode) @@ -1317,7 +1317,7 @@ func (nm *NetworkManager) handleHandshakeCompletePoSMessage(remoteNode *RemoteNo existingValidator, ok := nm.GetValidatorOutboundIndex().Get(validatorPk.Serialize()) if ok && remoteNode.GetId() != existingValidator.GetId() { if remoteNode.IsPersistent() && !existingValidator.IsPersistent() { - nm.Disconnect(existingValidator) + nm.Disconnect(existingValidator, "outbound - duplicate validator public key") return nil } return fmt.Errorf("NetworkManager.handleHandshakeCompletePoSMessage: Outbound RemoteNode with duplicate validator public key. "+ diff --git a/lib/peer.go b/lib/peer.go index a8c865ed3..b631c1e7a 100644 --- a/lib/peer.go +++ b/lib/peer.go @@ -93,6 +93,10 @@ type Peer struct { // Set to zero until Disconnect has been called on the Peer. Used to make it // so that the logic in Disconnect will only be executed once. disconnected int32 + // TODO: This should be an enum. + // disconnectReason is the reason the peer was disconnected. It is set the first + // time Disconnect is called on a peer. + disconnectReason string // Signals that the peer is now in the stopped state. quit chan interface{} @@ -413,7 +417,7 @@ func (pp *Peer) HandleGetBlocks(msg *MsgDeSoGetBlocks) { // GetHeaders request. glog.Errorf("Server._handleGetBlocks: Disconnecting peer %v because "+ "she asked for a block with hash %v that we don't have", pp, msg.HashList[0]) - pp.Disconnect() + pp.Disconnect("handleGetBlocks - requested block with hash we don't have. protocolV2") return } allBlocks.Blocks = append(allBlocks.Blocks, blockToSend) @@ -442,7 +446,7 @@ func (pp *Peer) HandleGetBlocks(msg *MsgDeSoGetBlocks) { // GetHeaders request. glog.Errorf("Server._handleGetBlocks: Disconnecting peer %v because "+ "she asked for a block with hash %v that we don't have", pp, msg.HashList[0]) - pp.Disconnect() + pp.Disconnect("handleGetBlocks - requested block with hash we don't have. protocol < v2") return } pp.AddDeSoMessage(blockToSend, false) @@ -464,7 +468,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) { if pp.snapshotChunkRequestInFlight { glog.V(1).Infof("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v"+ "because he already requested a GetSnapshot", pp) - pp.Disconnect() + pp.Disconnect("handleGetSnapshot - peer already requested a snapshot chunk") return } pp.snapshotChunkRequestInFlight = true @@ -474,7 +478,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) { if pp.srv.snapshot == nil { glog.Errorf("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v "+ "and disconnecting because node doesn't support HyperSync", pp) - pp.Disconnect() + pp.Disconnect("handleGetSnapshot - peer doesn't support HyperSync") return } @@ -497,7 +501,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) { if len(msg.SnapshotStartKey) == 0 || len(msg.GetPrefix()) == 0 { glog.Errorf("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v "+ "because SnapshotStartKey or Prefix are empty", pp) - pp.Disconnect() + pp.Disconnect("handleGetSnapshot - empty SnapshotStartKey or Prefix") return } @@ -788,7 +792,7 @@ out: func (pp *Peer) String() string { isDisconnected := "" if pp.disconnected != 0 { - isDisconnected = ", DISCONNECTED" + isDisconnected = fmt.Sprintf(", DISCONNECTED (%v)", pp.disconnectReason) } return fmt.Sprintf("[ Remote Address: %v%s PeerID=%d ]", pp.addrStr, isDisconnected, pp.ID) } @@ -1007,7 +1011,7 @@ out: glog.V(3).Infof("Writing Message: (%v)", msg) if err := pp.WriteDeSoMessage(msg); err != nil { glog.Errorf("Peer.outHandler: Problem sending message to peer: %v: %v", pp, err) - pp.Disconnect() + pp.Disconnect("outHandler - problem sending message to peer") } case <-stallTicker.C: // Every second take a look to see if there's something that the peer should @@ -1025,7 +1029,9 @@ out: glog.Errorf("Peer.outHandler: Peer %v took too long to response to "+ "reqest. Expected MsgType=%v at time %v but it is now time %v", pp, firstEntry.MessageType, firstEntry.TimeExpected, nowTime) - pp.Disconnect() + pp.Disconnect(fmt.Sprintf( + "outHandler - peer took too long to respond to request, expected MsgType=%v", + firstEntry.MessageType)) } case <-pp.quit: @@ -1060,7 +1066,7 @@ func (pp *Peer) _maybeAddBlocksToSend(msg DeSoMessage) error { // We can safely increase this without breaking backwards-compatibility because old // nodes will never send us more hashes than this. if len(pp.blocksToSend) > MaxBlocksInFlightPoS { - pp.Disconnect() + pp.Disconnect("maybeAddBlocksToSend - too many blocks requested") return fmt.Errorf("_maybeAddBlocksToSend: Disconnecting peer %v because she requested %d "+ "blocks, which is more than the %d blocks allowed "+ "in flight", pp, len(pp.blocksToSend), MaxBlocksInFlightPoS) @@ -1148,7 +1154,7 @@ func (pp *Peer) inHandler() { // is processed. idleTimer := time.AfterFunc(idleTimeout, func() { glog.V(1).Infof("Peer.inHandler: Peer %v no answer for %v -- disconnecting", pp, idleTimeout) - pp.Disconnect() + pp.Disconnect("inHandler - no answer for idleTimeout") }) out: @@ -1230,7 +1236,7 @@ out: idleTimer.Stop() // Disconnect the Peer if it isn't already. - pp.Disconnect() + pp.Disconnect("inHandler - done processing messages") glog.V(1).Infof("Peer.inHandler: done for peer: %v", pp) } @@ -1326,15 +1332,18 @@ func (pp *Peer) ReadDeSoMessage() (DeSoMessage, error) { return msg, nil } +// TODO: Disconnect reason enum + // Disconnect closes a peer's network connection. -func (pp *Peer) Disconnect() { +func (pp *Peer) Disconnect(reason string) { // Only run the logic the first time Disconnect is called. - glog.V(1).Infof(CLog(Yellow, "Peer.Disconnect: Starting")) + glog.V(0).Infof(CLog(Yellow, "Peer.Disconnect: Starting for Peer %v with reason: %v"), pp, reason) if atomic.LoadInt32(&pp.disconnected) != 0 { glog.V(1).Infof("Peer.Disconnect: Disconnect call ignored since it was already called before for Peer %v", pp) return } atomic.AddInt32(&pp.disconnected, 1) + pp.disconnectReason = reason glog.V(2).Infof("Peer.Disconnect: Running Disconnect for the first time for Peer %v", pp) diff --git a/lib/remote_node.go b/lib/remote_node.go index aaa295c6a..87b679a8e 100644 --- a/lib/remote_node.go +++ b/lib/remote_node.go @@ -386,7 +386,7 @@ func (rn *RemoteNode) AttachOutboundConnection(conn net.Conn, na *wire.NetAddres } // Disconnect disconnects the remote node, closing the attempted connection or the established connection. -func (rn *RemoteNode) Disconnect() { +func (rn *RemoteNode) Disconnect(disconnectReason string) { rn.mtx.Lock() defer rn.mtx.Unlock() diff --git a/lib/server.go b/lib/server.go index 24e345acc..759e8ca24 100644 --- a/lib/server.go +++ b/lib/server.go @@ -1075,7 +1075,7 @@ func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) { "found between the received header height %v does not match the checkpoint block info %v", pp, srv.blockchain.chainState(), headerReceived.Height, srv.blockchain.GetCheckpointBlockInfo().String()) - pp.Disconnect() + pp.Disconnect("Header height mismatch with checkpoint block info") return } @@ -1100,7 +1100,7 @@ func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) { "because error occurred processing header: %v, isOrphan: %v", pp, srv.blockchain.chainState(), err, isOrphan) - pp.Disconnect() + pp.Disconnect("Error processing header") return } } @@ -1310,7 +1310,7 @@ func (srv *Server) _handleHeaderBundle(pp *Peer, msg *MsgDeSoHeaderBundle) { "she indicated that she has more headers but the last hash %v in "+ "the header bundle does not correspond to a block in our index.", pp, lastHash) - pp.Disconnect() + pp.Disconnect("Last hash in header bundle not in our index") return } pp.AddDeSoMessage(&MsgDeSoGetHeaders{ @@ -1372,7 +1372,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { if srv.snapshot == nil { glog.Errorf("srv._handleSnapshot: Received a snapshot message from a peer but srv.snapshot is nil. " + "This peer shouldn't send us snapshot messages because we didn't pass the SFHyperSync flag.") - pp.Disconnect() + pp.Disconnect("handleSnapshot: Snapshot message received but snapshot is nil") return } @@ -1380,7 +1380,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { if srv.blockchain.ChainState() != SyncStateSyncingSnapshot { glog.Errorf("srv._handleSnapshot: Received a snapshot message from peer but chain is not currently syncing from "+ "snapshot. This means peer is most likely misbehaving so we'll disconnect them. Peer: (%v)", pp) - pp.Disconnect() + pp.Disconnect("handleSnapshot: Chain is not syncing from snapshot") return } @@ -1388,7 +1388,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { // We should disconnect the peer because he is misbehaving or doesn't have the snapshot. glog.Errorf("srv._handleSnapshot: Received a snapshot messages with empty snapshot chunk "+ "disconnecting misbehaving peer (%v)", pp) - pp.Disconnect() + pp.Disconnect("handleSnapshot: Empty snapshot chunk received from peer") return } @@ -1426,7 +1426,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { "hyper sync height (%v) and hash (%v)", msg.SnapshotMetadata.SnapshotBlockHeight, msg.SnapshotMetadata.CurrentEpochBlockHash, srv.HyperSyncProgress.SnapshotMetadata.SnapshotBlockHeight, srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochBlockHash) - pp.Disconnect() + pp.Disconnect("handleSnapshot: Snapshot metadata does not match expected snapshot metadata") return } @@ -1443,7 +1443,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { // We should disconnect the peer because he is misbehaving glog.Errorf("srv._handleSnapshot: Problem finding appropriate sync prefix progress "+ "disconnecting misbehaving peer (%v)", pp) - pp.Disconnect() + pp.Disconnect("handleSnapshot: Problem finding appropriate sync prefix progress") return } @@ -1457,7 +1457,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { // We should disconnect the peer because he is misbehaving glog.Errorf("srv._handleSnapshot: HyperSyncProgress epoch checksum bytes does not match that received from peer, "+ "disconnecting misbehaving peer (%v)", pp) - pp.Disconnect() + pp.Disconnect("handleSnapshot: Snapshot checksum bytes do not match expected checksum bytes") return } @@ -1480,7 +1480,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { glog.Errorf("srv._handleSnapshot: Snapshot chunk DBEntry key has mismatched prefix "+ "disconnecting misbehaving peer (%v)", pp) srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes - pp.Disconnect() + pp.Disconnect("handleSnapshot: Snapshot chunk DBEntry key has mismatched prefix") return } dbChunk = append(dbChunk, msg.SnapshotChunk[0]) @@ -1492,7 +1492,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { glog.Errorf("srv._handleSnapshot: Received a snapshot chunk that's not in-line with the sync progress "+ "disconnecting misbehaving peer (%v)", pp) srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes - pp.Disconnect() + pp.Disconnect("handleSnapshot: Snapshot chunk not in-line with sync progress") return } } @@ -1509,7 +1509,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { glog.Errorf("srv._handleSnapshot: DBEntry key has mismatched prefix "+ "disconnecting misbehaving peer (%v)", pp) srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes - pp.Disconnect() + pp.Disconnect("handleSnapshot: DBEntry key has mismatched prefix") return } // Make sure that the dbChunk is sorted increasingly. @@ -1519,7 +1519,7 @@ func (srv *Server) _handleSnapshot(pp *Peer, msg *MsgDeSoSnapshotData) { "value (%v) and second entry with index (%v) and value (%v) disconnecting misbehaving peer (%v)", ii-1, dbChunk[ii-1].Key, ii, dbChunk[ii].Key, pp) srv.HyperSyncProgress.SnapshotMetadata.CurrentEpochChecksumBytes = prevChecksumBytes - pp.Disconnect() + pp.Disconnect("handleSnapshot: dbChunk entries are not sorted") return } } @@ -2185,7 +2185,7 @@ func (srv *Server) _logAndDisconnectPeer(pp *Peer, blockMsg *MsgDeSoBlock, suffi // fetch headers, blocks, etc. So we'll be back. glog.Errorf("Server._handleBlock: Encountered an error processing "+ "block %v. Disconnecting from peer %v: %s", blockMsg, pp, suffix) - pp.Disconnect() + pp.Disconnect("Problem processing block") } // This function handles a single block that we receive from our peer. Originally, we would receive blocks @@ -2258,7 +2258,7 @@ func (srv *Server) _handleBlock(pp *Peer, blk *MsgDeSoBlock, isLastBlock bool) { "found between the received header height %v does not match the checkpoint block info %v", pp, srv.blockchain.chainState(), blk.Header.Height, srv.blockchain.GetCheckpointBlockInfo().Hash.String()) - pp.Disconnect() + pp.Disconnect("Mismatch between received header height and checkpoint block info") return } @@ -2434,7 +2434,7 @@ func (srv *Server) _handleBlockBundle(pp *Peer, bundle *MsgDeSoBlockBundle) { glog.Infof(CLog(Cyan, fmt.Sprintf("Server._handleBlockBundle: Received EMPTY block bundle "+ "at header height ( %v ) from Peer %v. Disconnecting peer since this should never happen.", srv.blockchain.headerTip().Height, pp))) - pp.Disconnect() + pp.Disconnect("Received empty block bundle.") return } glog.Infof(CLog(Cyan, fmt.Sprintf("Server._handleBlockBundle: Received blocks ( %v->%v / %v ) from Peer %v. "+ @@ -2593,7 +2593,7 @@ func (srv *Server) _processTransactions(pp *Peer, transactions []*MsgDeSoTxn) [] glog.Errorf(fmt.Sprintf("Server._handleTransactionBundle: Disconnecting "+ "Peer %v for sending us a transaction %v with fee below the minimum fee %d", pp, txn, srv.mempool.minFeeRateNanosPerKB)) - pp.Disconnect() + pp.Disconnect("Transaction fee below minimum fee") } // Don't do anything else if we got an error. @@ -2673,7 +2673,7 @@ func (srv *Server) _handleAddrMessage(pp *Peer, desoMsg DeSoMessage) { var ok bool if msg, ok = desoMsg.(*MsgDeSoAddr); !ok { glog.Errorf("Server._handleAddrMessage: Problem decoding MsgDeSoAddr: %v", spew.Sdump(desoMsg)) - srv.networkManager.DisconnectById(id) + srv.networkManager.DisconnectById(id, "Problem decoding MsgDeSoAddr") return } @@ -2689,7 +2689,7 @@ func (srv *Server) _handleAddrMessage(pp *Peer, desoMsg DeSoMessage) { "Peer id=%v for sending us an addr message with %d transactions, which exceeds "+ "the max allowed %d", pp.ID, len(msg.AddrList), MaxAddrsPerAddrMsg)) - srv.networkManager.DisconnectById(id) + srv.networkManager.DisconnectById(id, "Addr message too large") return } @@ -2740,7 +2740,7 @@ func (srv *Server) _handleGetAddrMessage(pp *Peer, desoMsg DeSoMessage) { if _, ok := desoMsg.(*MsgDeSoGetAddr); !ok { glog.Errorf("Server._handleAddrMessage: Problem decoding "+ "MsgDeSoAddr: %v", spew.Sdump(desoMsg)) - srv.networkManager.DisconnectById(id) + srv.networkManager.DisconnectById(id, "Problem decoding MsgDeSoGetAddr") return } @@ -2769,7 +2769,7 @@ func (srv *Server) _handleGetAddrMessage(pp *Peer, desoMsg DeSoMessage) { rn := srv.networkManager.GetRemoteNodeById(id) if err := srv.networkManager.SendMessage(rn, res); err != nil { glog.Errorf("Server._handleGetAddrMessage: Problem sending addr message to peer %v: %v", pp, err) - srv.networkManager.DisconnectById(id) + srv.networkManager.DisconnectById(id, "Problem sending addr message") return } } diff --git a/scripts/mempool/mempool_dumper.go b/scripts/mempool/mempool_dumper.go index 8b8c3e3b2..ce4e809d8 100644 --- a/scripts/mempool/mempool_dumper.go +++ b/scripts/mempool/mempool_dumper.go @@ -65,9 +65,9 @@ func main() { } messagesFromPeer := make(chan *lib.ServerMessage) - peer := lib.NewPeer(conn, true, netAddrss, true, + peer := lib.NewPeer(0, conn, true, netAddrss, true, 10000, 0, &lib.DeSoMainnetParams, - messagesFromPeer, nil, nil, lib.NodeSyncTypeAny) + messagesFromPeer, nil, nil, lib.NodeSyncTypeAny, nil) time.Sleep(1 * time.Second) if err := peer.NegotiateVersion(lib.DeSoMainnetParams.VersionNegotiationTimeout); err != nil { panic(err) @@ -163,7 +163,7 @@ func main() { } time.Sleep(1) - peer.Disconnect() + peer.Disconnect("done dumping mempool") break } } @@ -238,7 +238,7 @@ func main() { } } - peer.Disconnect() + peer.Disconnect("done loading txns") } else { fmt.Println("Command must be 'dump' or 'load'")