Skip to content

Commit

Permalink
feat(transactor): Support returning tx status before confirmation (be…
Browse files Browse the repository at this point in the history
…rachain#76)

* status

* better naming

* cleaner comments
  • Loading branch information
calbera authored Mar 18, 2024
1 parent 6c26ac0 commit 28c0c25
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 83 deletions.
11 changes: 4 additions & 7 deletions core/transactor/event/dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package event

import (
"fmt"
)

// Event is an interface that all events should implement. It requires a String() method that
// should return a unique identifier for the event.
type Event fmt.Stringer
// Event is an interface that all events should implement.
type Event interface {
ID() string // returns a unique identifier for the event
}

// Dispatcher is a generic event dispatcher. It maintains a mapping of callers to subscribers,
// which are channels that events are sent to.
Expand Down
6 changes: 4 additions & 2 deletions core/transactor/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (f *Factory) SetClient(ethClient eth.Client) {
f.ethClient = ethClient
}

// BuildTransactionFromRequests builds a transaction from a list of requests.
// BuildTransactionFromRequests builds a transaction from a list of requests. A non-zero nonce
// should only be provided if this is a retry with a specific nonce necessary.
func (f *Factory) BuildTransactionFromRequests(
ctx context.Context, forcedNonce uint64, txReqs ...*types.TxRequest,
) (*coretypes.Transaction, error) {
Expand All @@ -67,7 +68,8 @@ func (f *Factory) BuildTransactionFromRequests(
}
}

// buildTransaction builds a transaction with the configured signer.
// buildTransaction builds a transaction with the configured signer. If nonce of 0 is provided,
// a fresh nonce is acquired from the noncer.
func (f *Factory) buildTransaction(
ctx context.Context, nonce uint64, txReq *types.TxRequest,
) (*coretypes.Transaction, error) {
Expand Down
29 changes: 22 additions & 7 deletions core/transactor/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/berachain/offchain-sdk/client/eth"
"github.com/berachain/offchain-sdk/core/transactor/tracker"
"github.com/berachain/offchain-sdk/core/transactor/types"
"github.com/berachain/offchain-sdk/log"

Expand All @@ -18,8 +17,11 @@ type Sender struct {
tracker Tracker // tracker to track sent transactions
txReplacementPolicy TxReplacementPolicy // policy to replace transactions
retryPolicy RetryPolicy // policy to retry transactions
chain eth.Client
logger log.Logger

sendingTxs map[string]struct{} // msgs that are currently sending (or retrying)

chain eth.Client
logger log.Logger
}

// New creates a new Sender with default replacement and exponential retry policies.
Expand All @@ -29,6 +31,7 @@ func New(factory Factory, tracker Tracker) *Sender {
factory: factory,
txReplacementPolicy: &DefaultTxReplacementPolicy{nf: factory},
retryPolicy: &ExpoRetryPolicy{}, // TODO: choose from config.
sendingTxs: make(map[string]struct{}),
}
}

Expand All @@ -38,22 +41,34 @@ func (s *Sender) Setup(chain eth.Client, logger log.Logger) {
s.tracker.SetClient(chain)
}

// If a msgID IsSending (true is returned), the preconfirmed state is "StateSending".
func (s *Sender) IsSending(msgID string) bool {
_, ok := s.sendingTxs[msgID]
return ok
}

// SendTransaction sends a transaction using the Ethereum client. If the transaction fails,
// it retries based on the retry policy, only once (further retries will not retry again). If
// sending is successful, it uses the tracker to track the transaction.
func (s *Sender) SendTransactionAndTrack(
ctx context.Context, tx *coretypes.Transaction, msgIDs []string, shouldRetry bool,
) error {
// Try sending the transaction.
for _, msgID := range msgIDs {
s.sendingTxs[msgID] = struct{}{}
}
if err := s.chain.SendTransaction(ctx, tx); err != nil {
// If sending the transaction fails, retry according to the retry policy.
if shouldRetry {
if shouldRetry { // If sending the transaction fails, retry according to the retry policy.
go s.retryTxWithPolicy(ctx, tx, msgIDs, err)
}
return err
}

// If no error on sending, start tracking the inFlight transaction.
s.tracker.Track(ctx, &tracker.InFlightTx{Transaction: tx, MsgIDs: msgIDs})
// If no error on sending, start tracking the transaction.
for _, msgID := range msgIDs {
delete(s.sendingTxs, msgID)
}
s.tracker.Track(ctx, tx, msgIDs)
return nil
}

Expand Down
3 changes: 1 addition & 2 deletions core/transactor/sender/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/berachain/offchain-sdk/client/eth"
"github.com/berachain/offchain-sdk/core/transactor/tracker"
"github.com/berachain/offchain-sdk/core/transactor/types"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -21,7 +20,7 @@ type (
// Tracker is an interface for tracking sent transactions.
Tracker interface {
SetClient(chain eth.Client)
Track(ctx context.Context, tx *tracker.InFlightTx)
Track(ctx context.Context, tx *coretypes.Transaction, msgIDs []string)
}

// Factory is an interface for signing transactions.
Expand Down
4 changes: 0 additions & 4 deletions core/transactor/tracker/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ func (sub *Subscription) Start(ctx context.Context, ch chan *InFlightTx) error {
if err = sub.OnStale(ctx, e); err != nil {
sub.logger.Error("failed to handle stale tx", "err", err)
}
case StatusError:
// If there was an error with the transaction, call OnError.
sub.logger.Error("error with transaction", "err", e)
sub.OnError(ctx, e, e.Err())
case StatusPending:
// If the transaction is pending, do nothing.
time.Sleep(retryPendingBackoff)
Expand Down
37 changes: 27 additions & 10 deletions core/transactor/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Tracker struct {
staleTimeout time.Duration // for a tx receipt
inMempoolTimeout time.Duration // for hitting mempool
dispatcher *event.Dispatcher[*InFlightTx]
inFlightTxs map[string]struct{} // msgs that have been sent, but not confirmed
ethClient eth.Client
}

Expand All @@ -31,6 +32,7 @@ func New(
staleTimeout: staleTimeout,
inMempoolTimeout: inMempoolTimeout,
dispatcher: dispatcher,
inFlightTxs: make(map[string]struct{}),
}
}

Expand All @@ -39,9 +41,19 @@ func (t *Tracker) SetClient(chain eth.Client) {
}

// Track adds a transaction to the in-flight list and waits for a status.
func (t *Tracker) Track(ctx context.Context, tx *InFlightTx) {
t.noncer.SetInFlight(tx)
go t.trackStatus(ctx, tx)
func (t *Tracker) Track(ctx context.Context, tx *coretypes.Transaction, msgIDs []string) {
for _, msgID := range msgIDs {
t.inFlightTxs[msgID] = struct{}{}
}
inFlightTx := &InFlightTx{Transaction: tx, MsgIDs: msgIDs}
t.noncer.SetInFlight(inFlightTx)
go t.trackStatus(ctx, inFlightTx)
}

// If a msgID IsInFlight (true is returned), the preconfirmed state is "StateInFlight".
func (t *Tracker) IsInFlight(msgID string) bool {
_, ok := t.inFlightTxs[msgID]
return ok
}

// trackStatus polls the for transaction status and updates the in-flight list.
Expand Down Expand Up @@ -135,21 +147,26 @@ func (t *Tracker) markPending(ctx context.Context, tx *InFlightTx) {

// markConfirmed is called once a transaction has been included in the canonical chain.
func (t *Tracker) markConfirmed(tx *InFlightTx, receipt *coretypes.Receipt) {
t.noncer.RemoveInFlight(tx)
tx.Receipt = receipt

// Set the contract address field on the receipt since geth doesn't do this.
if contractAddr := tx.To(); contractAddr != nil && tx.Receipt != nil {
tx.Receipt.ContractAddress = *contractAddr
if contractAddr := tx.To(); contractAddr != nil && receipt != nil {
receipt.ContractAddress = *contractAddr
}

t.dispatcher.Dispatch(tx)
tx.Receipt = receipt
t.dispatchTx(tx)
}

// markStale marks a stale transaction that needs to be resent if not pending.
func (t *Tracker) markStale(tx *InFlightTx, isPending bool) {
t.noncer.RemoveInFlight(tx)
tx.isStale = !isPending
t.dispatchTx(tx)
}

// dispatchTx is called once the tx status is confirmed.
func (t *Tracker) dispatchTx(tx *InFlightTx) {
t.noncer.RemoveInFlight(tx)
for _, msgID := range tx.MsgIDs {
delete(t.inFlightTxs, msgID)
}
t.dispatcher.Dispatch(tx)
}
34 changes: 19 additions & 15 deletions core/transactor/tracker/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@ package tracker

import (
"context"
"strings"

coretypes "github.com/ethereum/go-ethereum/core/types"
)

type Status int
// PreconfirmStates are used before the tx status is confirmed by the chain.
type PreconfirmState uint8

const (
StateUnknown PreconfirmState = iota
StateQueued
StateSending // The tx is sending (or retrying), equivalent to noncer "acquired".
StateInFlight // The tx has been sent, equivalent to noncer "inFlight".
)

// Status represents the current status of a tx owned by the transactor. // These are used only
// after the tx status has been confirmed by the chain.
type Status uint8

const (
StatusPending Status = iota
StatusSuccess
StatusReverted
StatusStale
StatusError
)

// Subscriber is an interface that defines methods for handling transaction events.
Expand All @@ -24,27 +36,23 @@ type Subscriber interface {
OnRevert(*InFlightTx, *coretypes.Receipt) error
// OnStale is called when a transaction becomes stale.
OnStale(context.Context, *InFlightTx) error
// OnError is called when there is an error with the transaction.
OnError(context.Context, *InFlightTx, error)
}

// InFlightTx represents a transaction that is currently being tracked by the transactor.
type InFlightTx struct {
*coretypes.Transaction
MsgIDs []string
Receipt *coretypes.Receipt
err error
isStale bool
}

func (t *InFlightTx) String() string {
return t.Hash().Hex()
// ID returns a unique identifier for the event.
func (t *InFlightTx) ID() string {
return strings.Join(t.MsgIDs, " | ")
}

// Status returns the current status of a transaction owned by the transactor.
func (t *InFlightTx) Status() Status {
if t.err != nil {
return StatusError
}

if t.Receipt == nil {
if t.isStale {
return StatusStale
Expand All @@ -58,7 +66,3 @@ func (t *InFlightTx) Status() Status {

return StatusReverted
}

func (t *InFlightTx) Err() error {
return t.err
}
66 changes: 40 additions & 26 deletions core/transactor/transactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type TxrV2 struct {
cfg Config
requests queuetypes.Queue[*types.TxRequest]
sender *sender.Sender
tracker *tracker.Tracker
factory *factory.Factory
noncer *tracker.Noncer
dispatcher *event.Dispatcher[*tracker.InFlightTx]
Expand All @@ -42,19 +43,17 @@ func NewTransactor(
noncer, signer,
factory.NewMulticall3Batcher(common.HexToAddress(cfg.Multicall3Address)),
)

dispatcher := event.NewDispatcher[*tracker.InFlightTx]()
tracker := tracker.New(noncer, dispatcher, cfg.TxReceiptTimeout, cfg.InMempoolTimeout)

return &TxrV2{
dispatcher: dispatcher,
cfg: cfg,
factory: factory,
sender: sender.New(
factory, tracker.New(noncer, dispatcher, cfg.TxReceiptTimeout, cfg.InMempoolTimeout),
),
noncer: noncer,
requests: queue,
mu: sync.Mutex{},
sender: sender.New(factory, tracker),
tracker: tracker,
noncer: noncer,
requests: queue,
}
}

Expand All @@ -63,15 +62,28 @@ func (t *TxrV2) RegistryKey() string {
return "transactor"
}

// SubscribeTxResults sends the tx results (inflight) to the given channel.
func (t *TxrV2) SubscribeTxResults(ctx context.Context, subscriber tracker.Subscriber) {
// Setup implements job.HasSetup.
// TODO: deprecate off being a job.
func (t *TxrV2) Setup(ctx context.Context) error {
sCtx := sdk.UnwrapContext(ctx)
t.chain = sCtx.Chain()
t.logger = sCtx.Logger()

// Register the transactor as a subscriber to the tracker.
ch := make(chan *tracker.InFlightTx)
go func() {
subCtx, cancel := context.WithCancel(ctx)
_ = tracker.NewSubscription(subscriber, t.logger).Start(subCtx, ch) // TODO: handle error
_ = tracker.NewSubscription(t, t.logger).Start(subCtx, ch) // TODO: handle error
cancel()
}()
t.dispatcher.Subscribe(ch)

// TODO: need lock on nonce to support more than one
t.noncer.SetClient(t.chain)
t.factory.SetClient(t.chain)
t.sender.Setup(t.chain, t.logger)
t.Start(sCtx)
return nil
}

// Execute implements job.Basic.
Expand All @@ -90,35 +102,37 @@ func (t *TxrV2) IntervalTime(_ context.Context) time.Duration {
return 5 * time.Second //nolint:gomnd // TODO: read from config.
}

// Setup implements job.HasSetup.
// TODO: deprecate off being a job.
func (t *TxrV2) Setup(ctx context.Context) error {
sCtx := sdk.UnwrapContext(ctx)
t.chain = sCtx.Chain()
t.logger = sCtx.Logger()

// Register the transactor as a subscriber to the tracker.
// SubscribeTxResults sends the tx results (inflight) to the given channel.
func (t *TxrV2) SubscribeTxResults(ctx context.Context, subscriber tracker.Subscriber) {
ch := make(chan *tracker.InFlightTx)
go func() {
subCtx, cancel := context.WithCancel(ctx)
_ = tracker.NewSubscription(t, t.logger).Start(subCtx, ch) // TODO: handle error
_ = tracker.NewSubscription(subscriber, t.logger).Start(subCtx, ch) // TODO: handle error
cancel()
}()
t.dispatcher.Subscribe(ch)

// TODO: need lock on nonce to support more than one
t.noncer.SetClient(t.chain)
t.factory.SetClient(t.chain)
t.sender.Setup(t.chain, t.logger)
t.Start(sCtx)
return nil
}

// SendTxRequest adds the given tx request to the tx queue.
func (t *TxrV2) SendTxRequest(txReq *types.TxRequest) (string, error) {
return t.requests.Push(txReq)
}

// GetPreconfirmedState returns the status of the given message ID before it has been confirmed by
// the chain.
func (t *TxrV2) GetPreconfirmedState(msgID string) tracker.PreconfirmState {
switch {
case t.tracker.IsInFlight(msgID):
return tracker.StateInFlight
case t.sender.IsSending(msgID):
return tracker.StateSending
case t.requests.InQueue(msgID):
return tracker.StateQueued
default:
return tracker.StateUnknown
}
}

// Start starts the transactor.
func (t *TxrV2) Start(ctx context.Context) {
go t.noncer.RefreshLoop(ctx)
Expand Down
Loading

0 comments on commit 28c0c25

Please sign in to comment.