Skip to content

Commit

Permalink
Ms.spikes4 (Fix duplicate signage points) (Chia-Network#4844)
Browse files Browse the repository at this point in the history
* Fix duplicate signage point spikes

* Block record not hash

* Fix bugs in signage point cache

* Fix test, and log in debug level

* Change transaction logging to info

* More logging improvements
  • Loading branch information
mariano54 authored May 16, 2021
1 parent 7012ca9 commit 1c808b6
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 43 deletions.
2 changes: 1 addition & 1 deletion chia/farmer/farmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def _periodically_clear_cache_task(self):
now = time.time()
removed_keys: List[bytes32] = []
for key, add_time in self.cache_add_time.items():
if now - float(add_time) > self.constants.SUB_SLOT_TIME_TARGET * 2:
if now - float(add_time) > self.constants.SUB_SLOT_TIME_TARGET * 3:
self.sps.pop(key, None)
self.proofs_of_space.pop(key, None)
self.quality_str_to_identifiers.pop(key, None)
Expand Down
4 changes: 4 additions & 0 deletions chia/farmer/farmer_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ async def new_signage_point(self, new_signage_point: farmer_protocol.NewSignageP
await self.farmer.server.send_to_all([msg], NodeType.HARVESTER)
if new_signage_point.challenge_chain_sp not in self.farmer.sps:
self.farmer.sps[new_signage_point.challenge_chain_sp] = []
if new_signage_point in self.farmer.sps[new_signage_point.challenge_chain_sp]:
self.farmer.log.debug(f"Duplicate signage point {new_signage_point.signage_point_index}")
return

self.farmer.sps[new_signage_point.challenge_chain_sp].append(new_signage_point)
self.farmer.cache_add_time[new_signage_point.challenge_chain_sp] = uint64(int(time.time()))
self.farmer.state_changed("new_signage_point", {"sp_hash": new_signage_point.challenge_chain_sp})
Expand Down
51 changes: 43 additions & 8 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,8 @@ async def signage_point_post_processing(
self.log.info(
f"⏲️ Finished signage point {request.index_from_challenge}/"
f"{self.constants.NUM_SPS_SUB_SLOT}: "
f"{request.challenge_chain_vdf.output.get_hash()} "
f"CC: {request.challenge_chain_vdf.output.get_hash()} "
f"RC: {request.reward_chain_vdf.output.get_hash()} "
)
self.signage_point_times[request.index_from_challenge] = time.time()
sub_slot_tuple = self.full_node_store.get_sub_slot(request.challenge_chain_vdf.challenge)
Expand Down Expand Up @@ -957,12 +958,17 @@ async def peak_post_processing(
if not self.sync_store.get_sync_mode():
self.blockchain.clean_block_records()

fork_block: Optional[BlockRecord] = None
if fork_height != block.height - 1 and block.height != 0:
# This is a reorg
fork_block = self.blockchain.block_record(self.blockchain.height_to_hash(fork_height))

added_eos, new_sps, new_ips = self.full_node_store.new_peak(
record,
block,
sub_slots[0],
sub_slots[1],
fork_height != block.height - 1 and block.height != 0,
fork_block,
self.blockchain,
)
if sub_slots[1] is None:
Expand Down Expand Up @@ -1182,7 +1188,6 @@ async def respond_block(
# Only propagate blocks which extend the blockchain (becomes one of the heads)
new_peak: Optional[BlockRecord] = self.blockchain.get_peak()
assert new_peak is not None and fork_height is not None
self.log.debug(f"Validation time for peak: {validation_time}")

await self.peak_post_processing(block, new_peak, fork_height, peer)

Expand All @@ -1193,6 +1198,20 @@ async def respond_block(
else:
# Should never reach here, all the cases are covered
raise RuntimeError(f"Invalid result from receive_block {added}")
percent_full_str = (
(
", percent full: "
+ str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3))
+ "%"
)
if block.transactions_info is not None
else ""
)
self.log.info(
f"Block validation time: {validation_time}, "
f"cost: {block.transactions_info.cost if block.transactions_info is not None else 'None'}"
f"{percent_full_str}"
)

# This code path is reached if added == ADDED_AS_ORPHAN or NEW_TIP
peak = self.blockchain.get_peak()
Expand Down Expand Up @@ -1273,13 +1292,15 @@ async def respond_unfinished_block(

async with self.blockchain.lock:
# TODO: pre-validate VDFs outside of lock
validation_start = time.time()
validate_result = await self.blockchain.validate_unfinished_block(block)
if validate_result.error is not None:
if validate_result.error == Err.COIN_AMOUNT_NEGATIVE.value:
# TODO: remove in the future, hotfix for 1.1.5 peers to not disconnect older peers
self.log.info(f"Consensus error {validate_result.error}, not disconnecting")
return
raise ConsensusError(Err(validate_result.error))
validation_time = time.time() - validation_start

assert validate_result.required_iters is not None

Expand All @@ -1303,14 +1324,28 @@ async def respond_unfinished_block(
self.full_node_store.add_unfinished_block(height, block, validate_result)
if farmed_block is True:
self.log.info(
f"🍀 ️Farmed unfinished_block {block_hash}, SP: {block.reward_chain_block.signage_point_index}"
f"🍀 ️Farmed unfinished_block {block_hash}, SP: {block.reward_chain_block.signage_point_index}, "
f"validation time: {validation_time}, "
f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}"
)
else:
percent_full_str = (
(
", percent full: "
+ str(round(100.0 * float(block.transactions_info.cost) / self.constants.MAX_BLOCK_COST_CLVM, 3))
+ "%"
)
if block.transactions_info is not None
else ""
)
self.log.info(
f"Added unfinished_block {block_hash}, not farmed by us,"
f" SP: {block.reward_chain_block.signage_point_index} time: "
f"{time.time() - self.signage_point_times[block.reward_chain_block.signage_point_index]}"
f"Pool pk {encode_puzzle_hash(block.foliage.foliage_block_data.pool_target.puzzle_hash, 'xch')}"
f" SP: {block.reward_chain_block.signage_point_index} farmer response time: "
f"{time.time() - self.signage_point_times[block.reward_chain_block.signage_point_index]}, "
f"Pool pk {encode_puzzle_hash(block.foliage.foliage_block_data.pool_target.puzzle_hash, 'xch')}, "
f"validation time: {validation_time}, "
f"cost: {block.transactions_info.cost if block.transactions_info else 'None'}"
f"{percent_full_str}"
)

sub_slot_iters, difficulty = get_next_sub_slot_iters_and_difficulty(
Expand Down Expand Up @@ -1601,7 +1636,7 @@ async def respond_transaction(
await self.server.send_to_all_except([msg], NodeType.FULL_NODE, peer.peer_node_id)
else:
self.mempool_manager.remove_seen(spend_name)
self.log.warning(
self.log.debug(
f"Wasn't able to add transaction with id {spend_name}, " f"status {status} error: {error}"
)
return status, error
Expand Down
15 changes: 11 additions & 4 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,18 @@ async def respond_signage_point(
return None
async with self.full_node.timelord_lock:
# Already have signage point
if (
self.full_node.full_node_store.get_signage_point(request.challenge_chain_vdf.output.get_hash())
is not None

if self.full_node.full_node_store.have_newer_signage_point(
request.challenge_chain_vdf.challenge,
request.index_from_challenge,
request.reward_chain_vdf.challenge,
):
return None
existing_sp = self.full_node.full_node_store.get_signage_point(
request.challenge_chain_vdf.output.get_hash()
)
if existing_sp is not None and existing_sp.rc_vdf == request.reward_chain_vdf:
return None
peak = self.full_node.blockchain.get_peak()
if peak is not None and peak.height > self.full_node.constants.MAX_SUB_SLOT_BLOCKS:

Expand Down Expand Up @@ -583,7 +590,7 @@ async def respond_signage_point(
if added:
await self.full_node.signage_point_post_processing(request, peer, ip_sub_slot)
else:
self.log.info(
self.log.debug(
f"Signage point {request.index_from_challenge} not added, CC challenge: "
f"{request.challenge_chain_vdf.challenge}, RC challenge: {request.reward_chain_vdf.challenge}"
)
Expand Down
51 changes: 44 additions & 7 deletions chia/full_node/full_node_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from chia.consensus.difficulty_adjustment import can_finish_sub_and_full_epoch
from chia.consensus.make_sub_epoch_summary import next_sub_epoch_summary
from chia.consensus.multiprocess_validation import PreValidationResult
from chia.consensus.pot_iterations import calculate_sp_interval_iters
from chia.full_node.signage_point import SignagePoint
from chia.protocols import timelord_protocol
from chia.server.outbound_message import Message
Expand Down Expand Up @@ -174,6 +175,17 @@ def add_to_future_ip(self, infusion_point: timelord_protocol.NewInfusionPointVDF
self.future_ip_cache[ch] = []
self.future_ip_cache[ch].append(infusion_point)

def in_future_sp_cache(self, signage_point: SignagePoint, index: uint8) -> bool:
if signage_point.rc_vdf is None:
return False

if signage_point.rc_vdf.challenge not in self.future_sp_cache:
return False
for cache_index, cache_sp in self.future_sp_cache[signage_point.rc_vdf.challenge]:
if cache_index == index and cache_sp.rc_vdf == signage_point.rc_vdf:
return True
return False

def add_to_future_sp(self, signage_point: SignagePoint, index: uint8):
# We are missing a block here
if (
Expand All @@ -185,9 +197,11 @@ def add_to_future_sp(self, signage_point: SignagePoint, index: uint8):
return None
if signage_point.rc_vdf.challenge not in self.future_sp_cache:
self.future_sp_cache[signage_point.rc_vdf.challenge] = []
if (index, signage_point) not in self.future_sp_cache[signage_point.rc_vdf.challenge]:
self.future_sp_cache[signage_point.rc_vdf.challenge].append((index, signage_point))
if self.in_future_sp_cache(signage_point, index):
return None

self.future_cache_key_times[signage_point.rc_vdf.challenge] = int(time.time())
self.future_sp_cache[signage_point.rc_vdf.challenge].append((index, signage_point))
log.info(f"Don't have rc hash {signage_point.rc_vdf.challenge}. caching signage point {index}.")

def get_future_ip(self, rc_challenge_hash: bytes32) -> List[timelord_protocol.NewInfusionPointVDF]:
Expand Down Expand Up @@ -625,7 +639,7 @@ def new_peak(
peak_full_block: FullBlock,
sp_sub_slot: Optional[EndOfSubSlotBundle], # None if not overflow, or in first/second slot
ip_sub_slot: Optional[EndOfSubSlotBundle], # None if in first slot
reorg: bool,
fork_block: Optional[BlockRecord],
blocks: BlockchainInterface,
) -> Tuple[
Optional[EndOfSubSlotBundle], List[Tuple[uint8, SignagePoint]], List[timelord_protocol.NewInfusionPointVDF]
Expand All @@ -645,16 +659,39 @@ def new_peak(
# This is not the first sub-slot in the chain
sp_sub_slot_sps: List[Optional[SignagePoint]] = [None] * self.constants.NUM_SPS_SUB_SLOT
ip_sub_slot_sps: List[Optional[SignagePoint]] = [None] * self.constants.NUM_SPS_SUB_SLOT
if not reorg:
# If it's not a reorg, we can keep signage points that we had before, in the cache

if fork_block is not None and fork_block.sub_slot_iters != peak.sub_slot_iters:
# If there was a reorg and a difficulty adjustment, just clear all the slots
self.clear_slots()
else:
interval_iters = calculate_sp_interval_iters(self.constants, peak.sub_slot_iters)
# If it's not a reorg, or there is a reorg on the same difficulty, we can keep signage points
# that we had before, in the cache
for index, (sub_slot, sps, total_iters) in enumerate(self.finished_sub_slots):
if sub_slot is None:
continue

if fork_block is None:
# If this is not a reorg, we still want to remove signage points after the new peak
fork_block = peak
replaced_sps: List[Optional[SignagePoint]] = [] # index 0 is the end of sub slot
for i, sp in enumerate(sps):
if (total_iters + i * interval_iters) < fork_block.total_iters:
# Sps before the fork point as still valid
replaced_sps.append(sp)
else:
if sp is not None:
log.debug(
f"Reverting {i} {(total_iters + i * interval_iters)} {fork_block.total_iters}"
)
# Sps after the fork point should be removed
replaced_sps.append(None)
assert len(sps) == len(replaced_sps)

if sub_slot == sp_sub_slot:
sp_sub_slot_sps = sps
sp_sub_slot_sps = replaced_sps
if sub_slot == ip_sub_slot:
ip_sub_slot_sps = sps
ip_sub_slot_sps = replaced_sps

self.clear_slots()

Expand Down
7 changes: 5 additions & 2 deletions chia/full_node/mempool_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,10 @@ async def add_spendbundle(

new_item = MempoolItem(new_spend, uint64(fees), npc_result, cost, spend_name, additions, removals, program)
self.mempool.add_to_pool(new_item, additions, removal_coin_dict)
log.info(f"add_spendbundle took {time.time() - start_time} seconds")
log.info(
f"add_spendbundle took {time.time() - start_time} seconds, cost {cost} "
f"({round(100.0 * cost/self.constants.MAX_BLOCK_COST_CLVM, 3)}%)"
)
return uint64(cost), MempoolInclusionStatus.SUCCESS, None

async def check_removals(self, removals: Dict[bytes32, CoinRecord]) -> Tuple[Optional[Err], List[Coin]]:
Expand Down Expand Up @@ -524,7 +527,7 @@ async def new_peak(self, new_peak: Optional[BlockRecord]) -> List[Tuple[SpendBun
)
if status == MempoolInclusionStatus.SUCCESS:
txs_added.append((item.spend_bundle, item.npc_result, item.spend_bundle_name))
log.debug(
log.info(
f"Size of mempool: {len(self.mempool.spends)} spends, cost: {self.mempool.total_mempool_cost} "
f"minimum fee to get in: {self.mempool.get_min_fee_rate(100000)}"
)
Expand Down
2 changes: 1 addition & 1 deletion chia/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ async def api_call(full_message: Message, connection: WSChiaConnection, task_id)
try:
if self.received_message_callback is not None:
await self.received_message_callback(connection)
connection.log.info(
connection.log.debug(
f"<- {ProtocolMessageTypes(full_message.type).name} from peer "
f"{connection.peer_node_id} {connection.peer_host}"
)
Expand Down
4 changes: 2 additions & 2 deletions chia/server/ws_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ async def time_out(req_id, req_timeout):
if message.id in self.request_results:
result = self.request_results[message.id]
assert result is not None
self.log.info(f"<- {ProtocolMessageTypes(result.type).name} from: {self.peer_host}:{self.peer_port}")
self.log.debug(f"<- {ProtocolMessageTypes(result.type).name} from: {self.peer_host}:{self.peer_port}")
self.request_results.pop(result.id)

return result
Expand Down Expand Up @@ -366,7 +366,7 @@ async def _send_message(self, message: Message):
)

await self.ws.send_bytes(encoded)
self.log.info(f"-> {ProtocolMessageTypes(message.type).name} to peer {self.peer_host} {self.peer_node_id}")
self.log.debug(f"-> {ProtocolMessageTypes(message.type).name} to peer {self.peer_host} {self.peer_node_id}")
self.bytes_written += size

async def _read_one_message(self) -> Optional[Message]:
Expand Down
Loading

0 comments on commit 1c808b6

Please sign in to comment.