Skip to content

Commit

Permalink
Added senders cache to txpool to reduce block validation time
Browse files Browse the repository at this point in the history
  • Loading branch information
jdowning100 committed Aug 10, 2023
1 parent ad55eae commit e013f7c
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 31 deletions.
1 change: 1 addition & 0 deletions core/headerchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type HeaderChain struct {

bc *BodyDb
engine consensus.Engine
pool *TxPool

chainHeadFeed event.Feed
chainSideFeed event.Feed
Expand Down
1 change: 1 addition & 0 deletions core/slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func NewSlice(db ethdb.Database, config *Config, txConfig *TxPoolConfig, txLooku
// tx pool is only used in zone
if nodeCtx == common.ZONE_CTX {
sl.txPool = NewTxPool(*txConfig, chainConfig, sl.hc)
sl.hc.pool = sl.txPool
}
sl.miner = New(sl.hc, sl.txPool, config, db, chainConfig, engine, isLocalBlock)

Expand Down
15 changes: 14 additions & 1 deletion core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,19 @@ func (p *StateProcessor) Process(block *types.Block, etxSet types.EtxSet) (types
return types.Receipts{}, []*types.Log{}, nil, 0, err
}

senders := make(map[common.Hash]*common.InternalAddress) // temporary cache for senders of internal txs
p.hc.pool.SendersMutex.RLock()
for _, tx := range block.Transactions() { // get all senders of internal txs from cache - easier on the SendersMutex to do it all at once here
if tx.Type() == types.InternalTxType || tx.Type() == types.InternalToExternalTxType {
if sender, ok := p.hc.pool.GetSenderThreadUnsafe(tx.Hash()); ok {
senders[tx.Hash()] = &sender // This pointer must never be modified
} else {
// TODO: calcuate the sender and add it to the pool senders cache in case of reorg (not necessary for now)
}
}
}
p.hc.pool.SendersMutex.RUnlock()

blockContext := NewEVMBlockContext(header, p.hc, nil)
vmenv := vm.NewEVM(blockContext, vm.TxContext{}, statedb, p.config, p.vmConfig)

Expand All @@ -234,7 +247,7 @@ func (p *StateProcessor) Process(block *types.Block, etxSet types.EtxSet) (types
}
var emittedEtxs types.Transactions
for i, tx := range block.Transactions() {
msg, err := tx.AsMessage(types.MakeSigner(p.config, header.Number()), header.BaseFee())
msg, err := tx.AsMessageWithSender(types.MakeSigner(p.config, header.Number()), header.BaseFee(), senders[tx.Hash()])
if err != nil {
return nil, nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
}
Expand Down
160 changes: 130 additions & 30 deletions core/tx_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/dominant-strategies/go-quai/log"
"github.com/dominant-strategies/go-quai/metrics"
"github.com/dominant-strategies/go-quai/params"
orderedmap "github.com/wk8/go-ordered-map/v2"
)

const (
Expand Down Expand Up @@ -156,10 +157,12 @@ type TxPoolConfig struct {
PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

AccountSlots uint64 // Number of executable transaction slots guaranteed per account
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts
AccountSlots uint64 // Number of executable transaction slots guaranteed per account
GlobalSlots uint64 // Maximum number of executable transaction slots for all accounts
MaxSenders uint64 // Maximum number of senders in the senders cache
SendersChBuffer uint64 // Senders cache channel buffer size
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
}
Expand All @@ -173,10 +176,12 @@ var DefaultTxPoolConfig = TxPoolConfig{
PriceLimit: 1,
PriceBump: 10,

AccountSlots: 1,
GlobalSlots: 9000 + 1024, // urgent + floating queue capacity with 4:1 ratio
AccountQueue: 1,
GlobalQueue: 2048,
AccountSlots: 1,
GlobalSlots: 9000 + 1024, // urgent + floating queue capacity with 4:1 ratio
MaxSenders: 100000, // 5 MB - at least 10 blocks worth of transactions in case of reorg or high production rate
SendersChBuffer: 1024, // at 500 TPS in zone, 2s buffer
AccountQueue: 1,
GlobalQueue: 2048,

Lifetime: 3 * time.Hour,
}
Expand Down Expand Up @@ -244,14 +249,16 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exempt from eviction rules
journal *txJournal // Journal of local transaction to back up to disk

pending map[common.InternalAddress]*txList // All currently processable transactions
queue map[common.InternalAddress]*txList // Queued but non-processable transactions
beats map[common.InternalAddress]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price

localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose
pending map[common.InternalAddress]*txList // All currently processable transactions
queue map[common.InternalAddress]*txList // Queued but non-processable transactions
beats map[common.InternalAddress]time.Time // Last heartbeat from each known account
all *txLookup // All transactions to allow lookups
priced *txPricedList // All transactions sorted by price
senders *orderedmap.OrderedMap[common.Hash, common.InternalAddress] // Tx hash to sender lookup cache (async populated)
sendersCh chan newSender // Channel for async senders cache goroutine
SendersMutex sync.RWMutex // Mutex for senders map
localTxsCount int // count of txs in last 1 min. Purely for logging purpose
remoteTxsCount int // count of txs in last 1 min. Purely for logging purpose

reOrgCounter int // keeps track of the number of times the runReorg is called, it is reset every c_reorgCounterThreshold times

Expand All @@ -269,6 +276,11 @@ type txpoolResetRequest struct {
oldHead, newHead *types.Header
}

type newSender struct {
hash common.Hash
sender common.InternalAddress
}

// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// transactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
Expand All @@ -284,6 +296,8 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.InternalAddress]*txList),
queue: make(map[common.InternalAddress]*txList),
beats: make(map[common.InternalAddress]time.Time),
senders: orderedmap.New[common.Hash, common.InternalAddress](),
sendersCh: make(chan newSender, config.SendersChBuffer),
all: newTxLookup(),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
reqResetCh: make(chan *txpoolResetRequest),
Expand Down Expand Up @@ -324,7 +338,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
pool.wg.Add(1)
go pool.loop()

go pool.sendersGoroutine()
return pool
}

