Skip to content

Commit

Permalink
Add disconnect reason to peer disconnect logic (#1378)
Browse files (browse the repository at this point in the history)
  • Loading branch information
lazynina authored Jun 28, 2024
1 parent bb87ca8 commit 8bba030
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 70 deletions.
8 changes: 4 additions & 4 deletions integration_testing/connection_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ func (bridge *ConnectionBridge) Disconnect() {
}

bridge.disabled = true
bridge.connectionInboundA.Disconnect()
bridge.connectionInboundB.Disconnect()
bridge.connectionOutboundA.Disconnect()
bridge.connectionOutboundB.Disconnect()
bridge.connectionInboundA.Disconnect("ConnectionBridge.Disconnect")
bridge.connectionInboundB.Disconnect("ConnectionBridge.Disconnect")
bridge.connectionOutboundA.Disconnect("ConnectionBridge.Disconnect")
bridge.connectionOutboundB.Disconnect("ConnectionBridge.Disconnect")
bridge.outboundListenerA.Close()
bridge.outboundListenerB.Close()

Expand Down
8 changes: 4 additions & 4 deletions lib/connection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (cmgr *ConnectionManager) CloseConnection(peerId uint64) {
if !ok {
return
}
peer.Disconnect()
peer.Disconnect("ConnectionManager.CloseConnection: Closing connection to peer")
}

// Update our data structures to remove this peer.
Expand Down Expand Up @@ -550,15 +550,15 @@ func (cmgr *ConnectionManager) Stop() {
len(cmgr.persistentPeers))
for _, peer := range cmgr.inboundPeers {
glog.V(1).Infof(CLog(Red, fmt.Sprintf("ConnectionManager.Stop: Inbound peer (%v)", peer)))
peer.Disconnect()
peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager")
}
for _, peer := range cmgr.outboundPeers {
glog.V(1).Infof("ConnectionManager.Stop: Outbound peer (%v)", peer)
peer.Disconnect()
peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager")
}
for _, peer := range cmgr.persistentPeers {
glog.V(1).Infof("ConnectionManager.Stop: Persistent peer (%v)", peer)
peer.Disconnect()
peer.Disconnect("ConnectionManager.Stop: Stopping ConnectionManager")
}

// Close all of the listeners.
Expand Down
46 changes: 23 additions & 23 deletions lib/network_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag
if verMsg, ok = desoMsg.(*MsgDeSoVersion); !ok {
glog.Errorf("NetworkManager.handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+
"error casting version message", origin.ID)
nm.Disconnect(rn)
nm.Disconnect(rn, "error casting version message")
return
}

Expand All @@ -264,7 +264,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag
nm.usedNonces.Delete(msgNonce)
glog.Errorf("NetworkManager.handleVersionMessage: Disconnecting RemoteNode with id: (%v) "+
"nonce collision, nonce (%v)", origin.ID, msgNonce)
nm.Disconnect(rn)
nm.Disconnect(rn, "nonce collision")
return
}

