Skip to content

Commit

Permalink
Resend stale txns on startup (berachain#86)
Browse files Browse the repository at this point in the history
* Resend stale txns on startup

* nit

* change to common.Address to handle cases

* nit

* nit

* addressing comments

* add TODO

* add queued nonces

* rename
  • Loading branch information
hunter-bera authored May 22, 2024
1 parent 78e37d4 commit 000ac92
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 27 deletions.
68 changes: 61 additions & 7 deletions client/eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,49 @@ type Reader interface {
SuggestGasTipCap(ctx context.Context) (*big.Int, error)
TransactionByHash(ctx context.Context, hash common.Hash,
) (tx *ethcoretypes.Transaction, isPending bool, err error)
TxPoolContent(
ctx context.Context) (
map[string]map[string]map[string]*ethcoretypes.Transaction, error)

/*
TxPoolContentFrom returns the pending and queued transactions of this address.
Example response:
{
"pending": {
"0": {
// transaction details...
}
},
"queued": {
"0": {
// transaction details...
}
}
}
*/
TxPoolContentFrom(ctx context.Context, address common.Address) (
map[string]map[string]*ethcoretypes.Transaction, error)

/*
TxPoolInspect returns the textual summary of all pending and queued transactions.
Example response:
{
"pending": {
"0x12345": {
"0": "0x12345789: 1 wei + 2 gas x 3 wei"
}
},
"queued": {
"0x12345": {
"0": "0x12345789: 1 wei + 2 gas x 3 wei"
}
}
}
*/
TxPoolInspect(
ctx context.Context,
) (map[string]map[common.Address]map[string]string, error)
}

type Writer interface {
Expand Down Expand Up @@ -137,14 +177,28 @@ func (c *ExtendedEthClient) SubscribeFilterLogs(
return c.Client.SubscribeFilterLogs(ctxWithTimeout, q, ch)
}

func (c *ExtendedEthClient) TxPoolContent(
func (c *ExtendedEthClient) TxPoolContentFrom(
ctx context.Context, address common.Address,
) (map[string]map[string]*ethcoretypes.Transaction, error) {
var result map[string]map[string]*ethcoretypes.Transaction
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
if err := c.Client.Client().CallContext(
ctxWithTimeout, &result, "txpool_contentFrom", address,
); err != nil {
return nil, err
}
return result, nil
}

func (c *ExtendedEthClient) TxPoolInspect(
ctx context.Context,
) (map[string]map[string]map[string]*ethcoretypes.Transaction, error) {
var result map[string]map[string]map[string]*ethcoretypes.Transaction
) (map[string]map[common.Address]map[string]string, error) {
var result map[string]map[common.Address]map[string]string
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
if err := c.Client.Client().CallContext(
ctxWithTimeout, &result, "txpool_content",
ctxWithTimeout, &result, "txpool_inspect",
); err != nil {
return nil, err
}
Expand Down
51 changes: 48 additions & 3 deletions client/eth/client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,13 +255,58 @@ func (c *ChainProviderImpl) TransactionByHash(
return nil, false, ErrClientNotFound
}

func (c *ChainProviderImpl) TxPoolContent(ctx context.Context) (
map[string]map[string]map[string]*types.Transaction, error,
/*
TxPoolContentFrom returns the pending and queued transactions of this address.
Example response:
{
"pending": {
"0": {
// transaction details...
}
},
"queued": {
"0": {
// transaction details...
}
}
}
*/
func (c *ChainProviderImpl) TxPoolContentFrom(ctx context.Context, address common.Address) (
map[string]map[string]*types.Transaction, error,
) {
if client, ok := c.GetHTTP(); ok {
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.TxPoolContentFrom(ctxWithTimeout, address)
}
return nil, ErrClientNotFound
}

/*
TxPoolInspect returns the textual summary of all pending and queued transactions.
Example response:
{
"pending": {
"0x12345": {
"0": "0x12345789: 1 wei + 2 gas x 3 wei"
}
},
"queued": {
"0x12345": {
"0": "0x12345789: 1 wei + 2 gas x 3 wei"
}
}
}
*/
func (c *ChainProviderImpl) TxPoolInspect(ctx context.Context) (
map[string]map[common.Address]map[string]string, error,
) {
if client, ok := c.GetHTTP(); ok {
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.TxPoolContent(ctxWithTimeout)
return client.TxPoolInspect(ctxWithTimeout)
}
return nil, ErrClientNotFound
}
Expand Down
26 changes: 17 additions & 9 deletions core/transactor/tracker/noncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tracker

import (
"context"
"strconv"
"sync"
"time"

Expand All @@ -19,7 +20,8 @@ type Noncer struct {

// mempool state
latestPendingNonce uint64
queuedNonces map[uint64]struct{}
// TODO: purge old nonces from the map to avoid infinite memory growth
inMempoolNonces map[uint64]struct{}

// "in-process" nonces
acquired map[uint64]struct{} // The set of acquired nonces.
Expand All @@ -34,7 +36,7 @@ type Noncer struct {
func NewNoncer(sender common.Address, refreshInterval time.Duration) *Noncer {
return &Noncer{
sender: sender,
queuedNonces: make(map[uint64]struct{}),
inMempoolNonces: make(map[uint64]struct{}),
acquired: make(map[uint64]struct{}),
inFlight: skiplist.New(skiplist.Uint64),
refreshInterval: refreshInterval,
Expand All @@ -61,7 +63,7 @@ func (n *Noncer) refreshLoop(ctx context.Context) {
}
}

// refreshNonces refreshes the pending nonce and queued nonces from the mempool.
// refreshNonces refreshes the pending nonces from the mempool.
func (n *Noncer) refreshNonces(ctx context.Context) {
n.mu.Lock()
defer n.mu.Unlock()
Expand All @@ -72,9 +74,15 @@ func (n *Noncer) refreshNonces(ctx context.Context) {
// TODO: handle case where stored & chain pending nonce is out of sync?
}

if content, err := n.ethClient.TxPoolContent(ctx); err == nil {
for _, tx := range content["queued"][n.sender.Hex()] {
n.queuedNonces[tx.Nonce()] = struct{}{}
// Use txpool.inspect instead of txpool.content. Less data to fetch.
if content, err := n.ethClient.TxPoolInspect(ctx); err == nil {
for nonceStr := range content["pending"][n.sender] {
nonce, _ := strconv.ParseUint(nonceStr, 10, 64)
n.inMempoolNonces[nonce] = struct{}{}
}
for nonceStr := range content["queued"][n.sender] {
nonce, _ := strconv.ParseUint(nonceStr, 10, 64)
n.inMempoolNonces[nonce] = struct{}{}
}
}
}
Expand Down Expand Up @@ -108,9 +116,9 @@ func (n *Noncer) Acquire() (uint64, bool) {
}
n.acquired[nonce] = struct{}{}

// Set isReplacing to true only if the next nonce is already queued in the mempool.
if _, isQueued := n.queuedNonces[nonce]; isQueued {
delete(n.queuedNonces, nonce)
// Set isReplacing to true only if the next nonce is already pending in the mempool.
if _, inMempool := n.inMempoolNonces[nonce]; inMempool {
delete(n.inMempoolNonces, nonce)
isReplacing = true
}

Expand Down
11 changes: 5 additions & 6 deletions core/transactor/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const retryBackoff = 500 * time.Millisecond
type Tracker struct {
noncer *Noncer
dispatcher *event.Dispatcher[*Response]
senderAddr string // hex address of tx sender
senderAddr common.Address // tx sender address

inMempoolTimeout time.Duration // for hitting mempool
staleTimeout time.Duration // for a tx receipt
Expand All @@ -34,7 +34,7 @@ func New(
return &Tracker{
noncer: noncer,
dispatcher: dispatcher,
senderAddr: sender.Hex(),
senderAddr: sender,
inMempoolTimeout: inMempoolTimeout,
staleTimeout: staleTimeout,
}
Expand Down Expand Up @@ -89,20 +89,19 @@ func (t *Tracker) trackStatus(ctx context.Context, resp *Response) {

// checkMempool marks the tx according to its state in the mempool. Returns true if found.
func (t *Tracker) checkMempool(ctx context.Context, resp *Response) bool {
content, err := t.ethClient.TxPoolContent(ctx)
content, err := t.ethClient.TxPoolContentFrom(ctx, t.senderAddr)
if err != nil {
return false
}
txNonce := strconv.FormatUint(resp.Nonce(), 10)

if senderTxs, ok := content["pending"][t.senderAddr]; ok {
if senderTxs, ok := content["pending"]; ok {
if _, isPending := senderTxs[txNonce]; isPending {
t.markPending(ctx, resp)
return true
}
}

if senderTxs, ok := content["queued"][t.senderAddr]; ok {
if senderTxs, ok := content["queued"]; ok {
if _, isQueued := senderTxs[txNonce]; isQueued {
// mark the transaction as expired, but it does exist in the mempool.
t.markExpired(resp, false)
Expand Down
34 changes: 32 additions & 2 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (

// TxrV2 is the main transactor object. TODO: deprecate off being a job.
type TxrV2 struct {
cfg Config
logger log.Logger
cfg Config
logger log.Logger
signerAddr common.Address

requests queuetypes.Queue[*types.Request]
factory *factory.Factory
Expand Down Expand Up @@ -62,6 +63,7 @@ func NewTransactor(cfg Config, signer kmstypes.TxSigner) (*TxrV2, error) {
return &TxrV2{
cfg: cfg,
requests: queue,
signerAddr: signer.Address(),
factory: factory,
noncer: noncer,
sender: sender.New(factory, noncer),
Expand Down Expand Up @@ -96,6 +98,13 @@ func (t *TxrV2) Setup(ctx context.Context) error {
t.sender.Setup(chain, t.logger)
t.tracker.SetClient(chain)
t.noncer.Start(ctx, chain)

// If there are any pending txns at startup, they are likely to be stuck in the mempool.
// Resend them.
if err := t.resendStaleTxns(ctx); err != nil {
return err
}

go t.mainLoop(ctx)

return nil
Expand Down Expand Up @@ -175,3 +184,24 @@ func (t *TxrV2) removeStateTracking(msgIDs ...string) {
delete(t.preconfirmedStates, msgID)
}
}

// resendStaleTxns resends all the stale (pending) transactions in the tx pool.
func (t *TxrV2) resendStaleTxns(ctx context.Context) error {
sCtx := sdk.UnwrapContext(ctx)
chain := sCtx.Chain()

content, err := chain.TxPoolContentFrom(ctx, t.signerAddr)
if err != nil {
t.logger.Error("failed to get tx pool content", "err", err)
return err
}

for _, txn := range content["pending"] {
bumpedTxn := sender.BumpGas(txn)
if err = t.sender.SendTransaction(ctx, bumpedTxn); err != nil {
t.logger.Error("failed to resend stale transaction", "err", err)
return err
}
}
return nil
}

0 comments on commit 000ac92

Please sign in to comment.