Expand Down Expand Up @@ -617,19 +631,33 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
return ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
var internal common.InternalAddress
addToCache := true
if sender := tx.From(); sender != nil { // Check tx cache first
var err error
internal, err = sender.InternalAddress()
if err != nil {
return err
}
} else if sender, found := pool.GetSender(tx.Hash()); found {
internal = sender
addToCache = false
} else {
// Make sure the transaction is signed properly.
from, err := types.Sender(pool.signer, tx)
if err != nil {
return ErrInvalidSender
}
internal, err = from.InternalAddress()
if err != nil {
return err
}
}

// Drop non-local transactions under our own minimal accepted gas price or tip
if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 {
return ErrUnderpriced
}
internal, err := from.InternalAddress()
if err != nil {
return err
}
// Ensure the transaction adheres to nonce ordering
if pool.currentState.GetNonce(internal) > tx.Nonce() {
return ErrNonceTooLow
Expand All @@ -648,6 +676,17 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
log.Warn("tx has insufficient gas", "gas supplied", tx.Gas(), "gas needed", intrGas, "tx", tx)
return ErrIntrinsicGas
}
if len(pool.sendersCh) == int(pool.config.SendersChBuffer) {
log.Error("sendersCh is full, skipping until there is room")
}
if addToCache {
select {
case pool.sendersCh <- newSender{tx.Hash(), internal}: // Non-blocking
default:
log.Error("sendersCh is full, skipping until there is room")
}
}

return nil
}

Expand Down Expand Up @@ -910,12 +949,31 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
// Exclude transactions with invalid signatures as soon as
// possible and cache senders in transactions before
// obtaining lock
_, err := types.Sender(pool.signer, tx)
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
continue
if sender := tx.From(); sender != nil {
var err error
_, err = sender.InternalAddress()
if err != nil {
errs[i] = err
invalidTxMeter.Mark(1)
continue
}
} else if _, found := pool.GetSender(tx.Hash()); found {
// if the sender is cached in the tx or in the pool cache, we don't need to add it into the cache
} else {
from, err := types.Sender(pool.signer, tx)
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
continue
}
_, err = from.InternalAddress()
if err != nil {
errs[i] = ErrInvalidSender
invalidTxMeter.Mark(1)
continue
}
}

// Accumulate all unknown transactions for deeper processing
news = append(news, tx)
}
Expand Down Expand Up @@ -1631,6 +1689,48 @@ func (pool *TxPool) demoteUnexecutables() {
}
}

// GetSender returns the sender of a stored transaction.
func (pool *TxPool) GetSender(hash common.Hash) (common.InternalAddress, bool) {
pool.SendersMutex.RLock()
defer pool.SendersMutex.RUnlock()
return pool.senders.Get(hash)
}

