Skip to content

Commit

Permalink
Use mempool from optimint repository
Browse files Browse the repository at this point in the history
  • Loading branch information
tzdybal committed Apr 16, 2021
1 parent 19ab38d commit fc3b448
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 37 deletions.
46 changes: 23 additions & 23 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap.Load(TxKey(tx)); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
memTx := e.(*clist.CElement).Value.(*MempoolTx)
memTx.senders.LoadOrStore(txInfo.SenderID, true)
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
Expand Down Expand Up @@ -357,11 +357,11 @@ func (mem *CListMempool) reqResCb(

// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addTx(memTx *mempoolTx) {
func (mem *CListMempool) addTx(memTx *MempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap.Store(TxKey(memTx.tx), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
mem.txsMap.Store(TxKey(memTx.Tx), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.Tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.Tx)))
}

// Called from:
Expand All @@ -381,9 +381,9 @@ func (mem *CListMempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromC
// RemoveTxByKey removes a transaction from the mempool by its TxKey index.
func (mem *CListMempool) RemoveTxByKey(txKey [TxKeySize]byte, removeFromCache bool) {
if e, ok := mem.txsMap.Load(txKey); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
memTx := e.(*clist.CElement).Value.(*MempoolTx)
if memTx != nil {
mem.removeTx(memTx.tx, e.(*clist.CElement), removeFromCache)
mem.removeTx(memTx.Tx, e.(*clist.CElement), removeFromCache)
}
}
}
Expand Down Expand Up @@ -430,10 +430,10 @@ func (mem *CListMempool) resCbFirstTime(
return
}

