Skip to content

Commit

Permalink
Log better compacts, test fnp for compacts. (Chia-Network#1190)
Browse files Browse the repository at this point in the history
* Log better compacts, test fnp for compacts.

* First attempt peer gossip limit.

* Fix wallet gossip.

* typos.

* Add logging to wallet api.
  • Loading branch information
fchirica authored Mar 9, 2021
1 parent 26f18d4 commit 91f9457
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 15 deletions.
11 changes: 8 additions & 3 deletions src/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1419,15 +1419,22 @@ async def _can_accept_compact_proof(
"""
is_fully_compactified = await self.block_store.is_fully_compactified(header_hash)
if is_fully_compactified is None or is_fully_compactified:
self.log.info(f"Already compactified block: {header_hash}. Ignoring.")
return False
if vdf_proof.witness_type > 0 or not vdf_proof.normalized_to_identity:
self.log.error(f"Received vdf proof is not compact: {vdf_proof}.")
return False
if not vdf_proof.is_valid(self.constants, ClassgroupElement.get_default_element(), vdf_info):
self.log.error(f"Received compact vdf proof is not valid: {vdf_proof}.")
return False
header_block = await self.blockchain.get_header_block_by_height(height, header_hash)
if header_block is None:
self.log.error(f"Can't find block for given compact vdf. Height: {height} Header hash: {header_hash}")
return False
return await self._needs_compact_proof(vdf_info, header_block, field_vdf)
is_new_proof = await self._needs_compact_proof(vdf_info, header_block, field_vdf)
if not is_new_proof:
self.log.info(f"Duplicate compact proof. Height: {height}. Header hash: {header_hash}.")
return is_new_proof

async def _replace_proof(
self,
Expand Down Expand Up @@ -1477,7 +1484,6 @@ async def respond_compact_vdf_timelord(self, request: timelord_protocol.RespondC
if not await self._can_accept_compact_proof(
request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf
):
self.log.error(f"Couldn't add compact proof of time from a bluebox: {request}.")
return
async with self.blockchain.lock:
await self._replace_proof(request.vdf_info, request.vdf_proof, request.height, field_vdf)
Expand Down Expand Up @@ -1552,7 +1558,6 @@ async def respond_compact_vdf(self, request: full_node_protocol.RespondCompactVD
if not await self._can_accept_compact_proof(
request.vdf_info, request.vdf_proof, request.height, request.header_hash, field_vdf
):
self.log.error(f"Couldn't add compact proof of time from a full_node peer: {peer}.")
return
async with self.blockchain.lock:
if self.blockchain.seen_compact_proofs(request.vdf_info, request.height):
Expand Down
46 changes: 35 additions & 11 deletions src/server/node_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from typing import Dict, Optional
from src.util.ints import uint64

MAX_PEERS_RECEIVED_PER_REQUEST = 1000
MAX_TOTAL_PEERS_RECEIVED = 3000


class FullNodeDiscovery:
def __init__(
Expand Down Expand Up @@ -53,6 +56,8 @@ def __init__(
self.relay_queue = None
self.address_manager = None
self.connection_time_pretest: Dict = {}
self.received_count_from_peers: Dict = {}
self.lock = asyncio.Lock()

async def initialize_address_manager(self):
mkdir(self.peer_db_path.parent)
Expand Down Expand Up @@ -101,7 +106,7 @@ async def on_connect(self, peer: ws.WSChiaConnection):
peer.is_outbound
and peer.peer_server_port is not None
and peer.connection_type is NodeType.FULL_NODE
and self.server._local_type is NodeType.FULL_NODE
and (self.server._local_type is NodeType.FULL_NODE or self.server._local_type is NodeType.WALLET)
and self.address_manager is not None
):
msg = make_msg(ProtocolMessageTypes.request_peers, full_node_protocol.RequestPeers())
Expand Down Expand Up @@ -324,6 +329,20 @@ async def _periodically_cleanup(self):
async def _respond_peers_common(self, request, peer_src, is_full_node):
# Check if we got the peers from a full node or from the introducer.
peers_adjusted_timestamp = []
is_misbehaving = False
if len(request.peer_list) > MAX_PEERS_RECEIVED_PER_REQUEST:
is_misbehaving = True
if is_full_node:
if peer_src is None:
return
async with self.lock:
if peer_src.host not in self.received_count_from_peers:
self.received_count_from_peers[peer_src.host] = 0
self.received_count_from_peers[peer_src.host] += len(request.peer_list)
if self.received_count_from_peers[peer_src.host] > MAX_TOTAL_PEERS_RECEIVED:
is_misbehaving = True
if is_misbehaving:
return
for peer in request.peer_list:
if peer.timestamp < 100000000 or peer.timestamp > time.time() + 10 * 60:
# Invalid timestamp, predefine a bad one.
Expand Down Expand Up @@ -370,13 +389,12 @@ def __init__(
log,
)
self.relay_queue = asyncio.Queue()
self.lock = asyncio.Lock()
self.neighbour_known_peers = {}
self.key = randbits(256)

async def start(self):
await self.initialize_address_manager()
self.self_advertise_task = asyncio.create_task(self._periodically_self_advertise())
self.self_advertise_task = asyncio.create_task(self._periodically_self_advertise_and_clean_data())
self.address_relay_task = asyncio.create_task(self._address_relay())
await self.start_tasks()

Expand All @@ -385,7 +403,7 @@ async def close(self):
self.self_advertise_task.cancel()
self.address_relay_task.cancel()

async def _periodically_self_advertise(self):
async def _periodically_self_advertise_and_clean_data(self):
while not self.is_closed:
try:
try:
Expand Down Expand Up @@ -413,6 +431,9 @@ async def _periodically_self_advertise(self):
)
await self.server.send_to_all([msg], NodeType.FULL_NODE)

async with self.lock:
for host in list(self.received_count_from_peers.keys()):
self.received_count_from_peers[host] = 0
except Exception as e:
self.log.error(f"Exception in self advertise: {e}")
self.log.error(f"Traceback: {traceback.format_exc()}")
Expand Down Expand Up @@ -450,13 +471,16 @@ async def request_peers(self, peer_info: PeerInfo):
self.log.error(f"Request peers exception: {e}")

async def respond_peers(self, request, peer_src, is_full_node):
    """Handle a gossiped peer list from a full node or the introducer.

    Runs the shared rate-limited processing, records which peers the
    sending neighbour already knows, and relays a single fresh peer
    onward. The whole body is wrapped in try/except so one malformed
    gossip message cannot propagate an exception into the caller.

    NOTE(review): the scraped diff contained both the pre-commit and
    post-commit bodies interleaved; this is the post-commit version
    (the one the commit added, with the try/except wrapper).
    """
    try:
        await self._respond_peers_common(request, peer_src, is_full_node)
        if is_full_node:
            await self.add_peers_neighbour(request.peer_list, peer_src)
            # A single advertised peer with a timestamp newer than 10
            # minutes is treated as a self-advertisement and relayed
            # onward (relay count 2) per the address-gossip protocol.
            if len(request.peer_list) == 1 and self.relay_queue is not None:
                peer = request.peer_list[0]
                if peer.timestamp > time.time() - 60 * 10:
                    self.relay_queue.put_nowait((peer, 2))
    except Exception as e:
        self.log.error(f"Respond peers exception: {e}. Traceback: {traceback.format_exc()}")

async def _address_relay(self):
while not self.is_closed:
Expand Down
2 changes: 2 additions & 0 deletions src/wallet/wallet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,8 @@ async def on_connect(self, peer: WSChiaConnection):
if peer.peer_node_id in peer_ids:
continue
await peer.send_message(msg)
if not self.has_full_node() and self.wallet_peers is not None:
asyncio.create_task(self.wallet_peers.on_connect(peer))

async def _periodically_check_full_node(self):
tries = 0
Expand Down
15 changes: 15 additions & 0 deletions src/wallet/wallet_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ class WalletNodeAPI:
def __init__(self, wallet_node):
self.wallet_node = wallet_node

@property
def log(self):
    # Expose the owning wallet node's logger so API handlers in this
    # class can log via self.log without holding their own logger.
    return self.wallet_node.log

@peer_required
@api_request
async def respond_removals(self, response: wallet_protocol.RespondRemovals, peer: WSChiaConnection):
Expand Down Expand Up @@ -95,6 +99,17 @@ async def respond_peers_introducer(
if peer is not None and peer.connection_type is NodeType.INTRODUCER:
await peer.close()

@peer_required
@api_request
async def respond_peers(self, request: full_node_protocol.RespondPeers, peer: WSChiaConnection):
    """Handle a peer-list gossip message sent to the wallet.

    When the wallet is connected to its own full node, gossip is that
    node's job, so the message is ignored and wallet peer discovery is
    shut down. Otherwise the list is forwarded to the wallet's own
    peer manager.
    """
    peer_count = len(request.peer_list)
    if self.wallet_node.has_full_node():
        # A local full node already gossips for us; keep wallet_peers closed.
        self.log.info(f"Wallet received {peer_count} peers, but ignoring, since we have a full node.")
        await self.wallet_node.wallet_peers.ensure_is_closed()
    else:
        self.log.info(f"Wallet received {peer_count} peers.")
        await self.wallet_node.wallet_peers.respond_peers(request, peer.get_peer_info(), True)
    return None

@api_request
async def respond_puzzle_solution(self, request: wallet_protocol.RespondPuzzleSolution):
if self.wallet_node.wallet_state_manager is None or self.wallet_node.backup_initialized is False:
Expand Down
119 changes: 118 additions & 1 deletion tests/core/full_node/test_full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from src.consensus.pot_iterations import is_overflow_block
from src.full_node.full_node_api import FullNodeAPI
from src.protocols import full_node_protocol as fnp
from src.protocols import full_node_protocol as fnp, timelord_protocol
from src.protocols.protocol_message_types import ProtocolMessageTypes
from src.types.blockchain_format.program import SerializedProgram
from src.types.full_block import FullBlock
Expand All @@ -36,6 +36,9 @@
time_out_assert_custom_interval,
time_out_messages,
)
from src.util.vdf_prover import get_vdf_info_and_proof
from src.types.blockchain_format.classgroup import ClassgroupElement
from src.types.blockchain_format.vdf import CompressibleVDFField

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -919,6 +922,120 @@ async def test_mainnet_softfork(self, wallet_nodes_mainnet):

assert error is None

@pytest.mark.asyncio
async def test_compact_protocol(self, setup_two_nodes):
    # End-to-end test of VDF proof compactification ("compact proofs"):
    # generate normalized-to-identity proofs for every CompressibleVDFField,
    # submit them through the timelord endpoint, then verify the stored
    # blocks became compact and still validate on a second node.
    # NOTE(review): indentation reconstructed from a flattened scrape —
    # statement nesting inferred from syntax; confirm against the repo.
    nodes, _ = setup_two_nodes
    full_node_1 = nodes[0]
    full_node_2 = nodes[1]
    # skip_slots=3 forces finished sub-slots, so CC_EOS VDFs exist to compact.
    blocks = bt.get_consecutive_blocks(num_blocks=1, skip_slots=3)
    block = blocks[0]
    await full_node_1.full_node.respond_block(fnp.RespondBlock(block))
    timelord_protocol_finished = []
    cc_eos_count = 0
    for sub_slot in block.finished_sub_slots:
        # Recompute each challenge-chain end-of-slot VDF with
        # normalized_to_identity=True (final arg) to get a compact proof.
        vdf_info, vdf_proof = get_vdf_info_and_proof(
            test_constants,
            ClassgroupElement.get_default_element(),
            sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf.challenge,
            sub_slot.challenge_chain.challenge_chain_end_of_slot_vdf.number_of_iterations,
            True,
        )
        cc_eos_count += 1
        timelord_protocol_finished.append(
            timelord_protocol.RespondCompactProofOfTime(
                vdf_info,
                vdf_proof,
                block.header_hash,
                block.height,
                CompressibleVDFField.CC_EOS_VDF,
            )
        )
    # A second batch of blocks so that infused challenge chain (ICC) slots,
    # a signage-point VDF and an infusion-point VDF are all present.
    blocks_2 = bt.get_consecutive_blocks(num_blocks=2, block_list_input=blocks, skip_slots=3)
    block = blocks_2[1]
    await full_node_1.full_node.respond_block(fnp.RespondBlock(block))
    icc_eos_count = 0
    for sub_slot in block.finished_sub_slots:
        if sub_slot.infused_challenge_chain is not None:
            icc_eos_count += 1
            # Compact proof for the infused challenge chain end-of-slot VDF.
            vdf_info, vdf_proof = get_vdf_info_and_proof(
                test_constants,
                ClassgroupElement.get_default_element(),
                sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.challenge,
                sub_slot.infused_challenge_chain.infused_challenge_chain_end_of_slot_vdf.number_of_iterations,
                True,
            )
            timelord_protocol_finished.append(
                timelord_protocol.RespondCompactProofOfTime(
                    vdf_info,
                    vdf_proof,
                    block.header_hash,
                    block.height,
                    CompressibleVDFField.ICC_EOS_VDF,
                )
            )
    # Non-overflow block: the challenge-chain signage-point VDF must exist.
    assert block.reward_chain_block.challenge_chain_sp_vdf is not None
    vdf_info, vdf_proof = get_vdf_info_and_proof(
        test_constants,
        ClassgroupElement.get_default_element(),
        block.reward_chain_block.challenge_chain_sp_vdf.challenge,
        block.reward_chain_block.challenge_chain_sp_vdf.number_of_iterations,
        True,
    )
    timelord_protocol_finished.append(
        timelord_protocol.RespondCompactProofOfTime(
            vdf_info,
            vdf_proof,
            block.header_hash,
            block.height,
            CompressibleVDFField.CC_SP_VDF,
        )
    )
    # Compact proof for the challenge-chain infusion-point VDF.
    vdf_info, vdf_proof = get_vdf_info_and_proof(
        test_constants,
        ClassgroupElement.get_default_element(),
        block.reward_chain_block.challenge_chain_ip_vdf.challenge,
        block.reward_chain_block.challenge_chain_ip_vdf.number_of_iterations,
        True,
    )
    timelord_protocol_finished.append(
        timelord_protocol.RespondCompactProofOfTime(
            vdf_info,
            vdf_proof,
            block.header_hash,
            block.height,
            CompressibleVDFField.CC_IP_VDF,
        )
    )

    # Sanity check: skip_slots=3 on both batches yields exactly 3 of each EOS kind.
    assert cc_eos_count == 3 and icc_eos_count == 3
    # Feed every compact proof through the timelord (bluebox) endpoint.
    for compact_proof in timelord_protocol_finished:
        await full_node_1.full_node.respond_compact_vdf_timelord(compact_proof)
    stored_blocks = await full_node_1.get_all_full_blocks()
    # Count how many stored proofs are now normalized_to_identity (compact).
    cc_eos_compact_count = 0
    icc_eos_compact_count = 0
    has_compact_cc_sp_vdf = False
    has_compact_cc_ip_vdf = False
    for block in stored_blocks:
        for sub_slot in block.finished_sub_slots:
            if sub_slot.proofs.challenge_chain_slot_proof.normalized_to_identity:
                cc_eos_compact_count += 1
            if (
                sub_slot.proofs.infused_challenge_chain_slot_proof is not None
                and sub_slot.proofs.infused_challenge_chain_slot_proof.normalized_to_identity
            ):
                icc_eos_compact_count += 1
        if block.challenge_chain_sp_proof is not None and block.challenge_chain_sp_proof.normalized_to_identity:
            has_compact_cc_sp_vdf = True
        if block.challenge_chain_ip_proof.normalized_to_identity:
            has_compact_cc_ip_vdf = True
    assert cc_eos_compact_count == 3
    assert icc_eos_compact_count == 3
    assert has_compact_cc_sp_vdf
    assert has_compact_cc_ip_vdf
    # The compactified chain must still sync on a fresh node, block by block.
    for height, block in enumerate(stored_blocks):
        await full_node_2.full_node.respond_block(fnp.RespondBlock(block))
        assert full_node_2.full_node.blockchain.get_peak().height == height

#
# async def test_new_unfinished(self, two_nodes, wallet_nodes):
# full_node_1, full_node_2, server_1, server_2, wallet_a, wallet_receiver = wallet_nodes
Expand Down

0 comments on commit 91f9457

Please sign in to comment.