// GetSenderThreadUnsafe returns the sender of a stored transaction.
// It is not thread safe and should only be used when the pool senders mutex is locked.
func (pool *TxPool) GetSenderThreadUnsafe(hash common.Hash) (common.InternalAddress, bool) {
return pool.senders.Get(hash)
}

// SetSender caches the sender of a transaction.
func (pool *TxPool) SetSender(hash common.Hash, address common.InternalAddress) {
pool.SendersMutex.Lock()
defer pool.SendersMutex.Unlock()
pool.senders.Set(hash, address)
}

// sendersGoroutine asynchronously adds a new sender to the cache
func (pool *TxPool) sendersGoroutine() {
for {
select {
case <-pool.reorgShutdownCh:
return
case tx := <-pool.sendersCh:
// Add transaction to sender cache
pool.SendersMutex.Lock() // We could RLock here but it's unlikely to just be a read
if _, ok := pool.senders.Get(tx.hash); !ok {
pool.senders.Set(tx.hash, tx.sender)
if pool.senders.Len() > int(pool.config.MaxSenders) {
pool.senders.Delete(pool.senders.Oldest().Key) // FIFO
}
} else {
log.Debug("Tx already seen in sender cache (reorg?)", "tx", tx.hash.String(), "sender", tx.sender.String())
}
pool.SendersMutex.Unlock()
}
}
}

// addressByHeartbeat is an account address tagged with its last activity timestamp.
type addressByHeartbeat struct {
address common.InternalAddress
Expand Down
51 changes: 51 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,16 @@ func (tx *Transaction) IsInternalToExternalTx() (inner *InternalToExternalTx, ok
return
}

func (tx *Transaction) From() *common.Address {
sc := tx.from.Load()
if sc != nil {
sigCache := sc.(sigCache)
return &sigCache.from
} else {
return nil
}
}

// To returns the recipient address of the transaction.
// For contract-creation transactions, To returns nil.
func (tx *Transaction) To() *common.Address {
Expand Down Expand Up @@ -690,6 +700,47 @@ func (tx *Transaction) AsMessage(s Signer, baseFee *big.Int) (Message, error) {
return msg, err
}

// AsMessageWithSender returns the transaction as a core.Message.
func (tx *Transaction) AsMessageWithSender(s Signer, baseFee *big.Int, sender *common.InternalAddress) (Message, error) {
msg := Message{
nonce: tx.Nonce(),
gasLimit: tx.Gas(),
gasPrice: new(big.Int).Set(tx.GasPrice()),
gasFeeCap: new(big.Int).Set(tx.GasFeeCap()),
gasTipCap: new(big.Int).Set(tx.GasTipCap()),
to: tx.To(),
amount: tx.Value(),
data: tx.Data(),
accessList: tx.AccessList(),
checkNonce: true,
txtype: tx.Type(),
}
// If baseFee provided, set gasPrice to effectiveGasPrice.
if baseFee != nil {
msg.gasPrice = math.BigMin(msg.gasPrice.Add(msg.gasTipCap, baseFee), msg.gasFeeCap)
}
var err error
if tx.Type() == ExternalTxType {
msg.from = common.ZeroAddr
msg.etxsender, err = Sender(s, tx)
msg.checkNonce = false
} else {
if sender != nil {
msg.from = common.NewAddressFromData(sender)
} else {
msg.from, err = Sender(s, tx)
}
}
if internalToExternalTx, ok := tx.IsInternalToExternalTx(); ok {
msg.etxGasLimit = internalToExternalTx.ETXGasLimit
msg.etxGasPrice = internalToExternalTx.ETXGasPrice
msg.etxGasTip = internalToExternalTx.ETXGasTip
msg.etxData = internalToExternalTx.ETXData
msg.etxAccessList = internalToExternalTx.ETXAccessList
}
return msg, err
}

func (m Message) From() common.Address { return m.from }
func (m Message) To() *common.Address { return m.to }
func (m Message) GasPrice() *big.Int { return m.gasPrice }
Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ require (

require (
github.com/DataDog/zstd v1.5.2 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cockroachdb/errors v1.9.1 // indirect
github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect
github.com/cockroachdb/redact v1.1.3 // indirect
Expand All @@ -69,6 +71,7 @@ require (
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-ieproxy v0.0.0-20190702010315-6dee0af9227d // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
Expand All @@ -83,6 +86,7 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
golang.org/x/exp v0.0.0-20230206171751-46f607a40771 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
Expand Down
Loading

0 comments on commit e013f7c

Please sign in to comment.