Skip to content

Commit

Permalink
chore(rpc): Add timeouts to all network calls (berachain#71)
Browse files Browse the repository at this point in the history
* rpc timeout

* lint
  • Loading branch information
calbera authored Feb 27, 2024
1 parent a01938f commit 0f0fec6
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 79 deletions.
40 changes: 28 additions & 12 deletions client/eth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ import (
"github.com/ethereum/go-ethereum/rpc"
)

const (
MaxRetries = 3
defaultRetryTime = 1 * time.Second
)

type Client interface {
DialContext(ctx context.Context, rawurl string) error
Close() error
Expand Down Expand Up @@ -62,6 +57,14 @@ type Writer interface {
// client is the indexer eth client.
type ExtendedEthClient struct {
*ethclient.Client
rpcTimeout time.Duration
}

func NewExtendedEthClient(c *ethclient.Client, rpcTimeout time.Duration) *ExtendedEthClient {
return &ExtendedEthClient{
Client: c,
rpcTimeout: rpcTimeout,
}
}

// ==================================================================
Expand All @@ -74,7 +77,9 @@ func (c *ExtendedEthClient) DialContext(ctx context.Context, rawurl string) erro
}

var err error
c.Client, err = ethclient.DialContext(ctx, rawurl)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
c.Client, err = ethclient.DialContext(ctxWithTimeout, rawurl)
cancel()
return err
}

Expand All @@ -88,7 +93,9 @@ func (c *ExtendedEthClient) Close() error {
}

func (c *ExtendedEthClient) Health() bool {
_, err := c.ChainID(context.TODO())
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), c.rpcTimeout)
_, err := c.ChainID(ctxWithTimeout)
cancel()
return err == nil
}

