Skip to content

Commit

Permalink
Reconnect ethclient on repeated error
Browse files Browse the repository at this point in the history
  • Loading branch information
hkalodner committed Feb 17, 2022
1 parent 06ccd03 commit 0cb0c97
Showing 1 changed file with 182 additions and 16 deletions.
198 changes: 182 additions & 16 deletions packages/arb-util/ethutils/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,21 @@ import (
"context"
"encoding/json"
"math/big"

"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"sync"
"sync/atomic"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
)

const maxErrCount = 5

type ReceiptFetcher interface {
TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
Expand All @@ -51,8 +54,12 @@ type EthClient interface {
}

type RPCEthClient struct {
*ethclient.Client
rpc *rpc.Client
sync.RWMutex

url string
eth *ethclient.Client
rpc *rpc.Client
errCount uint64
}

type BlockInfo struct {
Expand All @@ -63,20 +70,39 @@ type BlockInfo struct {
}

func NewRPCEthClient(url string) (*RPCEthClient, error) {
ethcl, err := ethclient.Dial(url)
r := &RPCEthClient{url: url}
err := r.reconnect()
return r, err
}

func (r *RPCEthClient) reconnect() error {
r.Lock()
defer r.Unlock()
rpccl, err := rpc.Dial(r.url)
if err != nil {
return nil, err
return err
}
r.eth = ethclient.NewClient(rpccl)
r.rpc = rpccl
return nil
}

rpccl, err := rpc.Dial(url)
if err != nil {
return nil, err
func (r *RPCEthClient) handleCallErr(err error) error {
if err == nil {
// Reset err count if any call is working since we're looking for a connection error
atomic.StoreUint64(&r.errCount, 0)
return nil
}
totalErrCount := atomic.AddUint64(&r.errCount, 1)

return &RPCEthClient{
Client: ethcl,
rpc: rpccl,
}, nil
// If we've had above a threshold number of errors, reinitialize the connection
if totalErrCount >= maxErrCount {
if err := r.reconnect(); err != nil {
return err
}
atomic.StoreUint64(&r.errCount, 0)
}
return err
}

func (r *RPCEthClient) BlockInfoByNumber(ctx context.Context, number *big.Int) (*BlockInfo, error) {
Expand All @@ -100,6 +126,146 @@ func (r *RPCEthClient) BlockInfoByNumber(ctx context.Context, number *big.Int) (
return &ret, nil
}

func (r *RPCEthClient) ChainID(ctx context.Context) (*big.Int, error) {
r.RLock()
val, err := r.eth.ChainID(ctx)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
r.RLock()
val, err := r.eth.CodeAt(ctx, account, blockNumber)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
r.RLock()
val, err := r.eth.BalanceAt(ctx, account, blockNumber)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
r.RLock()
val, err := r.eth.CallContract(ctx, msg, blockNumber)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
r.RLock()
val, err := r.eth.HeaderByNumber(ctx, number)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) {
r.RLock()
val, err := r.eth.PendingCodeAt(ctx, account)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
r.RLock()
val, err := r.eth.PendingNonceAt(ctx, account)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) {
r.RLock()
val, err := r.eth.PendingCallContract(ctx, msg)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
r.RLock()
val, err := r.eth.SuggestGasPrice(ctx)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
r.RLock()
val, err := r.eth.SuggestGasTipCap(ctx)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
r.RLock()
val, err := r.eth.EstimateGas(ctx, msg)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
r.RLock()
err := r.eth.SendTransaction(ctx, tx)
r.RUnlock()
return r.handleCallErr(err)
}

func (r *RPCEthClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
r.RLock()
val, err := r.eth.FilterLogs(ctx, q)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
r.RLock()
val, err := r.eth.SubscribeFilterLogs(ctx, q, ch)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
r.RLock()
val, err := r.eth.TransactionReceipt(ctx, txHash)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
r.RLock()
val, err := r.eth.NonceAt(ctx, account, blockNumber)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
r.RLock()
val, err := r.eth.HeaderByHash(ctx, hash)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
r.RLock()
val, err := r.eth.BlockByHash(ctx, hash)
r.RUnlock()
return val, r.handleCallErr(err)
}

func (r *RPCEthClient) TransactionByHash(ctx context.Context, hash common.Hash) (tx *types.Transaction, isPending bool, err error) {
r.RLock()
tx, isPending, err = r.eth.TransactionByHash(ctx, hash)
r.RUnlock()
return tx, isPending, r.handleCallErr(err)
}

func (r *RPCEthClient) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) {
r.RLock()
val, err := r.eth.TransactionInBlock(ctx, blockHash, index)
r.RUnlock()
return val, r.handleCallErr(err)
}

type SimulatedEthClient struct {
*backends.SimulatedBackend
}
Expand Down

0 comments on commit 0cb0c97

Please sign in to comment.