Expand All @@ -273,7 +273,7 @@ func (nm *NetworkManager) _handleVersionMessage(origin *Peer, desoMsg DeSoMessag
if err := rn.HandleVersionMessage(verMsg, responseNonce); err != nil {
glog.Errorf("NetworkManager.handleVersionMessage: Requesting PeerDisconnect for id: (%v) "+
"error handling version message: %v", origin.ID, err)
nm.Disconnect(rn)
nm.Disconnect(rn, fmt.Sprintf("error handling version message: %v", err))
return

}
Expand All @@ -297,15 +297,15 @@ func (nm *NetworkManager) _handleVerackMessage(origin *Peer, desoMsg DeSoMessage
if vrkMsg, ok = desoMsg.(*MsgDeSoVerack); !ok {
glog.Errorf("NetworkManager.handleVerackMessage: Disconnecting RemoteNode with id: (%v) "+
"error casting verack message", origin.ID)
nm.Disconnect(rn)
nm.Disconnect(rn, "error casting verack message")
return
}

// Call HandleVerackMessage on the RemoteNode.
if err := rn.HandleVerackMessage(vrkMsg); err != nil {
glog.Errorf("NetworkManager.handleVerackMessage: Requesting PeerDisconnect for id: (%v) "+
"error handling verack message: %v", origin.ID, err)
nm.Disconnect(rn)
nm.Disconnect(rn, fmt.Sprintf("error handling verack message: %v", err))
return
}

Expand All @@ -321,7 +321,7 @@ func (nm *NetworkManager) _handleDisconnectedPeerMessage(origin *Peer, desoMsg D

glog.V(2).Infof("NetworkManager._handleDisconnectedPeerMessage: Handling disconnected peer message for "+
"id=%v", origin.ID)
nm.DisconnectById(NewRemoteNodeId(origin.ID))
nm.DisconnectById(NewRemoteNodeId(origin.ID), "peer disconnected")
// Update the persistentIpToRemoteNodeIdsMap, in case the disconnected peer was a persistent peer.
ipRemoteNodeIdMap := nm.persistentIpToRemoteNodeIdsMap.ToMap()
for ip, id := range ipRemoteNodeIdMap {
Expand Down Expand Up @@ -459,7 +459,7 @@ func (nm *NetworkManager) processOutboundConnection(conn Connection) (*RemoteNod
func (nm *NetworkManager) cleanupFailedInboundConnection(remoteNode *RemoteNode, connection Connection) {
glog.V(2).Infof("NetworkManager.cleanupFailedInboundConnection: Cleaning up failed inbound connection")
if remoteNode != nil {
nm.Disconnect(remoteNode)
nm.Disconnect(remoteNode, "cleaning up failed inbound connection")
}
connection.Close()
}
Expand All @@ -478,7 +478,7 @@ func (nm *NetworkManager) cleanupFailedOutboundConnection(connection Connection)
id := NewRemoteNodeId(oc.attemptId)
rn := nm.GetRemoteNodeById(id)
if rn != nil {
nm.Disconnect(rn)
nm.Disconnect(rn, "cleaning up failed outbound connection")
}
oc.Close()
nm.cmgr.RemoveAttemptedOutboundAddrs(oc.address)
Expand Down Expand Up @@ -578,14 +578,14 @@ func (nm *NetworkManager) refreshValidatorIndices() {
if _, ok := nm.GetValidatorOutboundIndex().Get(pk.Serialize()); ok {
glog.V(2).Infof("NetworkManager.refreshValidatorIndices: Disconnecting Validator RemoteNode "+
"(%v) has validator public key (%v) that is already present in validator index", rn, pk)
nm.Disconnect(rn)
nm.Disconnect(rn, "outbound - validator public key already present in validator index")
continue
}
} else {
if _, ok := nm.GetValidatorInboundIndex().Get(pk.Serialize()); ok {
glog.V(2).Infof("NetworkManager.refreshValidatorIndices: Disconnecting Validator RemoteNode "+
"(%v) has validator public key (%v) that is already present in validator index", rn, pk)
nm.Disconnect(rn)
nm.Disconnect(rn, "inbound - validator public key already present in validator index")
continue
}
}
Expand Down Expand Up @@ -704,7 +704,7 @@ func (nm *NetworkManager) refreshNonValidatorOutboundIndex() {
}
glog.V(2).Infof("NetworkManager.refreshNonValidatorOutboundIndex: Disconnecting attempted remote "+
"node (id=%v) due to excess outbound RemoteNodes", rn.GetId())
nm.Disconnect(rn)
nm.Disconnect(rn, "excess attempted outbound RemoteNodes")
excessiveOutboundRemoteNodes--
}
// Now disconnect the connected remote nodes, if we still have too many remote nodes.
Expand All @@ -714,7 +714,7 @@ func (nm *NetworkManager) refreshNonValidatorOutboundIndex() {
}
glog.V(2).Infof("NetworkManager.refreshNonValidatorOutboundIndex: Disconnecting connected remote "+
"node (id=%v) due to excess outbound RemoteNodes", rn.GetId())
nm.Disconnect(rn)
nm.Disconnect(rn, "excess connected outbound RemoteNodes")
excessiveOutboundRemoteNodes--
}
}
Expand Down Expand Up @@ -754,7 +754,7 @@ func (nm *NetworkManager) refreshNonValidatorInboundIndex() {
}
glog.V(2).Infof("NetworkManager.refreshNonValidatorInboundIndex: Disconnecting inbound remote "+
"node (id=%v) due to excess inbound RemoteNodes", rn.GetId())
nm.Disconnect(rn)
nm.Disconnect(rn, "excess inbound RemoteNodes")
excessiveInboundRemoteNodes--
}
}
Expand Down Expand Up @@ -943,7 +943,7 @@ func (nm *NetworkManager) AttachOutboundConnection(
}

