forked from lightningnetwork/lnd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsequencer.go
128 lines (103 loc) · 3.66 KB
/
sequencer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package htlcswitch
import (
"sync"
"github.com/go-errors/errors"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
)
// defaultSequenceBatchSize is the number of sequence numbers the sequencer
// reserves with each write to disk. Once a window has been reserved, the
// identifiers inside it are handed out from memory, so the database is only
// touched again when the window has been used up.
const defaultSequenceBatchSize = 1000
// Sequencer emits sequence numbers for locally initiated HTLCs. These are
// only used internally for tracking pending payments, however they must be
// unique in order to avoid circuit key collision in the circuit map.
type Sequencer interface {
// NextID returns a unique sequence number for each invocation. A
// non-nil error means no sequence number was allocated and the
// returned value must not be used.
NextID() (uint64, error)
}
var (
// nextPaymentIDKey is the name of the top-level bucket whose sequence
// counter keeps track of the persistent sequence numbers for
// payments.
nextPaymentIDKey = []byte("next-payment-id-key")
// ErrSequencerCorrupted signals that the persistence engine was not
// initialized, or has been corrupted since startup. It is returned by
// NextID when the nextPaymentIDKey bucket cannot be found.
ErrSequencerCorrupted = errors.New(
"sequencer database has been corrupted")
)
// persistentSequencer is a concrete implementation of Sequencer, that uses
// channeldb to allocate sequence numbers in batches.
type persistentSequencer struct {
// db is the database holding the bucket whose sequence counter
// records how far sequence numbers have been reserved.
db *channeldb.DB
// mu is held for the full duration of NextID and guards nextID and
// horizonID.
mu sync.Mutex
// nextID is the next sequence number to hand out from the batch
// currently held in memory.
nextID uint64
// horizonID is the exclusive upper bound of the batch currently held
// in memory. Once nextID reaches it, a new batch must be reserved on
// disk.
horizonID uint64
}
// NewPersistentSequencer builds a Sequencer backed by the given channeldb
// instance. The bucket that stores the sequence counter is set up before the
// sequencer is returned; if that setup fails, the error is returned and no
// sequencer is produced.
func NewPersistentSequencer(db *channeldb.DB) (Sequencer, error) {
	seq := &persistentSequencer{db: db}

	// The bucket has to exist before NextID is ever called, since NextID
	// treats a missing bucket as corruption.
	err := seq.initDB()
	if err != nil {
		return nil, err
	}

	return seq, nil
}
// NextID returns a unique sequence number for every invocation, persisting the
// assignment to avoid reuse.
//
// Sequence numbers are reserved on disk in windows of
// defaultSequenceBatchSize. While the current window still has unused
// numbers, they are served from memory; otherwise a new window is reserved in
// a single database update. The value zero is never returned.
//
// An error is returned if the sequence bucket is missing
// (ErrSequencerCorrupted), or if reserving or persisting the next window
// fails. In that case the in-memory state is left untouched and the returned
// value must be ignored.
func (s *persistentSequencer) NextID() (uint64, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// If our sequence batch has not been exhausted, we can allocate the
	// next identifier in the range without touching the database.
	if s.nextID < s.horizonID {
		nextID := s.nextID
		s.nextID++

		return nextID, nil
	}

	// Otherwise, our sequence batch has been exhausted. We use the last
	// known sequence number on disk to mark the beginning of the next
	// sequence batch, and allocate defaultSequenceBatchSize (1000) at a
	// time.
	//
	// NOTE: This also will happen on the first invocation after startup,
	// i.e. when nextID and horizonID are both 0. The next sequence batch to
	// be allocated will start from the last known tip on disk, which is
	// fine as we only require uniqueness of the allocated numbers.
	var nextID, nextHorizonID uint64
	err := kvdb.Update(s.db, func(tx kvdb.RwTx) error {
		nextIDBkt := tx.ReadWriteBucket(nextPaymentIDKey)
		if nextIDBkt == nil {
			return ErrSequencerCorrupted
		}

		nextID = nextIDBkt.Sequence()
		nextHorizonID = nextID + defaultSequenceBatchSize

		// Propagate any failure to persist the new horizon. If the
		// write were silently dropped, the same window could be
		// handed out again after a restart, breaking uniqueness.
		return nextIDBkt.SetSequence(nextHorizonID)
	})
	if err != nil {
		return 0, err
	}

	// Never assign index zero, to avoid collisions with the EmptyKeystone.
	if nextID == 0 {
		nextID++
	}

	// The batch was reserved successfully, so update our in-memory values
	// to keep allocating sequence numbers without hitting disk. nextID is
	// returned to this caller, so the in-memory cursor starts one past it
	// and can be issued directly on the next invocation.
	s.nextID = nextID + 1
	s.horizonID = nextHorizonID

	return nextID, nil
}
// initDB creates the top-level bucket used to generate payment sequence
// numbers, returning any error reported by the database update.
func (s *persistentSequencer) initDB() error {
	createBucket := func(tx kvdb.RwTx) error {
		_, err := tx.CreateTopLevelBucket(nextPaymentIDKey)
		return err
	}

	return kvdb.Update(s.db, createBucket)
}