Skip to content

Commit

Permalink
handle immediately closed connection
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Dec 1, 2023
1 parent 9fa221c commit b8c6c5a
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 94 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -256,3 +256,5 @@ out
.tool-versions
**/*.dat
**/*.mmdb
*.logs
*.log
2 changes: 1 addition & 1 deletion cmd/nebula/cmd_crawl.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ func storeNeighbors[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, dbCr
}
}
log.WithFields(log.Fields{
"duration": time.Since(start),
"duration": time.Since(start).String(),
"avg": fmt.Sprintf("%.2fms", time.Since(start).Seconds()/float64(len(handler.RoutingTables))*1000),
"peers": len(handler.RoutingTables),
"totalNeighbors": neighborsCount,
Expand Down
2 changes: 0 additions & 2 deletions db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var KnownErrors = map[string]string{
"can't assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
"cannot assign requested address": models.NetErrorCantAssignRequestedAddress, // transient error
"connection gated": models.NetErrorConnectionGated, // transient error
"connection closed immediately": models.NetErrorConnectionClosedImmediately,
}

var ErrorStr = map[string]string{}
Expand Down Expand Up @@ -75,7 +74,6 @@ var knownErrorsPrecedence = []string{
"failed to negotiate stream multiplexer",
"resource limit exceeded",
"Write on stream",
"connection closed immediately",
}

// NetError extracts the appropriate error type from the given error.
Expand Down
1 change: 0 additions & 1 deletion db/migrations/000026_add_net_errors.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ BEGIN;
ALTER TYPE net_error ADD VALUE 'connection_reset_by_peer';
ALTER TYPE net_error ADD VALUE 'cant_assign_requested_address';
ALTER TYPE net_error ADD VALUE 'connection_gated';
ALTER TYPE net_error ADD VALUE 'connection_closed_immediately';
ALTER TYPE net_error RENAME VALUE 'no_public_ip' TO 'no_ip_address';

CREATE OR REPLACE FUNCTION calc_max_failed_visits(
Expand Down
46 changes: 22 additions & 24 deletions db/models/boil_types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 76 additions & 45 deletions discv5/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,21 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
libp2pResult := <-libp2pResultCh
discV5Result := <-discV5ResultCh

properties := c.PeerProperties(task.Node)

if libp2pResult.ConnClosedImmediately {
properties["direct_close"] = true
}

if libp2pResult.GenTCPAddr {
properties["gen_tcp_addr"] = true
}

data, err := json.Marshal(properties)
if err != nil {
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
}

cr := core.CrawlResult[PeerInfo]{
CrawlerID: c.id,
Info: task,
Expand All @@ -75,7 +90,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
CrawlEndTime: time.Now(),
ConnectStartTime: libp2pResult.ConnectStartTime,
ConnectEndTime: libp2pResult.ConnectEndTime,
Properties: c.PeerProperties(task.Node),
Properties: data,
LogErrors: c.cfg.LogErrors,
}

Expand All @@ -85,7 +100,7 @@ func (c *Crawler) Work(ctx context.Context, task PeerInfo) (core.CrawlResult[Pee
return cr, nil
}

func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
func (c *Crawler) PeerProperties(node *enode.Node) map[string]any {
properties := map[string]any{}

properties["seq"] = node.Record().Seq()
Expand Down Expand Up @@ -119,23 +134,19 @@ func (c *Crawler) PeerProperties(node *enode.Node) json.RawMessage {
properties["enr"] = node.String()
}

data, err := json.Marshal(properties)
if err != nil {
log.WithError(err).WithField("properties", properties).Warnln("Could not marshal peer properties")
return nil
}

return data
return properties
}

type Libp2pResult struct {
ConnectStartTime time.Time
ConnectEndTime time.Time
ConnectError error
ConnectErrorStr string
Agent string
Protocols []string
ListenAddrs []ma.Multiaddr
ConnectStartTime time.Time
ConnectEndTime time.Time
ConnectError error
ConnectErrorStr string
Agent string
Protocols []string
ListenAddrs []ma.Multiaddr
ConnClosedImmediately bool // whether conn was no error but still unconnected
GenTCPAddr bool // whether a TCP address was generated
}

func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResult {
Expand All @@ -144,9 +155,16 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
go func() {
result := Libp2pResult{}

// sanitize the given addresses like removing UDP-only addresses and
// adding corresponding TCP addresses.
sanitizedAddrs, generated := sanitizeAddrs(pi.Addrs())

// keep track if we generated a TCP address to dial
result.GenTCPAddr = generated

addrInfo := peer.AddrInfo{
ID: pi.ID(),
Addrs: pi.Addrs(),
Addrs: sanitizedAddrs,
}

result.ConnectStartTime = time.Now()
Expand All @@ -156,6 +174,22 @@ func (c *Crawler) crawlLibp2p(ctx context.Context, pi PeerInfo) chan Libp2pResul
// If we could successfully connect to the peer we actually crawl it.
if result.ConnectError == nil {

// check if we're connected
if c.host.Network().Connectedness(pi.ID()) == network.NotConnected {
// this is a weird behavior I was obesrving. Libp2p reports a
// successful connection establishment but isn't connected right
// after the call returned. This point is not a big problem at this
// point because fetchNeighbors will open the connection again. This
// works more often than not but is still weird. At least keep track
// of these cases.
result.ConnClosedImmediately = true

// try it again one more time
if !c.isIdentified(addrInfo.ID) {
_ = c.connect(ctx, addrInfo)
}
}

// wait for the Identify exchange to complete
c.identifyWait(ctx, addrInfo)

Expand Down Expand Up @@ -207,11 +241,6 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
return fmt.Errorf("skipping node as it has no public IP address") // change knownErrs map if changing this msg
}

dialAddrInfo := peer.AddrInfo{
ID: pi.ID,
Addrs: sanitizeAddrs(pi.Addrs),
}

var (
retry int = 0
maxRetries int = 2
Expand All @@ -221,22 +250,16 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
for {
logEntry := log.WithFields(log.Fields{
"timeout": c.cfg.DialTimeout.String(),
"remoteID": dialAddrInfo.ID.String(),
"remoteID": pi.ID.String(),
"retry": retry,
"maddrs": dialAddrInfo.Addrs,
"maddrs": pi.Addrs,
})
logEntry.Debugln("Connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.Debugln("Connecting to peer", pi.ID.ShortString())

timeoutCtx, cancel := context.WithTimeout(ctx, c.cfg.DialTimeout)
err := c.host.Connect(timeoutCtx, dialAddrInfo)
err := c.host.Connect(timeoutCtx, pi)
cancel()

// if libp2p says we established a connection, but we're not actually
// connected, assign a custom error.
if err == nil && c.host.Network().Connectedness(pi.ID) != network.Connected {
err = fmt.Errorf("connection closed immediately")
}

// if we still don't have an error (despite the above custom error
// handling), we return to the caller.
if err == nil {
Expand All @@ -247,29 +270,31 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// because subsequent connection attempts have a shorter timeout which
// means that it's more likely to run into a context.DeadlineExceeded
// error. If that's the case, we return the original error for tracking
// purposes.
// purposes. If the error is nil, we're not connected and not identified
// firstErr will stay nil. This is fine because we'll track that outside
// of this connect call.
if firstErr == nil {
firstErr = err
}

switch true {
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorNegotiateSecurityProtocol]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionRefused]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionClosedImmediately]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionGated]):
case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorCantAssignRequestedAddress]):
// case strings.Contains(err.Error(), db.ErrorStr[models.NetErrorConnectionResetByPeer]): often because of mismatching peer ids, so excluding it for now
default:
if errors.Is(err, context.DeadlineExceeded) {
err = firstErr
}
logEntry.WithError(err).Debugln("Failed connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.WithError(err).Debugln("Failed connecting to peer", pi.ID.ShortString())
return err
}

if retry == maxRetries {
if errors.Is(err, context.DeadlineExceeded) {
err = firstErr
}
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", dialAddrInfo.ID.ShortString())
logEntry.WithError(err).Debugln("Exceeded retries connecting to peer", pi.ID.ShortString())
return err
}

Expand All @@ -291,7 +316,7 @@ func (c *Crawler) connect(ctx context.Context, pi peer.AddrInfo) error {
// there is no other reliable transport address like TCP or QUIC we use the UDP
// IP address + port and craft a TCP address out of it. The UDP address will
// still be removed and replaced with TCP.
func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
func sanitizeAddrs(maddrs []ma.Multiaddr) ([]ma.Multiaddr, bool) {
newMaddrs := make([]ma.Multiaddr, 0, len(maddrs))
for _, maddr := range maddrs {
if _, err := maddr.ValueForProtocol(ma.P_TCP); err == nil {
Expand All @@ -306,7 +331,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
}

if len(newMaddrs) > 0 {
return newMaddrs
return newMaddrs, false
}

for i, maddr := range maddrs {
Expand Down Expand Up @@ -338,10 +363,10 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {

newMaddrs = append(newMaddrs, tcpMaddr)

return newMaddrs
return newMaddrs, true
}

return maddrs
return maddrs, false
}

// identifyWait waits until any connection to a peer passed the Identify
Expand All @@ -350,7 +375,7 @@ func sanitizeAddrs(maddrs []ma.Multiaddr) []ma.Multiaddr {
// identified in the past. We detect a successful identification if an
// AgentVersion is stored in the peer store
func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
timeoutCtx, cancel := context.WithTimeout(ctx, 15*time.Second) // TODO: parameterize
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // TODO: parameterize
defer cancel()

var wg sync.WaitGroup
Expand All @@ -368,8 +393,7 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
// check if identification was successful by looking for
// the AgentVersion key. If it exists, we cancel the
// identification of the remaining connections.
agent, err := c.host.Peerstore().Get(pi.ID, "AgentVersion")
if err == nil && agent.(string) != "" {
if c.isIdentified(pi.ID) {
cancel()
return
}
Expand All @@ -380,6 +404,13 @@ func (c *Crawler) identifyWait(ctx context.Context, pi peer.AddrInfo) {
wg.Wait()
}

// isIdentified returns true if the given peer.ID was successfully identified.
// Just because IdentifyWait returns doesn't mean the peer was identified.
func (c *Crawler) isIdentified(pid peer.ID) bool {
agent, err := c.host.Peerstore().Get(pid, "AgentVersion")
return err == nil && agent.(string) != ""
}

type DiscV5Result struct {
// The time we received the first successful response
RespondedAt *time.Time
Expand Down
Loading

0 comments on commit b8c6c5a

Please sign in to comment.