if err := remoteNode.AttachOutboundConnection(conn, na, isPersistent); err != nil {
nm.Disconnect(remoteNode)
nm.Disconnect(remoteNode, fmt.Sprintf("error attaching outbound connection: %v", err))
return nil, errors.Wrapf(err, "NetworkManager.AttachOutboundConnection: Problem calling AttachOutboundConnection "+
"for addr: (%s). Disconnecting remote node (id=%v)", conn.RemoteAddr().String(), remoteNode.GetId())
}
Expand All @@ -959,7 +959,7 @@ func (nm *NetworkManager) DisconnectAll() {
allRemoteNodes := nm.GetAllRemoteNodes().GetAll()
for _, rn := range allRemoteNodes {
glog.V(2).Infof("NetworkManager.DisconnectAll: Disconnecting from remote node (id=%v)", rn.GetId())
nm.Disconnect(rn)
nm.Disconnect(rn, "disconnecting all remote nodes")
}
}

Expand Down Expand Up @@ -1001,22 +1001,22 @@ func (nm *NetworkManager) ProcessCompletedHandshake(remoteNode *RemoteNode) {
nm.srv.maybeRequestAddresses(remoteNode)
}

func (nm *NetworkManager) Disconnect(rn *RemoteNode) {
func (nm *NetworkManager) Disconnect(rn *RemoteNode, disconnectReason string) {
if rn == nil {
return
}
glog.V(2).Infof("NetworkManager.Disconnect: Disconnecting from remote node id=%v", rn.GetId())
rn.Disconnect()
rn.Disconnect(disconnectReason)
nm.removeRemoteNodeFromIndexer(rn)
}

func (nm *NetworkManager) DisconnectById(id RemoteNodeId) {
func (nm *NetworkManager) DisconnectById(id RemoteNodeId, disconnectReason string) {
rn := nm.GetRemoteNodeById(id)
if rn == nil {
return
}

nm.Disconnect(rn)
nm.Disconnect(rn, disconnectReason)
}

func (nm *NetworkManager) SendMessage(rn *RemoteNode, desoMessage DeSoMessage) error {
Expand Down Expand Up @@ -1067,7 +1067,7 @@ func (nm *NetworkManager) Cleanup() {
for _, rn := range allRemoteNodes {
if rn.IsTimedOut() {
glog.V(2).Infof("NetworkManager.Cleanup: Disconnecting from remote node (id=%v)", rn.GetId())
nm.Disconnect(rn)
nm.Disconnect(rn, "cleanup")
}
}
}
Expand Down Expand Up @@ -1261,7 +1261,7 @@ func (nm *NetworkManager) InitiateHandshake(rn *RemoteNode) {
nonce := uint64(RandInt64(math.MaxInt64))
if err := rn.InitiateHandshake(nonce); err != nil {
glog.Errorf("NetworkManager.InitiateHandshake: Error initiating handshake: %v", err)
nm.Disconnect(rn)
nm.Disconnect(rn, fmt.Sprintf("error initiating handshake: %v", err))
}
nm.usedNonces.Add(nonce)
}
Expand All @@ -1285,7 +1285,7 @@ func (nm *NetworkManager) handleHandshakeComplete(remoteNode *RemoteNode) {
if err := nm.handleHandshakeCompletePoSMessage(remoteNode); err != nil {
glog.Errorf("NetworkManager.handleHandshakeComplete: Error handling PoS handshake peer message: %v, "+
"remoteNodePk (%s)", err, remoteNode.GetValidatorPublicKey().Serialize())
nm.Disconnect(remoteNode)
nm.Disconnect(remoteNode, fmt.Sprintf("error handling PoS handshake peer message: %v", err))
return
}
nm.ProcessCompletedHandshake(remoteNode)
Expand Down Expand Up @@ -1317,7 +1317,7 @@ func (nm *NetworkManager) handleHandshakeCompletePoSMessage(remoteNode *RemoteNo
existingValidator, ok := nm.GetValidatorOutboundIndex().Get(validatorPk.Serialize())
if ok && remoteNode.GetId() != existingValidator.GetId() {
if remoteNode.IsPersistent() && !existingValidator.IsPersistent() {
nm.Disconnect(existingValidator)
nm.Disconnect(existingValidator, "outbound - duplicate validator public key")
return nil
}
return fmt.Errorf("NetworkManager.handleHandshakeCompletePoSMessage: Outbound RemoteNode with duplicate validator public key. "+
Expand Down
35 changes: 22 additions & 13 deletions lib/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type Peer struct {
// Set to zero until Disconnect has been called on the Peer. Used to make it
// so that the logic in Disconnect will only be executed once.
disconnected int32
// TODO: This should be an enum.
// disconnectReason is the reason the peer was disconnected. It is set the first
// time Disconnect is called on a peer.
disconnectReason string
// Signals that the peer is now in the stopped state.
quit chan interface{}

Expand Down Expand Up @@ -413,7 +417,7 @@ func (pp *Peer) HandleGetBlocks(msg *MsgDeSoGetBlocks) {
// GetHeaders request.
glog.Errorf("Server._handleGetBlocks: Disconnecting peer %v because "+
"she asked for a block with hash %v that we don't have", pp, msg.HashList[0])
pp.Disconnect()
pp.Disconnect("handleGetBlocks - requested block with hash we don't have. protocolV2")
return
}
allBlocks.Blocks = append(allBlocks.Blocks, blockToSend)
Expand Down Expand Up @@ -442,7 +446,7 @@ func (pp *Peer) HandleGetBlocks(msg *MsgDeSoGetBlocks) {
// GetHeaders request.
glog.Errorf("Server._handleGetBlocks: Disconnecting peer %v because "+
"she asked for a block with hash %v that we don't have", pp, msg.HashList[0])
pp.Disconnect()
pp.Disconnect("handleGetBlocks - requested block with hash we don't have. protocol < v2")
return
}
pp.AddDeSoMessage(blockToSend, false)
Expand All @@ -464,7 +468,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) {
if pp.snapshotChunkRequestInFlight {
glog.V(1).Infof("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v"+
"because he already requested a GetSnapshot", pp)
pp.Disconnect()
pp.Disconnect("handleGetSnapshot - peer already requested a snapshot chunk")
return
}
pp.snapshotChunkRequestInFlight = true
Expand All @@ -474,7 +478,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) {
if pp.srv.snapshot == nil {
glog.Errorf("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v "+
"and disconnecting because node doesn't support HyperSync", pp)
pp.Disconnect()
pp.Disconnect("handleGetSnapshot - peer doesn't support HyperSync")
return
}

Expand All @@ -497,7 +501,7 @@ func (pp *Peer) HandleGetSnapshot(msg *MsgDeSoGetSnapshot) {
if len(msg.SnapshotStartKey) == 0 || len(msg.GetPrefix()) == 0 {
glog.Errorf("Peer.HandleGetSnapshot: Ignoring GetSnapshot from Peer %v "+
"because SnapshotStartKey or Prefix are empty", pp)
pp.Disconnect()
pp.Disconnect("handleGetSnapshot - empty SnapshotStartKey or Prefix")
return
}

Expand Down Expand Up @@ -788,7 +792,7 @@ out:
func (pp *Peer) String() string {
isDisconnected := ""
if pp.disconnected != 0 {
isDisconnected = ", DISCONNECTED"
isDisconnected = fmt.Sprintf(", DISCONNECTED (%v)", pp.disconnectReason)
}
return fmt.Sprintf("[ Remote Address: %v%s PeerID=%d ]", pp.addrStr, isDisconnected, pp.ID)
}
Expand Down Expand Up @@ -1007,7 +1011,7 @@ out:
glog.V(3).Infof("Writing Message: (%v)", msg)
if err := pp.WriteDeSoMessage(msg); err != nil {
glog.Errorf("Peer.outHandler: Problem sending message to peer: %v: %v", pp, err)
pp.Disconnect()
pp.Disconnect("outHandler - problem sending message to peer")
}
case <-stallTicker.C:
// Every second take a look to see if there's something that the peer should
Expand All @@ -1025,7 +1029,9 @@ out:
glog.Errorf("Peer.outHandler: Peer %v took too long to response to "+
"reqest. Expected MsgType=%v at time %v but it is now time %v",
pp, firstEntry.MessageType, firstEntry.TimeExpected, nowTime)
pp.Disconnect()
pp.Disconnect(fmt.Sprintf(
"outHandler - peer took too long to respond to request, expected MsgType=%v",
firstEntry.MessageType))
}

case <-pp.quit:
Expand Down Expand Up @@ -1060,7 +1066,7 @@ func (pp *Peer) _maybeAddBlocksToSend(msg DeSoMessage) error {
// We can safely increase this without breaking backwards-compatibility because old
// nodes will never send us more hashes than this.
if len(pp.blocksToSend) > MaxBlocksInFlightPoS {
pp.Disconnect()
pp.Disconnect("maybeAddBlocksToSend - too many blocks requested")
return fmt.Errorf("_maybeAddBlocksToSend: Disconnecting peer %v because she requested %d "+
"blocks, which is more than the %d blocks allowed "+
"in flight", pp, len(pp.blocksToSend), MaxBlocksInFlightPoS)
Expand Down Expand Up @@ -1148,7 +1154,7 @@ func (pp *Peer) inHandler() {
// is processed.
idleTimer := time.AfterFunc(idleTimeout, func() {
glog.V(1).Infof("Peer.inHandler: Peer %v no answer for %v -- disconnecting", pp, idleTimeout)
pp.Disconnect()
pp.Disconnect("inHandler - no answer for idleTimeout")
})

out:
Expand Down Expand Up @@ -1230,7 +1236,7 @@ out:
idleTimer.Stop()

// Disconnect the Peer if it isn't already.
pp.Disconnect()
pp.Disconnect("inHandler - done processing messages")

glog.V(1).Infof("Peer.inHandler: done for peer: %v", pp)
}
Expand Down Expand Up @@ -1326,15 +1332,18 @@ func (pp *Peer) ReadDeSoMessage() (DeSoMessage, error) {
return msg, nil
}

// TODO: Disconnect reason enum

// Disconnect closes a peer's network connection.
func (pp *Peer) Disconnect() {
func (pp *Peer) Disconnect(reason string) {
// Only run the logic the first time Disconnect is called.
glog.V(1).Infof(CLog(Yellow, "Peer.Disconnect: Starting"))
glog.V(0).Infof(CLog(Yellow, "Peer.Disconnect: Starting for Peer %v with reason: %v"), pp, reason)
if atomic.LoadInt32(&pp.disconnected) != 0 {
glog.V(1).Infof("Peer.Disconnect: Disconnect call ignored since it was already called before for Peer %v", pp)
return
}
atomic.AddInt32(&pp.disconnected, 1)
pp.disconnectReason = reason

glog.V(2).Infof("Peer.Disconnect: Running Disconnect for the first time for Peer %v", pp)

Expand Down
2 changes: 1 addition & 1 deletion lib/remote_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ func (rn *RemoteNode) AttachOutboundConnection(conn net.Conn, na *wire.NetAddres
}

// Disconnect disconnects the remote node, closing the attempted connection or the established connection.
func (rn *RemoteNode) Disconnect() {
func (rn *RemoteNode) Disconnect(disconnectReason string) {
rn.mtx.Lock()
defer rn.mtx.Unlock()

Expand Down
Loading

0 comments on commit 8bba030

Please sign in to comment.