// Copyright (C) 2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package gossiper

import (
	"context"
	"errors"
	"sync"
	"sync/atomic"
	"time"

	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/snow/engine/common"
	"github.com/ava-labs/avalanchego/utils/set"
	"github.com/ava-labs/avalanchego/utils/timer"
	"github.com/ava-labs/avalanchego/vms/proposervm/proposer"
	"github.com/ava-labs/hypersdk/cache"
	"github.com/ava-labs/hypersdk/chain"
	"github.com/ava-labs/hypersdk/consts"
	"github.com/ava-labs/hypersdk/workers"
	"go.uber.org/zap"
)

var _ Gossiper = (*Proposer)(nil)

type Proposer struct {
	vm        VM
	cfg       *ProposerConfig
	appSender common.AppSender

	doneGossip chan struct{} // closed when the Run loop exits

	lastVerified int64 // ms timestamp of the last verified block

	fl        sync.Mutex    // ensures only one force-gossip runs at a time
	q         chan struct{} // signals the Run loop to gossip
	lastQueue int64         // ms timestamp of the last queued gossip round

	timer   *timer.Timer // delays notifications to honor [GossipMinDelay]
	waiting atomic.Bool  // true while a delayed notification is pending

	// cache is thread-safe
	cache *cache.FIFO[ids.ID, any]
}

type ProposerConfig struct {
	GossipProposerDiff  int
	GossipProposerDepth int
	GossipMinLife       int64 // ms
	GossipMaxSize       int
	GossipMinDelay      int64 // ms
	NoGossipBuilderDiff int
	VerifyTimeout       int64 // ms
	SeenCacheSize       int
}

func DefaultProposerConfig() *ProposerConfig {
	return &ProposerConfig{
		GossipProposerDiff:  4,
		GossipProposerDepth: 1,
		GossipMinLife:       5 * 1000,
		GossipMaxSize:       consts.NetworkSizeLimit,
		GossipMinDelay:      50,
		NoGossipBuilderDiff: 4,
		VerifyTimeout:       proposer.MaxDelay.Milliseconds(),
		SeenCacheSize:       2_500_000,
	}
}
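
// A minimal wiring sketch (hypothetical host code; the vm and appSender
// values are assumed to be supplied by the embedding chain):
//
//	g, err := NewProposer(vm, DefaultProposerConfig())
//	if err != nil {
//		return err
//	}
//	go g.Run(appSender) // returns once vm.StopChan() closes
//	defer g.Done()      // stops the timer, then waits for Run to exit
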
func NewProposer(vm VM, cfg *ProposerConfig) (*Proposer, error) {
	g := &Proposer{
		vm:           vm,
		cfg:          cfg,
		doneGossip:   make(chan struct{}),
		lastVerified: -1,
		q:            make(chan struct{}),
		lastQueue:    -1,
	}
	g.timer = timer.NewTimer(g.handleTimerNotify)
	cache, err := cache.NewFIFO[ids.ID, any](cfg.SeenCacheSize)
	if err != nil {
		return nil, err
	}
	g.cache = cache
	return g, nil
}

func (g *Proposer) Force(ctx context.Context) error {
	ctx, span := g.vm.Tracer().Start(ctx, "Gossiper.Force")
	defer span.End()

	g.fl.Lock()
	defer g.fl.Unlock()

	// Gossip the newest transactions.
	//
	// We remove these transactions from the mempool as we gossip them;
	// otherwise we'd keep sending the same FIFO txs to the network over
	// and over.
	//
	// If we are about to build, we should never be gossiping: we should
	// hold on to any txs we could execute ourselves. Gossiping a tx is
	// effectively a bet that someone else will build with it, which
	// increases the probability it is accepted before it expires.
	var (
		txs   = []*chain.Transaction{}
		size  = 0
		start = time.Now()
		now   = start.UnixMilli()
	)
	// The callback returns (cont, rest, err): cont=false stops iteration,
	// and rest=true restores the current tx to the mempool instead of
	// removing it.
	mempoolErr := g.vm.Mempool().Top(
		ctx,
		g.vm.GetTargetGossipDuration(),
		func(ictx context.Context, next *chain.Transaction) (cont bool, rest bool, err error) {
			// Drop txs that have already expired
			if next.Base.Timestamp < now {
				return true, false, nil
			}

			// Don't gossip txs that are about to expire
			life := next.Base.Timestamp - now
			if life < g.cfg.GossipMinLife {
				return true, true, nil
			}

			// Gossip up to [GossipMaxSize]
			txSize := next.Size()
			if txSize+size > g.cfg.GossipMaxSize {
				return false, true, nil
			}

			// Don't gossip (or remove from the mempool) any tx we've
			// already gossiped or received; seeing it here usually means
			// our own gossip was echoed back to us.
			txID := next.ID()
			if _, ok := g.cache.Get(txID); ok {
				return true, true, nil
			}
			g.cache.Put(txID, nil)

			txs = append(txs, next)
			size += txSize
			return true, false, nil
		},
	)
	if mempoolErr != nil {
		return mempoolErr
	}
	if len(txs) == 0 {
		g.vm.Logger().Warn("no transactions to gossip")
		return nil
	}
	g.vm.Logger().Info("gossiping transactions", zap.Int("txs", len(txs)), zap.Duration("t", time.Since(start)))
	g.vm.RecordTxsGossiped(len(txs))
	return g.sendTxs(ctx, txs)
}
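
// Budget sketch for Force (hypothetical numbers): with GossipMaxSize = 2 MiB
// and 500 KiB already collected, a 300 KiB tx fits (800 KiB total) and is
// appended, but a subsequent 1.8 MiB tx would overflow the limit, so
// iteration stops (cont=false) and that tx is restored to the mempool
// (rest=true).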

func (g *Proposer) HandleAppGossip(ctx context.Context, nodeID ids.NodeID, msg []byte) error {
	actionRegistry, authRegistry := g.vm.Registry()
	authCounts, txs, err := chain.UnmarshalTxs(msg, initialCapacity, actionRegistry, authRegistry)
	if err != nil {
		g.vm.Logger().Warn(
			"received invalid txs",
			zap.Stringer("peerID", nodeID),
			zap.Error(err),
		)
		return nil
	}
	g.vm.RecordTxsReceived(len(txs))

	// Add incoming transactions to our caches to prevent useless gossip and
	// perform batch signature verification.
	//
	// We rely on AppGossipConcurrency to regulate concurrency here, so we
	// don't create a separate pool of workers for this verification.
	job, err := workers.NewSerial().NewJob(len(txs))
	if err != nil {
		g.vm.Logger().Warn(
			"unable to spawn new worker",
			zap.Stringer("peerID", nodeID),
			zap.Error(err),
		)
		return nil
	}
	batchVerifier := chain.NewAuthBatch(g.vm, job, authCounts)
	var seen int
	for _, tx := range txs {
		// Verify signature async
		txDigest, err := tx.Digest()
		if err != nil {
			g.vm.Logger().Warn(
				"unable to compute tx digest",
				zap.Stringer("peerID", nodeID),
				zap.Error(err),
			)
			batchVerifier.Done(nil)
			return nil
		}
		batchVerifier.Add(txDigest, tx.Auth)

		// Add incoming txs to the cache so we never re-gossip anything we
		// received (the sender, or someone else, will).
		if g.cache.Put(tx.ID(), nil) {
			seen++
		}
	}
	batchVerifier.Done(nil)
	g.vm.RecordSeenTxsReceived(seen)

	// Wait for signature verification to finish
	if err := job.Wait(); err != nil {
		g.vm.Logger().Warn(
			"received invalid gossip",
			zap.Stringer("peerID", nodeID),
			zap.Error(err),
		)
		return nil
	}

	// Mark incoming gossip as held by [nodeID], if it is a validator
	isValidator, err := g.vm.IsValidator(ctx, nodeID)
	if err != nil {
		g.vm.Logger().Warn(
			"unable to determine if nodeID is a validator",
			zap.Stringer("peerID", nodeID),
			zap.Error(err),
		)
	}

	// Submit incoming gossip to the mempool
	start := time.Now()
	for _, err := range g.vm.Submit(ctx, false, txs) {
		if err == nil || errors.Is(err, chain.ErrDuplicateTx) {
			continue
		}
		g.vm.Logger().Debug(
			"failed to submit gossiped txs",
			zap.Stringer("nodeID", nodeID),
			zap.Bool("validator", isValidator),
			zap.Error(err),
		)
	}
	g.vm.Logger().Info(
		"tx gossip received",
		zap.Int("txs", len(txs)),
		zap.Int("previously seen", seen),
		zap.Stringer("nodeID", nodeID),
		zap.Bool("validator", isValidator),
		zap.Duration("t", time.Since(start)),
	)

	// Only log errors; returning an error from AppGossip would cause the
	// VM to be shut down.
	return nil
}
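
// Dedup sketch: every tx received in HandleAppGossip is Put into g.cache, so
// a later Force run finds it there and restores it to the mempool without
// re-gossiping it; Force only sends out txs that were first seen locally.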

func (g *Proposer) notify() {
	select {
	case g.q <- struct{}{}:
		g.lastQueue = time.Now().UnixMilli()
	default:
	}
}

func (g *Proposer) handleTimerNotify() {
	g.notify()
	g.waiting.Store(false)
}

func (g *Proposer) Queue(context.Context) {
	if !g.waiting.CompareAndSwap(false, true) {
		g.vm.Logger().Debug("unable to start waiting")
		return
	}
	now := time.Now().UnixMilli()
	force := g.lastQueue + g.cfg.GossipMinDelay
	if now >= force {
		g.notify()
		g.waiting.Store(false)
		return
	}
	sleep := force - now
	sleepDur := time.Duration(sleep * int64(time.Millisecond))
	g.timer.SetTimeoutIn(sleepDur)
	g.vm.Logger().Debug("waiting to notify to gossip", zap.Duration("t", sleepDur))
}
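
// Timing sketch for Queue (hypothetical numbers): with GossipMinDelay = 50ms
// and lastQueue = 1000ms, a Queue call at t = 1030ms computes force = 1050ms
// and arms the timer to notify in 20ms; a call at t = 1060ms notifies
// immediately.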

// Run is the main gossip loop: it gossips whenever Queue fires and
// periodically (but less aggressively) force-regossips pending transactions.
func (g *Proposer) Run(appSender common.AppSender) {
	g.appSender = appSender
	defer close(g.doneGossip)

	// Timer blocks until stopped
	go g.timer.Dispatch()

	for {
		select {
		case <-g.q:
			tctx := context.Background()

			// Check if we are going to propose if it has been less than
			// [VerifyTimeout] since the last time we verified a block.
			if time.Now().UnixMilli()-g.lastVerified < g.cfg.VerifyTimeout {
				proposers, err := g.vm.Proposers(
					tctx,
					g.cfg.NoGossipBuilderDiff,
					1,
				)
				if err == nil && proposers.Contains(g.vm.NodeID()) {
					g.Queue(tctx) // requeue so we still gossip if we don't end up proposing
					g.vm.Logger().Debug("not gossiping because soon to propose")
					continue
				} else if err != nil {
					g.vm.Logger().Warn("unable to determine if we will propose soon, gossiping anyway", zap.Error(err))
				}
			}

			// Gossip to the proposers that will produce the next blocks
			if err := g.Force(tctx); err != nil {
				g.vm.Logger().Warn("gossip txs failed", zap.Error(err))
				continue
			}
		case <-g.vm.StopChan():
			g.vm.Logger().Info("stopping gossip loop")
			return
		}
	}
}

func (g *Proposer) BlockVerified(t int64) {
	if t < g.lastVerified {
		return
	}
	g.lastVerified = t
}

func (g *Proposer) Done() {
	g.timer.Stop()
	<-g.doneGossip
}
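
// Shutdown sketch (hypothetical host code): Run exits once the channel behind
// vm.StopChan() is closed, which in turn unblocks Done:
//
//	close(stopCh) // the channel returned by vm.StopChan()
//	g.Done()      // stops the timer, then waits for doneGossip to close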

func (g *Proposer) sendTxs(ctx context.Context, txs []*chain.Transaction) error {
	ctx, span := g.vm.Tracer().Start(ctx, "Gossiper.sendTxs")
	defer span.End()

	// Marshal gossip
	b, err := chain.MarshalTxs(txs)
	if err != nil {
		return err
	}

	// Select the next set of proposers and send gossip to them
	proposers, err := g.vm.Proposers(
		ctx,
		g.cfg.GossipProposerDiff,
		g.cfg.GossipProposerDepth,
	)
	if err != nil || proposers.Len() == 0 {
		g.vm.Logger().Warn(
			"unable to find any proposers, falling back to all-to-all gossip",
			zap.Error(err),
		)
		return g.appSender.SendAppGossip(ctx, b)
	}
	recipients := set.NewSet[ids.NodeID](len(proposers))
	for proposer := range proposers {
		// Don't gossip to self
		if proposer == g.vm.NodeID() {
			continue
		}
		recipients.Add(proposer)
	}
	return g.appSender.SendAppGossipSpecific(ctx, recipients, b)
}
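
// Recipient-selection sketch: with the defaults GossipProposerDiff = 4 and
// GossipProposerDepth = 1, sendTxs targets the validator expected to propose
// a few blocks from now (skipping itself); if the proposer lookup fails or
// returns no one, it falls back to all-to-all gossip via SendAppGossip.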