Avoid multiple requests (Chia-Network#2860)
* avoid dupe requests

* move to store, clean tasks

* linting

* limit

* 10 peers, 5 sec
Yostra authored Apr 28, 2021
1 parent 47e6f0d commit 4ef3777
Showing 7 changed files with 216 additions and 63 deletions.
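For orientation before the per-file diffs: this commit stops a node from sending a RequestTransaction to every peer that advertises the same transaction. The first NewTransaction for an unseen id registers a pending request and spawns a single background fetch task; later advertisements only add the peer to a fallback set. The task asks at most 10 advertising peers, waiting 5 seconds between attempts, and always cleans up after itself. Below is a minimal, self-contained sketch of that flow; the plain dicts and the stand-in names (request_from_peer, tx_seen_in_mempool) are illustrative substitutes for FullNodeStore, the peer connection, and mempool_manager.seen, not the actual implementation.

import asyncio
from typing import Dict, Set

# Illustrative stand-ins for the new FullNodeStore fields.
pending_tx_request: Dict[bytes, bytes] = {}     # tx_id -> first advertising peer
peers_with_tx: Dict[bytes, Set[bytes]] = {}     # tx_id -> all peers that advertised it
tx_fetch_tasks: Dict[bytes, asyncio.Task] = {}  # task_id -> background fetch task


async def request_from_peer(peer_id: bytes, tx_id: bytes) -> None:
    # Stand-in for sending ProtocolMessageTypes.request_transaction to the peer.
    await asyncio.sleep(0)


def tx_seen_in_mempool(tx_id: bytes) -> bool:
    # Stand-in for mempool_manager.seen(tx_id).
    return False


async def on_new_transaction(tx_id: bytes, peer_id: bytes) -> None:
    # A fetch is already pending: just remember this peer as a fallback source.
    if tx_id in pending_tx_request:
        peers_with_tx.setdefault(tx_id, set()).add(peer_id)
        return
    # First advertisement: register the request and spawn one background task.
    pending_tx_request[tx_id] = peer_id
    peers_with_tx[tx_id] = {peer_id}
    task_id = tx_id  # the real code uses a fresh token_bytes() value here
    tx_fetch_tasks[task_id] = asyncio.create_task(fetch_with_timeout(tx_id, task_id))


async def fetch_with_timeout(tx_id: bytes, task_id: bytes) -> None:
    try:
        # Ask at most 10 advertising peers, waiting 5 seconds between attempts.
        for _ in range(10):
            peers = peers_with_tx.get(tx_id)
            if not peers:
                break
            await request_from_peer(peers.pop(), tx_id)
            await asyncio.sleep(5)
            if tx_seen_in_mempool(tx_id):
                break
    finally:
        # Always clean up, whether the tx arrived, nobody answered, or we were cancelled.
        peers_with_tx.pop(tx_id, None)
        pending_tx_request.pop(tx_id, None)
        tx_fetch_tasks.pop(task_id, None)
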
11 changes: 5 additions & 6 deletions chia/full_node/full_node.py
@@ -55,6 +55,7 @@
from chia.util.errors import ConsensusError, Err
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.path import mkdir, path_from_root
from chia.util.safe_cancel_task import cancel_task_safe


class FullNode:
@@ -96,6 +97,7 @@ def __init__(
self.full_node_peers = None
self.sync_store = None
self.signage_point_times = [time.time() for _ in range(self.constants.NUM_SPS_SUB_SLOT)]
self.full_node_store = FullNodeStore(self.constants)

if name:
self.log = logging.getLogger(name)
@@ -115,7 +117,6 @@ async def _start(self):
self.connection = await aiosqlite.connect(self.db_path)
self.db_wrapper = DBWrapper(self.connection)
self.block_store = await BlockStore.create(self.db_wrapper)
self.full_node_store = await FullNodeStore.create(self.constants)
self.sync_store = await SyncStore.create()
self.coin_store = await CoinStore.create(self.db_wrapper)
self.log.info("Initializing blockchain from disk")
@@ -526,11 +527,9 @@ def _close(self):
self.uncompact_task.cancel()

async def _await_closed(self):
try:
if self._sync_task is not None:
self._sync_task.cancel()
except asyncio.TimeoutError:
pass
cancel_task_safe(self._sync_task, self.log)
for task_id, task in list(self.full_node_store.tx_fetch_tasks.items()):
cancel_task_safe(task, self.log)
await self.connection.close()

async def _sync(self):
76 changes: 71 additions & 5 deletions chia/full_node/full_node_api.py
@@ -1,7 +1,8 @@
import asyncio
import dataclasses
import time
from typing import Callable, Dict, List, Optional, Tuple
from secrets import token_bytes
from typing import Callable, Dict, List, Optional, Tuple, Set

from blspy import AugSchemeMPL, G2Element
from chiabip158 import PyBIP158
@@ -100,8 +101,11 @@ async def new_peak(self, request: full_node_protocol.NewPeak, peer: ws.WSChiaCon
"""
return await self.full_node.new_peak(request, peer)

@peer_required
@api_request
async def new_transaction(self, transaction: full_node_protocol.NewTransaction) -> Optional[Message]:
async def new_transaction(
self, transaction: full_node_protocol.NewTransaction, peer: ws.WSChiaConnection
) -> Optional[Message]:
"""
A peer notifies us of a new transaction.
Requests a full transaction if we haven't seen it previously, and if the fees are enough.
@@ -120,9 +124,67 @@ async def new_transaction(self, transaction: full_node_protocol.NewTransaction)
return None

if self.full_node.mempool_manager.is_fee_enough(transaction.fees, transaction.cost):
request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id)
msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx)
return msg
# If there's current pending request just add this peer to the set of peers that have this tx
if transaction.transaction_id in self.full_node.full_node_store.pending_tx_request:
if transaction.transaction_id in self.full_node.full_node_store.peers_with_tx:
current_set = self.full_node.full_node_store.peers_with_tx[transaction.transaction_id]
if peer.peer_node_id in current_set:
return None
current_set.add(peer.peer_node_id)
return None
else:
new_set = set()
new_set.add(peer.peer_node_id)
self.full_node.full_node_store.peers_with_tx[transaction.transaction_id] = new_set
return None

self.full_node.full_node_store.pending_tx_request[transaction.transaction_id] = peer.peer_node_id
new_set = set()
new_set.add(peer.peer_node_id)
self.full_node.full_node_store.peers_with_tx[transaction.transaction_id] = new_set

async def tx_request_and_timeout(full_node: FullNode, transaction_id, task_id):
counter = 0
try:
while True:
# Limit to asking 10 peers, it's possible that this tx got included on chain already
# Highly unlikely 10 peers that advertised a tx don't respond to a request
if counter == 10:
break
if transaction_id not in full_node.full_node_store.peers_with_tx:
break
peers_with_tx: Set = full_node.full_node_store.peers_with_tx[transaction_id]
if len(peers_with_tx) == 0:
break
peer_id = peers_with_tx.pop()
assert full_node.server is not None
if peer_id not in full_node.server.all_connections:
continue
peer = full_node.server.all_connections[peer_id]
request_tx = full_node_protocol.RequestTransaction(transaction.transaction_id)
msg = make_msg(ProtocolMessageTypes.request_transaction, request_tx)
await peer.send_message(msg)
await asyncio.sleep(5)
counter += 1
if full_node.mempool_manager.seen(transaction_id):
break
except asyncio.CancelledError:
pass
finally:
# Always Cleanup
if transaction_id in full_node.full_node_store.peers_with_tx:
full_node.full_node_store.peers_with_tx.pop(transaction_id)
if transaction_id in full_node.full_node_store.pending_tx_request:
full_node.full_node_store.pending_tx_request.pop(transaction_id)
if task_id in full_node.full_node_store.tx_fetch_tasks:
full_node.full_node_store.tx_fetch_tasks.pop(task_id)

task_id = token_bytes()
fetch_task = asyncio.create_task(
tx_request_and_timeout(self.full_node, transaction.transaction_id, task_id)
)
self.full_node.full_node_store.tx_fetch_tasks[task_id] = fetch_task
return None
return None

@api_request
@@ -156,6 +218,10 @@ async def respond_transaction(
"""
assert tx_bytes != b""
spend_name = std_hash(tx_bytes)
if spend_name in self.full_node.full_node_store.pending_tx_request:
self.full_node.full_node_store.pending_tx_request.pop(spend_name)
if spend_name in self.full_node.full_node_store.peers_with_tx:
self.full_node.full_node_store.peers_with_tx.pop(spend_name)
await self.full_node.respond_transaction(tx.transaction, spend_name, peer, test)
return None

14 changes: 8 additions & 6 deletions chia/full_node/full_node_store.py
@@ -1,3 +1,4 @@
import asyncio
import dataclasses
import logging
import time
@@ -62,8 +63,11 @@ class FullNodeStore:
requesting_unfinished_blocks: Set[bytes32]

previous_generator: Optional[CompressorArg]
pending_tx_request: Dict[bytes32, bytes32] # tx_id: peer_id
peers_with_tx: Dict[bytes32, Set[bytes32]] # tx_id: Set[peer_ids]
tx_fetch_tasks: Dict[bytes32, asyncio.Task] # Task id: task

def __init__(self):
def __init__(self, constants: ConsensusConstants):
self.candidate_blocks = {}
self.candidate_backup_blocks = {}
self.seen_unfinished_blocks = set()
@@ -75,14 +79,12 @@ def __init__(self):
self.requesting_unfinished_blocks = set()
self.previous_generator = None
self.future_cache_key_times = {}

@classmethod
async def create(cls, constants: ConsensusConstants):
self = cls()
self.constants = constants
self.clear_slots()
self.initialize_genesis_sub_slot()
return self
self.pending_tx_request = {}
self.peers_with_tx = {}
self.tx_fetch_tasks = {}

def add_candidate_block(
self, quality_string: bytes32, height: uint32, unfinished_block: UnfinishedBlock, backup: bool = False
11 changes: 11 additions & 0 deletions chia/util/safe_cancel_task.py
@@ -0,0 +1,11 @@
import asyncio
from typing import Optional


def cancel_task_safe(task: Optional[asyncio.Task], log=None):
if task is not None:
try:
task.cancel()
except Exception as e:
if log is not None:
log.error(f"Error while canceling task.{e} {task}")
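A brief usage sketch of this helper, mirroring how _await_closed above now drains the store's tx_fetch_tasks on shutdown; the logger name and the demo task dict are illustrative, only cancel_task_safe comes from the new module.

import asyncio
import logging

from chia.util.safe_cancel_task import cancel_task_safe

log = logging.getLogger(__name__)


async def shutdown_example() -> None:
    # Illustrative stand-in for full_node_store.tx_fetch_tasks.
    tx_fetch_tasks = {
        b"task-1": asyncio.create_task(asyncio.sleep(60)),
        b"task-2": asyncio.create_task(asyncio.sleep(60)),
    }
    # Cancel every outstanding fetch task; any error is logged rather than raised.
    for task in list(tx_fetch_tasks.values()):
        cancel_task_safe(task, log)


# asyncio.run(shutdown_example())
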
55 changes: 44 additions & 11 deletions tests/core/full_node/test_full_node.py
@@ -13,11 +13,12 @@
from chia.full_node.bundle_tools import detect_potential_template_generator
from chia.full_node.full_node_api import FullNodeAPI
from chia.full_node.signage_point import SignagePoint
from chia.protocols import full_node_protocol as fnp
from chia.protocols import full_node_protocol as fnp, full_node_protocol
from chia.protocols import timelord_protocol
from chia.protocols.full_node_protocol import RespondTransaction
from chia.protocols.protocol_message_types import ProtocolMessageTypes
from chia.server.address_manager import AddressManager
from chia.server.outbound_message import Message
from chia.simulator.simulator_protocol import FarmNewBlockProtocol
from chia.types.blockchain_format.classgroup import ClassgroupElement
from chia.types.blockchain_format.program import SerializedProgram
@@ -52,6 +53,36 @@
log = logging.getLogger(__name__)


async def new_transaction_not_requested(incoming, new_spend):
await asyncio.sleep(3)
while not incoming.empty():
response, peer = await incoming.get()
if (
response is not None
and isinstance(response, Message)
and response.type == ProtocolMessageTypes.request_transaction.value
):
request = full_node_protocol.RequestTransaction.from_bytes(response.data)
if request.transaction_id == new_spend.transaction_id:
return False
return True


async def new_transaction_requested(incoming, new_spend):
await asyncio.sleep(1)
while not incoming.empty():
response, peer = await incoming.get()
if (
response is not None
and isinstance(response, Message)
and response.type == ProtocolMessageTypes.request_transaction.value
):
request = full_node_protocol.RequestTransaction.from_bytes(response.data)
if request.transaction_id == new_spend.transaction_id:
return True
return False


async def get_block_path(full_node: FullNodeAPI):
blocks_list = [await full_node.full_node.blockchain.get_full_peak()]
assert blocks_list[0] is not None
@@ -706,7 +737,8 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):
else -1
)
peer = await connect_and_get_peer(server_1, server_2)

incoming_queue, node_id = await add_dummy_connection(server_1, 12312)
fake_peer = server_1.all_connections[node_id]
# Mempool has capacity of 100, make 110 unspents that we can use
puzzle_hashes = []

Expand Down Expand Up @@ -735,8 +767,8 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):

new_transaction = fnp.NewTransaction(spend_bundle.get_hash(), uint64(100), uint64(100))

msg = await full_node_1.new_transaction(new_transaction)
assert msg.data == bytes(fnp.RequestTransaction(spend_bundle.get_hash()))
await full_node_1.new_transaction(new_transaction, fake_peer)
await time_out_assert(10, new_transaction_requested, True, incoming_queue, new_transaction)

respond_transaction_2 = fnp.RespondTransaction(spend_bundle)
await full_node_1.respond_transaction(respond_transaction_2, peer)
Expand All @@ -750,8 +782,8 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):
await full_node_1.full_node.respond_block(fnp.RespondBlock(blocks[-1]), peer)

# Already seen
msg = await full_node_1.new_transaction(new_transaction)
assert msg is None
await full_node_1.new_transaction(new_transaction, fake_peer)
await time_out_assert(10, new_transaction_not_requested, True, incoming_queue, new_transaction)

await time_out_assert(10, node_height_at_least, True, full_node_1, start_height + 5)

@@ -809,17 +841,18 @@ async def test_new_transaction_and_mempool(self, wallet_nodes):

# Mempool is full
new_transaction = fnp.NewTransaction(token_bytes(32), 10000000, uint64(1))
msg = await full_node_1.new_transaction(new_transaction)
log.warning(f"MSG: {msg} {cost_result.clvm_cost}")
assert msg is None
await full_node_1.new_transaction(new_transaction, fake_peer)

await time_out_assert(10, new_transaction_not_requested, True, incoming_queue, new_transaction)

# Farm one block to clear mempool
await full_node_1.farm_new_transaction_block(FarmNewBlockProtocol(receiver_puzzlehash))

# No longer full
new_transaction = fnp.NewTransaction(token_bytes(32), uint64(1000000), uint64(1))
msg = await full_node_1.new_transaction(new_transaction)
assert msg is not None
await full_node_1.new_transaction(new_transaction, fake_peer)

await time_out_assert(10, new_transaction_requested, True, incoming_queue, new_transaction)

@pytest.mark.asyncio
async def test_request_respond_transaction(self, wallet_nodes):