Skip to content

Commit

Permalink
discovery: revamp premature update map
Browse files Browse the repository at this point in the history
Turns out we need it right now to handle some low latency race
conditions in our integration tests, so we'll opt to simply cap the size
of it to a low amount. We use a basic LRU caching mechainsm.

Fixes lightningnetwork#5076
  • Loading branch information
Roasbeef committed Nov 4, 2021
1 parent a6f22c6 commit 8627b5d
Showing 1 changed file with 48 additions and 14 deletions.
62 changes: 48 additions & 14 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcutil"
"github.com/davecgh/go-spew/spew"
"github.com/lightninglabs/neutrino/cache"
"github.com/lightninglabs/neutrino/cache/lru"
"github.com/lightningnetwork/lnd/batch"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
Expand All @@ -38,6 +40,10 @@ const (
// determine how often we should allow a new update for a specific
// channel and direction.
DefaultChannelUpdateInterval = time.Minute

// maxPrematureUpdates tracks the max amount of premature channel
// updates that we'll hold onto.
maxPrematureUpdates = 100
)

var (
Expand Down Expand Up @@ -268,6 +274,19 @@ type Config struct {
ChannelUpdateInterval time.Duration
}

// cachedNetworkMsg is a wrapper around a network message that can be used with
// *lru.Cache.
type cachedNetworkMsg struct {
msgs []*networkMsg
}

// Size returns the "size" of an entry. We return the number of items as we
// just want to limit the total amount of entires rather than do accurate size
// accounting.
func (c *cachedNetworkMsg) Size() (uint64, error) {
return uint64(len(c.msgs)), nil
}

// AuthenticatedGossiper is a subsystem which is responsible for receiving
// announcements, validating them and applying the changes to router, syncing
// lightning network with newly connected nodes, broadcasting announcements
Expand Down Expand Up @@ -302,8 +321,7 @@ type AuthenticatedGossiper struct {
// that wasn't associated with any channel we know about. We store
// them temporarily, such that we can reprocess them when a
// ChannelAnnouncement for the channel is received.
prematureChannelUpdates map[uint64][]*networkMsg
pChanUpdMtx sync.Mutex
prematureChannelUpdates *lru.Cache

// networkMsgs is a channel that carries new network broadcasted
// message from outside the gossiper service to be processed by the
Expand Down Expand Up @@ -368,7 +386,7 @@ func New(cfg Config, selfKeyDesc *keychain.KeyDescriptor) *AuthenticatedGossiper
networkMsgs: make(chan *networkMsg),
quit: make(chan struct{}),
chanPolicyUpdates: make(chan *chanPolicyUpdateRequest),
prematureChannelUpdates: make(map[uint64][]*networkMsg),
prematureChannelUpdates: lru.NewCache(maxPrematureUpdates),
channelMtx: multimutex.NewMutex(),
recentRejects: make(map[uint64]struct{}),
chanUpdateRateLimiter: make(map[uint64][2]*rate.Limiter),
Expand Down Expand Up @@ -1774,13 +1792,14 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
shortChanID := msg.ShortChannelID.ToUint64()
var channelUpdates []*networkMsg

d.pChanUpdMtx.Lock()
channelUpdates = append(channelUpdates, d.prematureChannelUpdates[shortChanID]...)

// Now delete the premature ChannelUpdates, since we added them
// all to the queue of network messages.
delete(d.prematureChannelUpdates, shortChanID)
d.pChanUpdMtx.Unlock()
earlyChanUpdates, err := d.prematureChannelUpdates.Get(shortChanID)
if err == nil {
// There was actually an entry in the map, so we'll
// accumulate it. We don't worry about deletion, since
// it'll eventually fall out anyway.
chanMsgs := earlyChanUpdates.(*cachedNetworkMsg)
channelUpdates = append(channelUpdates, chanMsgs.msgs...)
}

// Launch a new goroutine to handle each ChannelUpdate, this to
// ensure we don't block here, as we can handle only one
Expand Down Expand Up @@ -1929,11 +1948,26 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(
// of this, we temporarily add it to a map, and
// reprocess it after our own ChannelAnnouncement has
// been processed.
d.pChanUpdMtx.Lock()
d.prematureChannelUpdates[shortChanID] = append(
d.prematureChannelUpdates[shortChanID], nMsg,
earlyMsgs, err := d.prematureChannelUpdates.Get(
shortChanID,
)
d.pChanUpdMtx.Unlock()
switch {
// Nothing in the cache yeyt, we can just directly
// insert this element.
case err == cache.ErrElementNotFound:
d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{
msgs: []*networkMsg{nMsg},
})

// There's already something in the cache, so we'll
// combine the set of messagesa into a single value.
default:
msgs := earlyMsgs.(*cachedNetworkMsg).msgs
msgs = append(msgs, nMsg)
d.prematureChannelUpdates.Put(shortChanID, &cachedNetworkMsg{
msgs: msgs,
})
}

log.Debugf("Got ChannelUpdate for edge not found in "+
"graph(shortChanID=%v), saving for "+
Expand Down

0 comments on commit 8627b5d

Please sign in to comment.