Skip to content

Commit

Permalink
Speedup normal sync and wallet sync (Chia-Network#5590)
Browse files Browse the repository at this point in the history
* Correctly return from bytes without parsing

* Huge speedup for wallet sync

* Lint

* Only construct CoinRecord when necessary

* Punt on creation of coin

* Removing warning log

* Flaky test

* Finally fix flaky test
  • Loading branch information
mariano54 authored May 20, 2021
1 parent 4ad8d9d commit a76446e
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 22 deletions.
2 changes: 1 addition & 1 deletion chia/full_node/block_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async def get_full_block_bytes(self, header_hash: bytes32) -> Optional[bytes]:
cached = self.block_cache.get(header_hash)
if cached is not None:
log.debug(f"cache hit for block {header_hash.hex()}")
return cached
return bytes(cached)
log.debug(f"cache miss for block {header_hash.hex()}")
cursor = await self.db.execute("SELECT block from full_blocks WHERE header_hash=?", (header_hash.hex(),))
row = await cursor.fetchone()
Expand Down
11 changes: 6 additions & 5 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,16 @@ async def get_coins_added_at_height(self, height: uint32) -> List[CoinRecord]:
return coins

async def get_coins_removed_at_height(self, height: uint32) -> List[CoinRecord]:
cursor = await self.coin_record_db.execute(
"SELECT * from coin_record WHERE spent_index=? and spent=1", (height,)
)
cursor = await self.coin_record_db.execute("SELECT * from coin_record WHERE spent_index=?", (height,))
rows = await cursor.fetchall()
await cursor.close()
coins = []
for row in rows:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
coins.append(CoinRecord(coin, row[1], row[2], row[3], row[4], row[8]))
spent: bool = bool(row[3])
if spent:
coin = Coin(bytes32(bytes.fromhex(row[6])), bytes32(bytes.fromhex(row[5])), uint64.from_bytes(row[7]))
coin_record = CoinRecord(coin, row[1], row[2], spent, row[4], row[8])
coins.append(coin_record)
return coins

# Checks DB and DiffStores for CoinRecords with puzzle_hash and returns them
Expand Down
27 changes: 19 additions & 8 deletions chia/full_node/full_node_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import dataclasses
import time
from secrets import token_bytes
from typing import Callable, Dict, List, Optional, Tuple, Set, Any
from typing import Callable, Dict, List, Optional, Tuple, Set

from blspy import AugSchemeMPL, G2Element
from chiabip158 import PyBIP158
Expand Down Expand Up @@ -292,16 +292,16 @@ async def request_block(self, request: full_node_protocol.RequestBlock) -> Optio
async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Optional[Message]:
if request.end_height < request.start_height or request.end_height - request.start_height > 32:
reject = RejectBlocks(request.start_height, request.end_height)
msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
msg: Message = make_msg(ProtocolMessageTypes.reject_blocks, reject)
return msg
for i in range(request.start_height, request.end_height + 1):
if not self.full_node.blockchain.contains_height(uint32(i)):
reject = RejectBlocks(request.start_height, request.end_height)
msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
return msg

blocks: List[Any] = []
if not request.include_transaction_block:
blocks: List[FullBlock] = []
for i in range(request.start_height, request.end_height + 1):
block: Optional[FullBlock] = await self.full_node.block_store.get_full_block(
self.full_node.blockchain.height_to_hash(uint32(i))
Expand All @@ -312,7 +312,12 @@ async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Opt
return msg
block = dataclasses.replace(block, transactions_generator=None)
blocks.append(block)
msg = make_msg(
ProtocolMessageTypes.respond_blocks,
full_node_protocol.RespondBlocks(request.start_height, request.end_height, blocks),
)
else:
blocks_bytes: List[bytes] = []
for i in range(request.start_height, request.end_height + 1):
block_bytes: Optional[bytes] = await self.full_node.block_store.get_full_block_bytes(
self.full_node.blockchain.height_to_hash(uint32(i))
Expand All @@ -321,12 +326,18 @@ async def request_blocks(self, request: full_node_protocol.RequestBlocks) -> Opt
reject = RejectBlocks(request.start_height, request.end_height)
msg = make_msg(ProtocolMessageTypes.reject_blocks, reject)
return msg
blocks.append(block_bytes)

msg = make_msg(
ProtocolMessageTypes.respond_blocks,
full_node_protocol.RespondBlocks(request.start_height, request.end_height, blocks),
)
blocks_bytes.append(block_bytes)

respond_blocks_manually_streamed: bytes = (
bytes(uint32(request.start_height))
+ bytes(uint32(request.end_height))
+ len(blocks_bytes).to_bytes(4, "big", signed=False)
)
for block_bytes in blocks_bytes:
respond_blocks_manually_streamed += block_bytes
msg = make_msg(ProtocolMessageTypes.respond_blocks, respond_blocks_manually_streamed)

return msg

@api_request
Expand Down
2 changes: 1 addition & 1 deletion chia/simulator/full_node_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def reorg_from_index_to_new_index(self, request: ReorgProtocol):
block_count,
farmer_reward_puzzle_hash=coinbase_ph,
pool_reward_puzzle_hash=coinbase_ph,
block_list_input=current_blocks[:old_index],
block_list_input=current_blocks[: old_index + 1],
force_overflow=True,
guarantee_transaction_block=True,
seed=32 * b"1",
Expand Down
30 changes: 23 additions & 7 deletions tests/wallet/test_wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ async def test_wallet_coinbase_reorg(self, wallet_node):

await time_out_assert(5, wallet.get_confirmed_balance, funds)

await full_node_api.reorg_from_index_to_new_index(ReorgProtocol(uint32(3), uint32(num_blocks + 6), 32 * b"0"))
await full_node_api.reorg_from_index_to_new_index(ReorgProtocol(uint32(2), uint32(num_blocks + 6), 32 * b"0"))

funds = sum(
[
Expand Down Expand Up @@ -562,7 +562,14 @@ async def test_wallet_tx_reorg(self, two_wallet_nodes):
funds = sum(
[calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i)) for i in range(1, num_blocks)]
)
tx = await wallet.generate_signed_transaction(1000, ph2)
# Waits a few seconds to receive rewards
all_blocks = await full_node_api.get_all_full_blocks()

# Ensure that we use a coin that we will not reorg out
coin = list(all_blocks[-3].get_included_reward_coins())[0]
await asyncio.sleep(5)

tx = await wallet.generate_signed_transaction(1000, ph2, coins={coin})
await wallet.push_transaction(tx)
await full_node_api.full_node.respond_transaction(tx.spend_bundle, tx.name)
await time_out_assert(5, wallet.get_confirmed_balance, funds)
Expand All @@ -571,22 +578,31 @@ async def test_wallet_tx_reorg(self, two_wallet_nodes):
await time_out_assert(5, wallet_2.get_confirmed_balance, 1000)

await time_out_assert(5, wallet_node.wallet_state_manager.blockchain.get_peak_height, 7)
peak_height = full_node_api.full_node.blockchain.get_peak().height
print(peak_height)

await full_node_api.reorg_from_index_to_new_index(ReorgProtocol(uint32(3), uint32(num_blocks + 6), 32 * b"0"))
# Perform a reorg, which will revert the transaction in the full node and wallet, and cause wallet to resubmit
await full_node_api.reorg_from_index_to_new_index(
ReorgProtocol(uint32(peak_height - 3), uint32(peak_height + 3), 32 * b"0")
)

funds = sum(
[
calculate_pool_reward(uint32(i)) + calculate_base_farmer_reward(uint32(i))
for i in range(1, num_blocks - 2)
for i in range(1, peak_height - 2)
]
)
await time_out_assert(7, full_node_api.full_node.blockchain.get_peak_height, 10)
await time_out_assert(7, wallet_node.wallet_state_manager.blockchain.get_peak_height, 10)

for i in range(0, num_blocks * 3):
await time_out_assert(7, full_node_api.full_node.blockchain.get_peak_height, peak_height + 3)
await time_out_assert(7, wallet_node.wallet_state_manager.blockchain.get_peak_height, peak_height + 3)

# Farm a few blocks so we can confirm the resubmitted transaction
for i in range(0, num_blocks):
await asyncio.sleep(1)
await full_node_api.farm_new_transaction_block(FarmNewBlockProtocol(32 * b"0"))

# By this point, the transaction should be confirmed
print(await wallet.get_confirmed_balance())
await time_out_assert(15, wallet.get_confirmed_balance, funds - 1000)
unconfirmed = await wallet_node.wallet_state_manager.tx_store.get_unconfirmed_for_wallet(int(wallet.id()))
assert len(unconfirmed) == 0
Expand Down

0 comments on commit a76446e

Please sign in to comment.