Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Private Transaction API Sample (v1.10.13)
Browse files Browse the repository at this point in the history
  • Loading branch information
xcarlo authored and avalonche committed Mar 22, 2023
1 parent e1f11ed commit 6da1599
Show file tree
Hide file tree
Showing 13 changed files with 162 additions and 24 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ var (
utils.TxPoolAccountQueueFlag,
utils.TxPoolGlobalQueueFlag,
utils.TxPoolLifetimeFlag,
utils.TxPoolPrivateLifetimeFlag,
utils.SyncModeFlag,
utils.SyncTargetFlag,
utils.ExitWhenSyncedFlag,
Expand Down
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ var (
Category: flags.TxPoolCategory,
}

TxPoolPrivateLifetimeFlag = &cli.DurationFlag{
Name: "txpool.privatelifetime",
Usage: "Maximum amount of time private transactions are withheld from public broadcasting",
Value: ethconfig.Defaults.TxPool.PrivateTxLifetime,
Category: flags.TxPoolCategory,
}
// Performance tuning settings
CacheFlag = &cli.IntFlag{
Name: "cache",
Expand Down Expand Up @@ -1669,6 +1675,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolLifetimeFlag.Name) {
cfg.Lifetime = ctx.Duration(TxPoolLifetimeFlag.Name)
}
if ctx.IsSet(TxPoolPrivateLifetimeFlag.Name) {
cfg.PrivateTxLifetime = ctx.Duration(TxPoolPrivateLifetimeFlag.Name)
}

addresses := strings.Split(ctx.String(MinerTrustedRelaysFlag.Name), ",")
for _, address := range addresses {
Expand Down
111 changes: 99 additions & 12 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ var (
)

var (
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
evictionInterval = time.Minute // Time interval to check for evictable transactions
statsReportInterval = 8 * time.Second // Time interval to report transaction pool stats
privateTxCleanupInterval = 1 * time.Hour
)

var (
Expand Down Expand Up @@ -174,7 +175,8 @@ type Config struct {
AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account
GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts

Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
Lifetime time.Duration // Maximum amount of time non-executable transaction are queued
PrivateTxLifetime time.Duration // Maximum amount of time to keep private transactions private

TrustedRelays []common.Address // Trusted relay addresses. Duplicated from the miner config.
}
Expand All @@ -193,7 +195,8 @@ var DefaultConfig = Config{
AccountQueue: 64,
GlobalQueue: 1024,

Lifetime: 3 * time.Hour,
Lifetime: 3 * time.Hour,
PrivateTxLifetime: 3 * 24 * time.Hour,
}

// sanitize checks the provided user configurations and changes anything that's
Expand Down Expand Up @@ -232,6 +235,10 @@ func (config *Config) sanitize() Config {
log.Warn("Sanitizing invalid txpool lifetime", "provided", conf.Lifetime, "updated", DefaultConfig.Lifetime)
conf.Lifetime = DefaultConfig.Lifetime
}
if conf.PrivateTxLifetime < 1 {
log.Warn("Sanitizing invalid txpool private tx lifetime", "provided", conf.PrivateTxLifetime, "updated", DefaultTxPoolConfig.PrivateTxLifetime)
conf.PrivateTxLifetime = DefaultTxPoolConfig.PrivateTxLifetime
}
return conf
}

Expand Down Expand Up @@ -353,9 +360,10 @@ func (pool *TxPool) loop() {
var (
prevPending, prevQueued, prevStales int
// Start the stats reporting and transaction eviction tickers
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
report = time.NewTicker(statsReportInterval)
evict = time.NewTicker(evictionInterval)
journal = time.NewTicker(pool.config.Rejournal)
privateTx = time.NewTicker(privateTxCleanupInterval)
// Track the previous head headers for transaction reorgs
head = pool.chain.CurrentBlock()
)
Expand Down Expand Up @@ -419,6 +427,10 @@ func (pool *TxPool) loop() {
}
pool.mu.Unlock()
}

// Remove stale hashes that must be kept private
case <-privateTx.C:
pool.privateTxs.prune()
}
}
}
Expand Down Expand Up @@ -539,6 +551,11 @@ func (pool *TxPool) ContentFrom(addr common.Address) (types.Transactions, types.
return pending, queued
}

// IsPrivateTxHash indicates whether the transaction should be shared with peers
func (pool *TxPool) IsPrivateTxHash(hash common.Hash) bool {
return pool.privateTxs.Contains(hash)
}

// Pending retrieves all currently processable transactions, grouped by origin
// account and sorted by nonce. The returned transaction set is a copy and can be
// freely modified by calling code.
Expand Down Expand Up @@ -1024,7 +1041,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
// This method is used to add transactions from the RPC API and performs synchronous pool
// reorganization and event propagation.
func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
return pool.addTxs(txs, !pool.config.NoLocals, true)
return pool.addTxs(txs, !pool.config.NoLocals, true, false)
}