memTx := &mempoolTx{
memTx := &MempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
tx: tx,
Tx: tx,
}
memTx.senders.Store(peerID, true)
mem.addTx(memTx)
Expand Down Expand Up @@ -465,11 +465,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
tx := req.GetCheckTx().Tx
memTx := mem.recheckCursor.Value.(*mempoolTx)
if !bytes.Equal(tx, memTx.tx) {
memTx := mem.recheckCursor.Value.(*MempoolTx)
if !bytes.Equal(tx, memTx.Tx) {
panic(fmt.Sprintf(
"Unexpected tx response from proxy during recheck\nExpected %X, got %X",
memTx.tx,
memTx.Tx,
tx))
}
var postCheckErr error
Expand Down Expand Up @@ -534,9 +534,9 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
// txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make([]types.Tx, 0, mem.txs.Len())
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
memTx := e.Value.(*MempoolTx)

dataSize := types.ComputeProtoSizeForTxs(append(txs, memTx.tx))
dataSize := types.ComputeProtoSizeForTxs(append(txs, memTx.Tx))

// Check total size requirement
if maxBytes > -1 && dataSize > maxBytes {
Expand All @@ -551,7 +551,7 @@ func (mem *CListMempool) ReapMaxBytesMaxGas(maxBytes, maxGas int64) types.Txs {
return txs
}
totalGas = newTotalGas
txs = append(txs, memTx.tx)
txs = append(txs, memTx.Tx)
}
return txs
}
Expand All @@ -567,8 +567,8 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {

txs := make([]types.Tx, 0, tmmath.MinInt(mem.txs.Len(), max))
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
memTx := e.Value.(*mempoolTx)
txs = append(txs, memTx.tx)
memTx := e.Value.(*MempoolTx)
txs = append(txs, memTx.Tx)
}
return txs
}
Expand Down Expand Up @@ -649,9 +649,9 @@ func (mem *CListMempool) recheckTxs() {
// Push txs to proxyAppConn
// NOTE: globalCb may be called concurrently.
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
memTx := e.Value.(*MempoolTx)
_, err := mem.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
Tx: memTx.tx,
Tx: memTx.Tx,
Type: abci.CheckTxType_Recheck,
})
if err != nil {
Expand All @@ -668,19 +668,19 @@ func (mem *CListMempool) recheckTxs() {

//--------------------------------------------------------------------------------

// mempoolTx is a transaction that successfully ran
type mempoolTx struct {
// MempoolTx is a transaction that successfully ran
type MempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
tx types.Tx //
Tx types.Tx //

// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders sync.Map
}

// Height returns the height for this transaction
func (memTx *mempoolTx) Height() int64 {
func (memTx *MempoolTx) Height() int64 {
return atomic.LoadInt64(&memTx.height)
}

Expand Down
6 changes: 3 additions & 3 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ func TestReapMaxBytesMaxGas(t *testing.T) {

// Ensure gas calculation behaves as expected
checkTxs(t, mempool, 1, UnknownPeerID)
tx0 := mempool.TxsFront().Value.(*mempoolTx)
tx0 := mempool.TxsFront().Value.(*MempoolTx)
// assert that kv store has gas wanted = 1.
require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1")
require.Equal(t, app.CheckTx(abci.RequestCheckTx{Tx: tx0.Tx}).GasWanted, int64(1), "KVStore had a gas value neq to 1")
require.Equal(t, tx0.gasWanted, int64(1), "transactions gas was set incorrectly")
// ensure each tx is 20 bytes long
require.Equal(t, len(tx0.tx), 20, "Tx is longer than 20 bytes")
require.Equal(t, len(tx0.Tx), 20, "Tx is longer than 20 bytes")
mempool.Flush()

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
Expand Down
8 changes: 4 additions & 4 deletions mempool/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}

// Allow for a lag of 1 block.
memTx := next.Value.(*mempoolTx)
memTx := next.Value.(*MempoolTx)
if peerState.GetHeight() < memTx.Height()-1 {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
Expand Down Expand Up @@ -274,20 +274,20 @@ func (memR *Reactor) txs(next *clist.CElement, peerID uint16, peerHeight int64)
batch := make([][]byte, 0)

for {
memTx := next.Value.(*mempoolTx)
memTx := next.Value.(*MempoolTx)

if _, ok := memTx.senders.Load(peerID); !ok {
// If current batch + this tx size is greater than max => return.
batchMsg := protomem.Message{
Sum: &protomem.Message_Txs{
Txs: &protomem.Txs{Txs: append(batch, memTx.tx)},
Txs: &protomem.Txs{Txs: append(batch, memTx.Tx)},
},
}
if batchMsg.Size() > memR.config.MaxBatchBytes {
return batch
}

batch = append(batch, memTx.tx)
batch = append(batch, memTx.Tx)
}

n := next.Next()
Expand Down
9 changes: 3 additions & 6 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,19 @@ package node
import (
"context"
"fmt"
"reflect"

abci "github.com/lazyledger/lazyledger-core/abci/types"
llcfg "github.com/lazyledger/lazyledger-core/config"
"github.com/lazyledger/lazyledger-core/libs/clist"
"github.com/lazyledger/lazyledger-core/libs/log"
"github.com/lazyledger/lazyledger-core/libs/service"
"github.com/lazyledger/lazyledger-core/mempool"
corep2p "github.com/lazyledger/lazyledger-core/p2p"
"github.com/lazyledger/lazyledger-core/proxy"
"github.com/lazyledger/lazyledger-core/types"
"github.com/libp2p/go-libp2p-core/crypto"

"github.com/lazyledger/optimint/config"
"github.com/lazyledger/optimint/mempool"
"github.com/lazyledger/optimint/p2p"
)

Expand Down Expand Up @@ -114,10 +113,8 @@ func (n *Node) mempoolPublishLoop(ctx context.Context) {
// send transactions
for {
n.Logger.Debug("Gossiping...")
// TODO(tzdybal): refactor lazyledger-core to avoid reflection
v := reflect.Indirect(reflect.ValueOf(next.Value))
f := v.FieldByName("tx")
tx := f.Bytes()
memTx := next.Value.(*mempool.MempoolTx)
tx := memTx.Tx

err := n.P2P.GossipTx(ctx, tx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion rpcclient/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ import (
tmbytes "github.com/lazyledger/lazyledger-core/libs/bytes"
tmpubsub "github.com/lazyledger/lazyledger-core/libs/pubsub"
tmquery "github.com/lazyledger/lazyledger-core/libs/pubsub/query"
"github.com/lazyledger/lazyledger-core/mempool"
"github.com/lazyledger/lazyledger-core/proxy"
rpcclient "github.com/lazyledger/lazyledger-core/rpc/client"
ctypes "github.com/lazyledger/lazyledger-core/rpc/core/types"
rpctypes "github.com/lazyledger/lazyledger-core/rpc/jsonrpc/types"
"github.com/lazyledger/lazyledger-core/types"

"github.com/lazyledger/optimint/mempool"
"github.com/lazyledger/optimint/node"
)

Expand Down

0 comments on commit fc3b448

Please sign in to comment.