Skip to content

Commit

Permalink
Mempool sorting and accept reverted pending transactions (Chia-Network#3683)
Browse files Browse the repository at this point in the history

* Sort by fee/cost, and fix pending tx issue in reorgs

* Fix test name

* Bring the seen list size back to normal.
  • Loading branch information
mariano54 authored May 4, 2021
1 parent c72d065 commit 88310d6
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 10 deletions.
2 changes: 1 addition & 1 deletion chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1545,7 +1545,7 @@ async def respond_transaction(
if self.mempool_manager.seen(spend_name):
return MempoolInclusionStatus.FAILED, Err.ALREADY_INCLUDING_TRANSACTION
self.mempool_manager.add_and_maybe_pop_seen(spend_name)
self.log.debug(f"Processingetransaction: {spend_name}")
self.log.debug(f"Processing transaction: {spend_name}")
# Ignore if syncing
if self.sync_store.get_sync_mode():
status = MempoolInclusionStatus.FAILED
Expand Down
19 changes: 13 additions & 6 deletions chia/full_node/mempool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ async def create_bundle_from_mempool(
additions = []
broke_from_inner_loop = False
log.info(f"Starting to make block, max cost: {self.constants.MAX_BLOCK_COST_CLVM}")
for dic in self.mempool.sorted_spends.values():
for dic in reversed(self.mempool.sorted_spends.values()):
if broke_from_inner_loop:
break
for item in dic.values():
log.info(f"Cumulative cost: {cost_sum}")
log.info(f"Cumulative cost: {cost_sum}, fee per cost: {item.fee / item.cost}")
if (
item.cost + cost_sum <= self.limit_factor * self.constants.MAX_BLOCK_COST_CLVM
and item.fee + fee_sum <= self.constants.MAX_COIN_AMOUNT
Expand Down Expand Up @@ -157,7 +157,7 @@ def add_and_maybe_pop_seen(self, spend_name: bytes32):
self.seen_bundle_hashes.pop(first_in)

def seen(self, bundle_hash: bytes32) -> bool:
"""Return true if we saw this spendbundle before"""
"""Return true if we saw this spendbundle recently"""
return bundle_hash in self.seen_bundle_hashes

def remove_seen(self, bundle_hash: bytes32):
Expand Down Expand Up @@ -229,8 +229,8 @@ async def add_spendbundle(
program: Optional[SerializedProgram] = None,
) -> Tuple[Optional[uint64], MempoolInclusionStatus, Optional[Err]]:
"""
Tries to add spendbundle to either self.mempools or to_pool if it's specified.
Returns true if it's added in any of pools, Returns error if it fails.
Tries to add spend bundle to the mempool
Returns the cost (if SUCCESS), the result (MempoolInclusion status), and an optional error
"""
start_time = time.time()
if self.peak is None:
Expand Down Expand Up @@ -505,7 +505,14 @@ async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBun
self.mempool = Mempool(self.mempool_max_total_cost)

for item in old_pool.spends.values():
await self.add_spendbundle(item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program)
_, result, _ = await self.add_spendbundle(
item.spend_bundle, item.npc_result, item.spend_bundle_name, False, item.program
)
# If the spend bundle was confirmed or conflicting (can no longer be in mempool), it won't be successfully
# added to the new mempool. In this case, remove it from seen, so in the case of a reorg, it can be
# resubmitted
if result != MempoolInclusionStatus.SUCCESS:
self.remove_seen(item.spend_bundle_name)

potential_txs_copy = self.potential_txs.copy()
self.potential_txs = {}
Expand Down
43 changes: 40 additions & 3 deletions tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from chia.types.condition_opcodes import ConditionOpcode
from chia.types.condition_with_args import ConditionWithArgs
from chia.types.full_block import FullBlock
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.peer_info import PeerInfo, TimestampedPeerInfo
from chia.types.spend_bundle import SpendBundle
from chia.types.unfinished_block import UnfinishedBlock
Expand Down Expand Up @@ -388,8 +389,11 @@ async def do_test_block_compression(self, setup_two_nodes_and_wallet, empty_bloc

@pytest.mark.asyncio
async def test_block_compression(self, setup_two_nodes_and_wallet, empty_blockchain):
self.do_test_block_compression(setup_two_nodes_and_wallet, empty_blockchain, 10000, True)
self.do_test_block_compression(setup_two_nodes_and_wallet, empty_blockchain, 3000000000000, False)
await self.do_test_block_compression(setup_two_nodes_and_wallet, empty_blockchain, 10000, True)

@pytest.mark.asyncio
async def test_block_compression_2(self, setup_two_nodes_and_wallet, empty_blockchain):
await self.do_test_block_compression(setup_two_nodes_and_wallet, empty_blockchain, 3000000000000, False)


class TestFullNodeProtocol:
Expand Down Expand Up @@ -792,6 +796,7 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):
included_tx = 0
not_included_tx = 0
seen_bigger_transaction_has_high_fee = False
successful_bundle: Optional[SpendBundle] = None

# Fill mempool
for puzzle_hash in puzzle_hashes[1:]:
Expand All @@ -807,7 +812,6 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):
uint64(500), receiver_puzzlehash, coin_record.coin, fee=fee
)
respond_transaction = fnp.RespondTransaction(spend_bundle)
cost_result = await full_node_1.full_node.mempool_manager.pre_validate_spendbundle(spend_bundle)

await full_node_1.respond_transaction(respond_transaction, peer)

Expand All @@ -828,6 +832,8 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):
spend_bundles.append(spend_bundle)
assert not full_node_1.full_node.mempool_manager.mempool.at_full_capacity(0)
assert full_node_1.full_node.mempool_manager.mempool.get_min_fee_rate(0) == 0
if force_high_fee:
successful_bundle = spend_bundle
else:
assert full_node_1.full_node.mempool_manager.mempool.at_full_capacity(10000000)
assert full_node_1.full_node.mempool_manager.mempool.get_min_fee_rate(10000000) > 0
Expand All @@ -845,15 +851,46 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):

