From 000ac92d76ecf296ddb607193c275496987bb3bd Mon Sep 17 00:00:00 2001 From: hunter-bera <133678627+hunter-bera@users.noreply.github.com> Date: Wed, 22 May 2024 14:29:59 -0400 Subject: [PATCH] Resend stale txns on startup (#86) * Resend stale txns on startup * nit * change to common.Address to handle cases * nit * nit * addressing comments * add TODO * add queued nonces * rename --- client/eth/client.go | 68 +++++++++++++++++++++++++++--- client/eth/client_provider.go | 51 ++++++++++++++++++++-- core/transactor/tracker/noncer.go | 26 ++++++++---- core/transactor/tracker/tracker.go | 11 +++-- core/transactor/transactor.go | 34 ++++++++++++++- 5 files changed, 163 insertions(+), 27 deletions(-) diff --git a/client/eth/client.go b/client/eth/client.go index e3999e6..1362f51 100644 --- a/client/eth/client.go +++ b/client/eth/client.go @@ -45,9 +45,49 @@ type Reader interface { SuggestGasTipCap(ctx context.Context) (*big.Int, error) TransactionByHash(ctx context.Context, hash common.Hash, ) (tx *ethcoretypes.Transaction, isPending bool, err error) - TxPoolContent( - ctx context.Context) ( - map[string]map[string]map[string]*ethcoretypes.Transaction, error) + + /* + TxPoolContentFrom returns the pending and queued transactions of this address. + Example response: + + { + "pending": { + "0": { + // transaction details... + } + }, + "queued": { + "0": { + // transaction details... + } + } + } + + */ + TxPoolContentFrom(ctx context.Context, address common.Address) ( + map[string]map[string]*ethcoretypes.Transaction, error) + + /* + TxPoolInspect returns the textual summary of all pending and queued transactions. + Example response: + + { + "pending": { + "0x12345": { + "0": "0x12345789: 1 wei + 2 gas x 3 wei" + } + }, + "queued": { + "0x12345": { + "0": "0x12345789: 1 wei + 2 gas x 3 wei" + } + } + } + + */ + TxPoolInspect( + ctx context.Context, + ) (map[string]map[common.Address]map[string]string, error) } type Writer interface { @@ -137,14 +177,28 @@ func (c *ExtendedEthClient) SubscribeFilterLogs( return c.Client.SubscribeFilterLogs(ctxWithTimeout, q, ch) } -func (c *ExtendedEthClient) TxPoolContent( +func (c *ExtendedEthClient) TxPoolContentFrom( + ctx context.Context, address common.Address, +) (map[string]map[string]*ethcoretypes.Transaction, error) { + var result map[string]map[string]*ethcoretypes.Transaction + ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) + defer cancel() + if err := c.Client.Client().CallContext( + ctxWithTimeout, &result, "txpool_contentFrom", address, + ); err != nil { + return nil, err + } + return result, nil +} + +func (c *ExtendedEthClient) TxPoolInspect( ctx context.Context, -) (map[string]map[string]map[string]*ethcoretypes.Transaction, error) { - var result map[string]map[string]map[string]*ethcoretypes.Transaction +) (map[string]map[common.Address]map[string]string, error) { + var result map[string]map[common.Address]map[string]string ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) defer cancel() if err := c.Client.Client().CallContext( - ctxWithTimeout, &result, "txpool_content", + ctxWithTimeout, &result, "txpool_inspect", ); err != nil { return nil, err } diff --git a/client/eth/client_provider.go b/client/eth/client_provider.go index 8715b34..ea3e94e 100644 --- a/client/eth/client_provider.go +++ b/client/eth/client_provider.go @@ -255,13 +255,58 @@ func (c *ChainProviderImpl) TransactionByHash( return nil, false, ErrClientNotFound } -func (c *ChainProviderImpl) TxPoolContent(ctx context.Context) ( - map[string]map[string]map[string]*types.Transaction, error, +/* +TxPoolContentFrom returns the pending and queued transactions of this address. +Example response: + + { + "pending": { + "0": { + // transaction details... + } + }, + "queued": { + "0": { + // transaction details... + } + } + } +*/ +func (c *ChainProviderImpl) TxPoolContentFrom(ctx context.Context, address common.Address) ( + map[string]map[string]*types.Transaction, error, +) { + if client, ok := c.GetHTTP(); ok { + ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) + defer cancel() + return client.TxPoolContentFrom(ctxWithTimeout, address) + } + return nil, ErrClientNotFound +} + +/* +TxPoolInspect returns the textual summary of all pending and queued transactions. +Example response: + + { + "pending": { + "0x12345": { + "0": "0x12345789: 1 wei + 2 gas x 3 wei" + } + }, + "queued": { + "0x12345": { + "0": "0x12345789: 1 wei + 2 gas x 3 wei" + } + } + } +*/ +func (c *ChainProviderImpl) TxPoolInspect(ctx context.Context) ( + map[string]map[common.Address]map[string]string, error, ) { if client, ok := c.GetHTTP(); ok { ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout) defer cancel() - return client.TxPoolContent(ctxWithTimeout) + return client.TxPoolInspect(ctxWithTimeout) } return nil, ErrClientNotFound } diff --git a/core/transactor/tracker/noncer.go b/core/transactor/tracker/noncer.go index e8d1d24..8432db2 100644 --- a/core/transactor/tracker/noncer.go +++ b/core/transactor/tracker/noncer.go @@ -2,6 +2,7 @@ package tracker import ( "context" + "strconv" "sync" "time" @@ -19,7 +20,8 @@ type Noncer struct { // mempool state latestPendingNonce uint64 - queuedNonces map[uint64]struct{} + // TODO: purge old nonces from the map to avoid infinite memory growth + inMempoolNonces map[uint64]struct{} // "in-process" nonces acquired map[uint64]struct{} // The set of acquired nonces. @@ -34,7 +36,7 @@ type Noncer struct { func NewNoncer(sender common.Address, refreshInterval time.Duration) *Noncer { return &Noncer{ sender: sender, - queuedNonces: make(map[uint64]struct{}), + inMempoolNonces: make(map[uint64]struct{}), acquired: make(map[uint64]struct{}), inFlight: skiplist.New(skiplist.Uint64), refreshInterval: refreshInterval, @@ -61,7 +63,7 @@ func (n *Noncer) refreshLoop(ctx context.Context) { } } -// refreshNonces refreshes the pending nonce and queued nonces from the mempool. +// refreshNonces refreshes the pending nonces from the mempool. func (n *Noncer) refreshNonces(ctx context.Context) { n.mu.Lock() defer n.mu.Unlock() @@ -72,9 +74,15 @@ func (n *Noncer) refreshNonces(ctx context.Context) { // TODO: handle case where stored & chain pending nonce is out of sync? } - if content, err := n.ethClient.TxPoolContent(ctx); err == nil { - for _, tx := range content["queued"][n.sender.Hex()] { - n.queuedNonces[tx.Nonce()] = struct{}{} + // Use txpool.inspect instead of txpool.content. Less data to fetch. + if content, err := n.ethClient.TxPoolInspect(ctx); err == nil { + for nonceStr := range content["pending"][n.sender] { + nonce, _ := strconv.ParseUint(nonceStr, 10, 64) + n.inMempoolNonces[nonce] = struct{}{} + } + for nonceStr := range content["queued"][n.sender] { + nonce, _ := strconv.ParseUint(nonceStr, 10, 64) + n.inMempoolNonces[nonce] = struct{}{} } } } @@ -108,9 +116,9 @@ func (n *Noncer) Acquire() (uint64, bool) { } n.acquired[nonce] = struct{}{} - // Set isReplacing to true only if the next nonce is already queued in the mempool. - if _, isQueued := n.queuedNonces[nonce]; isQueued { - delete(n.queuedNonces, nonce) + // Set isReplacing to true only if the next nonce is already pending in the mempool. + if _, inMempool := n.inMempoolNonces[nonce]; inMempool { + delete(n.inMempoolNonces, nonce) isReplacing = true } diff --git a/core/transactor/tracker/tracker.go b/core/transactor/tracker/tracker.go index 7541004..f103268 100644 --- a/core/transactor/tracker/tracker.go +++ b/core/transactor/tracker/tracker.go @@ -18,7 +18,7 @@ const retryBackoff = 500 * time.Millisecond type Tracker struct { noncer *Noncer dispatcher *event.Dispatcher[*Response] - senderAddr string // hex address of tx sender + senderAddr common.Address // tx sender address inMempoolTimeout time.Duration // for hitting mempool staleTimeout time.Duration // for a tx receipt @@ -34,7 +34,7 @@ func New( return &Tracker{ noncer: noncer, dispatcher: dispatcher, - senderAddr: sender.Hex(), + senderAddr: sender, inMempoolTimeout: inMempoolTimeout, staleTimeout: staleTimeout, } @@ -89,20 +89,19 @@ func (t *Tracker) trackStatus(ctx context.Context, resp *Response) { // checkMempool marks the tx according to its state in the mempool. Returns true if found. func (t *Tracker) checkMempool(ctx context.Context, resp *Response) bool { - content, err := t.ethClient.TxPoolContent(ctx) + content, err := t.ethClient.TxPoolContentFrom(ctx, t.senderAddr) if err != nil { return false } txNonce := strconv.FormatUint(resp.Nonce(), 10) - - if senderTxs, ok := content["pending"][t.senderAddr]; ok { + if senderTxs, ok := content["pending"]; ok { if _, isPending := senderTxs[txNonce]; isPending { t.markPending(ctx, resp) return true } } - if senderTxs, ok := content["queued"][t.senderAddr]; ok { + if senderTxs, ok := content["queued"]; ok { if _, isQueued := senderTxs[txNonce]; isQueued { // mark the transaction as expired, but it does exist in the mempool. t.markExpired(resp, false) diff --git a/core/transactor/transactor.go b/core/transactor/transactor.go index 139c73a..1c4e323 100644 --- a/core/transactor/transactor.go +++ b/core/transactor/transactor.go @@ -22,8 +22,9 @@ import ( // TxrV2 is the main transactor object. TODO: deprecate off being a job. type TxrV2 struct { - cfg Config - logger log.Logger + cfg Config + logger log.Logger + signerAddr common.Address requests queuetypes.Queue[*types.Request] factory *factory.Factory @@ -62,6 +63,7 @@ func NewTransactor(cfg Config, signer kmstypes.TxSigner) (*TxrV2, error) { return &TxrV2{ cfg: cfg, requests: queue, + signerAddr: signer.Address(), factory: factory, noncer: noncer, sender: sender.New(factory, noncer), @@ -96,6 +98,13 @@ func (t *TxrV2) Setup(ctx context.Context) error { t.sender.Setup(chain, t.logger) t.tracker.SetClient(chain) t.noncer.Start(ctx, chain) + + // If there are any pending txns at startup, they are likely to be stuck in the mempool. + // Resend them. + if err := t.resendStaleTxns(ctx); err != nil { + return err + } + go t.mainLoop(ctx) return nil @@ -175,3 +184,24 @@ func (t *TxrV2) removeStateTracking(msgIDs ...string) { delete(t.preconfirmedStates, msgID) } } + +// resendStaleTxns resends all the stale (pending) transactions in the tx pool. +func (t *TxrV2) resendStaleTxns(ctx context.Context) error { + sCtx := sdk.UnwrapContext(ctx) + chain := sCtx.Chain() + + content, err := chain.TxPoolContentFrom(ctx, t.signerAddr) + if err != nil { + t.logger.Error("failed to get tx pool content", "err", err) + return err + } + + for _, txn := range content["pending"] { + bumpedTxn := sender.BumpGas(txn) + if err = t.sender.SendTransaction(ctx, bumpedTxn); err != nil { + t.logger.Error("failed to resend stale transaction", "err", err) + return err + } + } + return nil +}