// AddLocal enqueues a single local transaction into the pool if it is valid. This is
Expand All @@ -1040,12 +1057,18 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error {
// This method is used to add transactions from the p2p network and does not wait for pool
// reorganization and internal event propagation.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, false)
return pool.addTxs(txs, false, false, false)
}

// AddPrivateRemote adds transactions to the pool, but does not broadcast these transactions to any peers.
func (pool *TxPool) AddPrivateRemote(tx *types.Transaction) error {
errs := pool.addTxs([]*types.Transaction{tx}, false, false, true)
return errs[0]
}

// AddRemotesSync is like AddRemotes, but waits for pool reorganization. Tests use this method.
func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error {
return pool.addTxs(txs, false, true)
return pool.addTxs(txs, false, true, false)
}

// This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method.
Expand All @@ -1064,7 +1087,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
}

// addTxs attempts to queue a batch of transactions if they are valid.
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync, private bool) []error {
// Filter out known ones without obtaining the pool lock or recovering signatures
var (
errs = make([]error, len(txs))
Expand Down Expand Up @@ -1093,6 +1116,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error {
return errs
}

// Track private transactions, so they don't get leaked to the public mempool
if private {
for _, tx := range news {
pool.privateTxs.Add(tx.Hash())
}
}

// Process all the new transaction and merge any errors into the original slice
pool.mu.Lock()
newErrs, dirtyAddrs := pool.addTxsLocked(news, local)
Expand Down Expand Up @@ -1389,7 +1419,11 @@ func (pool *TxPool) runReorg(done chan struct{}, reset *txpoolResetRequest, dirt
if len(events) > 0 {
var txs []*types.Transaction
for _, set := range events {
txs = append(txs, set.Flatten()...)
for _, tx := range set.Flatten() {
if !pool.IsPrivateTxHash(tx.Hash()) {
txs = append(txs, tx)
}
}
}
pool.txFeed.Send(core.NewTxsEvent{Txs: txs})
}
Expand Down Expand Up @@ -1998,6 +2032,59 @@ func (t *lookup) RemotesBelowTip(threshold *big.Int) types.Transactions {
return found
}

type timestampedTxHashSet struct {
lock sync.RWMutex
hashes []common.Hash
timestamps map[common.Hash]time.Time
ttl time.Duration
}

func newExpiringTxHashSet(ttl time.Duration) *timestampedTxHashSet {
s := &timestampedTxHashSet{
hashes: make([]common.Hash, 0),
timestamps: make(map[common.Hash]time.Time),
ttl: ttl,
}

return s
}

func (s *timestampedTxHashSet) Add(hash common.Hash) {
s.lock.Lock()
defer s.lock.Unlock()

s.hashes = append(s.hashes, hash)
s.timestamps[hash] = time.Now().Add(s.ttl)
}

func (s *timestampedTxHashSet) Contains(hash common.Hash) bool {
s.lock.RLock()
defer s.lock.RUnlock()
_, ok := s.timestamps[hash]
return ok
}

func (s *timestampedTxHashSet) prune() {
s.lock.Lock()
defer s.lock.Unlock()

var (
count int
now = time.Now()
)
for _, hash := range s.hashes {
ts := s.timestamps[hash]
if ts.After(now) {
break
}

delete(s.timestamps, hash)
count += 1
}

s.hashes = s.hashes[count:]
}

// numSlots calculates the number of slots needed for a single transaction.
func numSlots(tx *types.Transaction) int {
return int((tx.Size() + txSlotSize - 1) / txSlotSize)
Expand Down
8 changes: 6 additions & 2 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ func (b *EthAPIBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscri
return b.eth.BlockChain().SubscribeLogsEvent(ch)
}

func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error {
return b.eth.txPool.AddLocal(signedTx)
func (b *EthAPIBackend) SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error {
if private {
return b.eth.txPool.AddPrivateRemote(signedTx)
} else {
return b.eth.txPool.AddLocal(signedTx)
}
}

func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error {
Expand Down
4 changes: 4 additions & 0 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type txPool interface {
// SubscribeNewTxsEvent should return an event subscription of
// NewTxsEvent and send events to the given channel.
SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription

// IsPrivateTxHash indicates if the transaction hash should not
// be broadcast on public channels
IsPrivateTxHash(hash common.Hash) bool
}

// handlerConfig is the collection of initialization parameters to create a full
Expand Down
5 changes: 5 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subs
return p.txFeed.Subscribe(ch)
}

// IsPrivateTxHash always returns false in tests
func (p *testTxPool) IsPrivateTxHash(hash common.Hash) bool {
return false
}

// testHandler is a live implementation of the Ethereum protocol handler, just
// preinitialized with some sane testing defaults and the transaction pool mocked
// out.
Expand Down
4 changes: 4 additions & 0 deletions eth/protocols/eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ type Backend interface {
type TxPool interface {
// Get retrieves the transaction from the local txpool with the given hash.
Get(hash common.Hash) *types.Transaction

// IsPrivateTxHash indicates if the transaction hash should not
// be broadcast on public channels
IsPrivateTxHash(hash common.Hash) bool
}

// MakeProtocols constructs the P2P protocol definitions for `eth`.
Expand Down
7 changes: 6 additions & 1 deletion eth/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ func (h *handler) syncTransactions(p *eth.Peer) {
var txs types.Transactions
pending := h.txpool.Pending(false)
for _, batch := range pending {
txs = append(txs, batch...)
for _, tx := range batch {
// don't share any transactions marked as private
if !h.txpool.IsPrivateTxHash(tx.Hash()) {
txs = append(txs, tx)
}
}
}
if len(txs) == 0 {
return
Expand Down
2 changes: 1 addition & 1 deletion graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ func (r *Resolver) SendRawTransaction(ctx context.Context, args struct{ Data hex
if err := tx.UnmarshalBinary(args.Data); err != nil {
return common.Hash{}, err
}
hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx)
hash, err := ethapi.SubmitTransaction(ctx, r.backend, tx, false)
return hash, err
}

Expand Down
25 changes: 19 additions & 6 deletions internal/ethapi/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ func (s *PersonalAccountAPI) SendTransaction(ctx context.Context, args Transacti
log.Warn("Failed transaction send attempt", "from", args.from(), "to", args.To, "value", args.Value.ToInt(), "err", err)
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, signed)
return SubmitTransaction(ctx, s.b, signed, false)
}

// SignTransaction will create a transaction from the given arguments and
Expand Down Expand Up @@ -1684,7 +1684,7 @@ func (s *TransactionAPI) sign(addr common.Address, tx *types.Transaction) (*type
}

// SubmitTransaction is a helper function that submits tx to txPool and logs a message.
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (common.Hash, error) {
func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction, private bool) (common.Hash, error) {
// If the transaction fee cap is already specified, ensure the
// fee of the given transaction is _reasonable_.
if err := checkTxFee(tx.GasPrice(), tx.Gas(), b.RPCTxFeeCap()); err != nil {
Expand All @@ -1694,7 +1694,7 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c
// Ensure only eip155 signed transactions are submitted if EIP155Required is set.
return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC")
}
if err := b.SendTx(ctx, tx); err != nil {
if err := b.SendTx(ctx, tx, private); err != nil {
return common.Hash{}, err
}
// Print a log with full tx details for manual investigations and interventions
Expand Down Expand Up @@ -1742,7 +1742,7 @@ func (s *TransactionAPI) SendTransaction(ctx context.Context, args TransactionAr
if err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, signed)
return SubmitTransaction(ctx, s.b, signed, false)
}

// FillTransaction fills the defaults (nonce, gas, gasPrice or 1559 fields)
Expand All @@ -1769,7 +1769,20 @@ func (s *TransactionAPI) SendRawTransaction(ctx context.Context, input hexutil.B
if err := tx.UnmarshalBinary(input); err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, tx)
return SubmitTransaction(ctx, s.b, tx, false)
}

// SendPrivateRawTransaction will add the signed transaction to the transaction pool,
// without broadcasting the transaction to its peers, and mark the transaction to avoid
// future syncs.
//
// See SendRawTransaction.
func (s *TransactionAPI) SendPrivateRawTransaction(ctx context.Context, input hexutil.Bytes) (common.Hash, error) {
tx := new(types.Transaction)
if err := tx.UnmarshalBinary(input); err != nil {
return common.Hash{}, err
}
return SubmitTransaction(ctx, s.b, tx, true)
}

// Sign calculates an ECDSA signature for:
Expand Down Expand Up @@ -1902,7 +1915,7 @@ func (s *TransactionAPI) Resend(ctx context.Context, sendArgs TransactionArgs, g
if err != nil {
return common.Hash{}, err
}
if err = s.b.SendTx(ctx, signedTx); err != nil {
if err = s.b.SendTx(ctx, signedTx, false); err != nil {
return common.Hash{}, err
}
return signedTx.Hash(), nil
Expand Down
2 changes: 1 addition & 1 deletion internal/ethapi/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Backend interface {
SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription

// Transaction pool API
SendTx(ctx context.Context, signedTx *types.Transaction) error
SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error
SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error
SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error
GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error)
Expand Down
6 changes: 6 additions & 0 deletions internal/web3ext/web3ext.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,12 @@ web3._extend({
params: 1,
inputFormatter: [web3._extend.formatters.inputTransactionFormatter]
}),
new web3._extend.Method({
name: 'sendPrivateRawTransaction',
call: 'eth_sendPrivateRawTransaction',
params: 1,
inputFormatter: [null]
}),
new web3._extend.Method({
name: 'fillTransaction',
call: 'eth_fillTransaction',
Expand Down
Loading

0 comments on commit 6da1599

Please sign in to comment.