await time_out_assert(10, new_transaction_not_requested, True, incoming_queue, new_transaction)

# Cannot resubmit transaction
status, err = await full_node_1.full_node.respond_transaction(
successful_bundle, successful_bundle.name(), peer, test=True
)
assert status == MempoolInclusionStatus.FAILED
assert err == Err.ALREADY_INCLUDING_TRANSACTION

# Farm one block to clear mempool
await full_node_1.farm_new_transaction_block(FarmNewBlockProtocol(receiver_puzzlehash))

# No longer full
new_transaction = fnp.NewTransaction(token_bytes(32), uint64(1000000), uint64(1))
await full_node_1.new_transaction(new_transaction, fake_peer)

# Cannot resubmit transaction, but not because of ALREADY_INCLUDING
status, err = await full_node_1.full_node.respond_transaction(
successful_bundle, successful_bundle.name(), peer, test=True
)
assert status == MempoolInclusionStatus.FAILED
assert err != Err.ALREADY_INCLUDING_TRANSACTION

await time_out_assert(10, new_transaction_requested, True, incoming_queue, new_transaction)

# Reorg the blockchain
blocks = await full_node_1.get_all_full_blocks()
blocks = bt.get_consecutive_blocks(
3,
block_list_input=blocks[:-1],
guarantee_transaction_block=True,
)
for block in blocks[-3:]:
await full_node_1.full_node.respond_block(fnp.RespondBlock(block), peer)

# Can now resubmit a transaction after the reorg
status, err = await full_node_1.full_node.respond_transaction(
successful_bundle, successful_bundle.name(), peer, test=True
)
assert err is None
assert status == MempoolInclusionStatus.SUCCESS

@pytest.mark.asyncio
async def test_request_respond_transaction(self, wallet_nodes):
full_node_1, full_node_2, server_1, server_2, wallet_a, wallet_receiver = wallet_nodes
Expand Down

0 comments on commit 88310d6

Please sign in to comment.