Expand All @@ -101,7 +108,9 @@ func (c *ExtendedEthClient) GetReceipts(
ctx context.Context, txs ethcoretypes.Transactions) (ethcoretypes.Receipts, error) {
var receipts ethcoretypes.Receipts
for _, tx := range txs {
receipt, err := c.TransactionReceipt(ctx, tx.Hash())
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
receipt, err := c.TransactionReceipt(ctxWithTimeout, tx.Hash())
cancel()
if err != nil {
return nil, err
}
Expand All @@ -114,22 +123,29 @@ func (c *ExtendedEthClient) GetReceipts(
func (c *ExtendedEthClient) SubscribeNewHead(
ctx context.Context) (chan *ethcoretypes.Header, ethereum.Subscription, error) {
ch := make(chan *ethcoretypes.Header)
sub, err := c.Client.SubscribeNewHead(ctx, ch)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
sub, err := c.Client.SubscribeNewHead(ctxWithTimeout, ch)
cancel()
return ch, sub, err
}

func (c *ExtendedEthClient) SubscribeFilterLogs(
ctx context.Context,
q ethereum.FilterQuery, ch chan<- ethcoretypes.Log) (ethereum.Subscription, error) {
return c.Client.SubscribeFilterLogs(ctx, q, ch)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return c.Client.SubscribeFilterLogs(ctxWithTimeout, q, ch)
}

func (c *ExtendedEthClient) TxPoolContent(
ctx context.Context,
) (map[string]map[string]map[string]*ethcoretypes.Transaction, error) {
// var result map[string]map[string]map[string]*ethcoretypes.Transaction
var result map[string]map[string]map[string]*ethcoretypes.Transaction
if err := c.Client.Client().CallContext(ctx, &result, "txpool_content"); err != nil {
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
if err := c.Client.Client().CallContext(
ctxWithTimeout, &result, "txpool_content",
); err != nil {
return nil, err
}
return result, nil
Expand Down
104 changes: 71 additions & 33 deletions client/eth/client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -19,11 +20,16 @@ var (
// ChainProviderImpl is an implementation of the ChainProvider interface.
type ChainProviderImpl struct {
ConnectionPool
rpcTimeout time.Duration
}

// NewChainProviderImpl creates a new ChainProviderImpl with the given ConnectionPool.
func NewChainProviderImpl(pool ConnectionPool) (Client, error) {
return &ChainProviderImpl{pool}, nil
func NewChainProviderImpl(pool ConnectionPool, cfg ConnectionPoolConfig) (Client, error) {
c := &ChainProviderImpl{ConnectionPool: pool, rpcTimeout: cfg.DefaultTimeout}
if c.rpcTimeout == 0 {
c.rpcTimeout = defaultRPCTimeout
}
return c, nil
}

// ==================================================================
Expand All @@ -34,7 +40,9 @@ func NewChainProviderImpl(pool ConnectionPool) (Client, error) {
func (c *ChainProviderImpl) BlockByNumber(
ctx context.Context, num *big.Int) (*types.Block, error) {
if client, ok := c.GetHTTP(); ok {
return client.BlockByNumber(ctx, num)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.BlockByNumber(ctxWithTimeout, num)
}
return nil, ErrClientNotFound
}
Expand All @@ -43,7 +51,9 @@ func (c *ChainProviderImpl) BlockByNumber(
func (c *ChainProviderImpl) BlockReceipts(
ctx context.Context, blockNrOrHash rpc.BlockNumberOrHash) ([]*types.Receipt, error) {
if client, ok := c.GetHTTP(); ok {
return client.BlockReceipts(ctx, blockNrOrHash)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.BlockReceipts(ctxWithTimeout, blockNrOrHash)
}
return nil, ErrClientNotFound
}
Expand All @@ -52,7 +62,9 @@ func (c *ChainProviderImpl) BlockReceipts(
func (c *ChainProviderImpl) TransactionReceipt(
ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
if client, ok := c.GetHTTP(); ok {
return client.TransactionReceipt(ctx, txHash)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.TransactionReceipt(ctxWithTimeout, txHash)
}
return nil, ErrClientNotFound
}
Expand All @@ -61,23 +73,29 @@ func (c *ChainProviderImpl) TransactionReceipt(
func (c *ChainProviderImpl) SubscribeNewHead(
ctx context.Context) (chan *types.Header, ethereum.Subscription, error) {
if client, ok := c.GetWS(); ok {
return client.SubscribeNewHead(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.SubscribeNewHead(ctxWithTimeout)
}
return nil, nil, ErrClientNotFound
}

// BlockNumber returns the current block number.
func (c *ChainProviderImpl) BlockNumber(ctx context.Context) (uint64, error) {
if client, ok := c.GetHTTP(); ok {
return client.BlockNumber(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.BlockNumber(ctxWithTimeout)
}
return 0, ErrClientNotFound
}

// ChainID returns the current chain ID.
func (c *ChainProviderImpl) ChainID(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetHTTP(); ok {
return client.ChainID(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.ChainID(ctxWithTimeout)
}
return nil, ErrClientNotFound
}
Expand All @@ -86,7 +104,9 @@ func (c *ChainProviderImpl) ChainID(ctx context.Context) (*big.Int, error) {
func (c *ChainProviderImpl) BalanceAt(
ctx context.Context, address common.Address, blockNumber *big.Int) (*big.Int, error) {
if client, ok := c.GetHTTP(); ok {
return client.BalanceAt(ctx, address, blockNumber)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.BalanceAt(ctxWithTimeout, address, blockNumber)
}
return nil, ErrClientNotFound
}
Expand All @@ -95,7 +115,9 @@ func (c *ChainProviderImpl) BalanceAt(
func (c *ChainProviderImpl) CodeAt(
ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) {
if client, ok := c.GetHTTP(); ok {
return client.CodeAt(ctx, account, blockNumber)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.CodeAt(ctxWithTimeout, account, blockNumber)
}
return nil, ErrClientNotFound
}
Expand All @@ -104,7 +126,9 @@ func (c *ChainProviderImpl) CodeAt(
func (c *ChainProviderImpl) EstimateGas(
ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
if client, ok := c.GetHTTP(); ok {
return client.EstimateGas(ctx, msg)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.EstimateGas(ctxWithTimeout, msg)
}
return 0, ErrClientNotFound
}
Expand All @@ -113,7 +137,9 @@ func (c *ChainProviderImpl) EstimateGas(
func (c *ChainProviderImpl) FilterLogs(
ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
if client, ok := c.GetHTTP(); ok {
return client.FilterLogs(ctx, q)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.FilterLogs(ctxWithTimeout, q)
}
return nil, ErrClientNotFound
}
Expand All @@ -122,7 +148,9 @@ func (c *ChainProviderImpl) FilterLogs(
func (c *ChainProviderImpl) HeaderByNumber(
ctx context.Context, number *big.Int) (*types.Header, error) {
if client, ok := c.GetHTTP(); ok {
return client.HeaderByNumber(ctx, number)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.HeaderByNumber(ctxWithTimeout, number)
}
return nil, ErrClientNotFound
}
Expand All @@ -131,7 +159,9 @@ func (c *ChainProviderImpl) HeaderByNumber(
func (c *ChainProviderImpl) PendingCodeAt(
ctx context.Context, account common.Address) ([]byte, error) {
if client, ok := c.GetHTTP(); ok {
return client.PendingCodeAt(ctx, account)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.PendingCodeAt(ctxWithTimeout, account)
}
return nil, ErrClientNotFound
}
Expand All @@ -140,7 +170,9 @@ func (c *ChainProviderImpl) PendingCodeAt(
func (c *ChainProviderImpl) PendingNonceAt(
ctx context.Context, account common.Address) (uint64, error) {
if client, ok := c.GetHTTP(); ok {
return client.PendingNonceAt(ctx, account)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.PendingNonceAt(ctxWithTimeout, account)
}
return 0, ErrClientNotFound
}
Expand All @@ -149,7 +181,9 @@ func (c *ChainProviderImpl) PendingNonceAt(
func (c *ChainProviderImpl) NonceAt(
ctx context.Context, account common.Address, bn *big.Int) (uint64, error) {
if client, ok := c.GetHTTP(); ok {
return client.NonceAt(ctx, account, bn)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.NonceAt(ctxWithTimeout, account, bn)
}
return 0, ErrClientNotFound
}
Expand All @@ -158,7 +192,9 @@ func (c *ChainProviderImpl) NonceAt(
func (c *ChainProviderImpl) SendTransaction(
ctx context.Context, tx *types.Transaction) error {
if client, ok := c.GetHTTP(); ok {
return client.SendTransaction(ctx, tx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.SendTransaction(ctxWithTimeout, tx)
}
return ErrClientNotFound
}
Expand All @@ -168,15 +204,19 @@ func (c *ChainProviderImpl) SubscribeFilterLogs(
ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log,
) (ethereum.Subscription, error) {
if client, ok := c.GetWS(); ok {
return client.SubscribeFilterLogs(ctx, q, ch)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.SubscribeFilterLogs(ctxWithTimeout, q, ch)
}
return nil, ErrClientNotFound
}

// SuggestGasPrice suggests a gas price.
func (c *ChainProviderImpl) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetHTTP(); ok {
return client.SuggestGasPrice(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.SuggestGasPrice(ctxWithTimeout)
}
return nil, ErrClientNotFound
}
Expand All @@ -186,15 +226,19 @@ func (c *ChainProviderImpl) CallContract(
ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int,
) ([]byte, error) {
if client, ok := c.GetHTTP(); ok {
return client.CallContract(ctx, msg, blockNumber)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.CallContract(ctxWithTimeout, msg, blockNumber)
}
return nil, ErrClientNotFound
}

// SuggestGasTipCap suggests a gas tip cap.
func (c *ChainProviderImpl) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
if client, ok := c.GetHTTP(); ok {
return client.SuggestGasTipCap(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.SuggestGasTipCap(ctxWithTimeout)
}
return nil, ErrClientNotFound
}
Expand All @@ -204,26 +248,20 @@ func (c *ChainProviderImpl) TransactionByHash(
ctx context.Context, hash common.Hash,
) (*types.Transaction, bool, error) {
if client, ok := c.GetHTTP(); ok {
return client.TransactionByHash(ctx, hash)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.TransactionByHash(ctxWithTimeout, hash)
}
return nil, false, ErrClientNotFound
}

// "id": 1,
// "result": {
// "pending": {
// "0xe74aA377Dbc22450349774d1C427337995120DCB": {
// "3698316": {
// "blockHash": null,
// "blockNumber": null,
// "from": "0xe74aa377dbc22450349774d1c427337995120dcb",
// "gas": "0x715b",

func (c *ChainProviderImpl) TxPoolContent(ctx context.Context) (
map[string]map[string]map[string]*types.Transaction, error,
) {
if client, ok := c.GetHTTP(); ok {
return client.TxPoolContent(ctx)
ctxWithTimeout, cancel := context.WithTimeout(ctx, c.rpcTimeout)
defer cancel()
return client.TxPoolContent(ctxWithTimeout)
}
return nil, ErrClientNotFound
}
Expand Down
26 changes: 18 additions & 8 deletions client/eth/config.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package eth

// Config is the configuration for the eth client.
type Config struct {
EthHTTPURLs string
EthWSURLs string
import "time"

const (
defaultRPCTimeout = 5 * time.Second
defaultHealthCheckInterval = 5 * time.Second
)

type ConnectionPoolConfig struct {
EthHTTPURLs []string
EthWSURLs []string
DefaultTimeout time.Duration
HealthCheckInterval time.Duration
}

func DefaultConfig() *Config {
return &Config{
EthHTTPURLs: "http://localhost:8545",
EthWSURLs: "ws://localhost:8546",
func DefaultConnectPoolConfig() *ConnectionPoolConfig {
return &ConnectionPoolConfig{
EthHTTPURLs: []string{"http://localhost:8545"},
EthWSURLs: []string{"ws://localhost:8546"},
DefaultTimeout: defaultRPCTimeout,
HealthCheckInterval: defaultHealthCheckInterval,
}
}
Loading

0 comments on commit 0f0fec6

Please sign in to comment.