Skip to content

Commit

Permalink
Lock for all writes (Chia-Network#1758)
Browse files Browse the repository at this point in the history
* lock for all writes

* use async with where convenient

* wrapper

* more

* lint

* update wallet

* rl wallet

* indentation

* fix tests

* bad path merged into main

* wallet lock

* refactored by mistake

* re-raise

* memory/disk inconsistency

* more inconsistency

* asyncio.CancelledError is a BaseException in 3.8 and 3.9
  • Loading branch information
Yostra authored Apr 11, 2021
1 parent 7d6ea6a commit 773adfa
Show file tree
Hide file tree
Showing 25 changed files with 427 additions and 329 deletions.
136 changes: 65 additions & 71 deletions chia/consensus/blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,20 +234,34 @@ async def receive_block(
None,
)
# Always add the block to the database
await self.block_store.add_full_block(block, block_record)

self.add_block_record(block_record)

fork_height: Optional[uint32] = await self._reconsider_peak(block_record, genesis, fork_point_with_peak)

async with self.block_store.db_wrapper.lock:
try:
await self.block_store.db_wrapper.begin_transaction()
await self.block_store.add_full_block(block, block_record)
fork_height, peak_height, records = await self._reconsider_peak(
block_record, genesis, fork_point_with_peak
)
await self.block_store.db_wrapper.commit_transaction()
self.add_block_record(block_record)
for fetched_block_record in records:
self.__height_to_hash[fetched_block_record.height] = fetched_block_record.header_hash
if fetched_block_record.sub_epoch_summary_included is not None:
self.__sub_epoch_summaries[
fetched_block_record.height
] = fetched_block_record.sub_epoch_summary_included
if peak_height is not None:
self._peak_height = peak_height
except BaseException:
await self.block_store.db_wrapper.rollback_transaction()
raise
if fork_height is not None:
return ReceiveBlockResult.NEW_PEAK, None, fork_height
else:
return ReceiveBlockResult.ADDED_AS_ORPHAN, None, None

async def _reconsider_peak(
self, block_record: BlockRecord, genesis: bool, fork_point_with_peak: Optional[uint32]
) -> Optional[uint32]:
) -> Tuple[Optional[uint32], Optional[uint32], List[BlockRecord]]:
"""
When a new block is added, this is called, to check if the new block is the new peak of the chain.
This also handles reorgs by reverting blocks which are not in the heaviest chain.
Expand All @@ -262,18 +276,10 @@ async def _reconsider_peak(

# Begins a transaction, because we want to ensure that the coin store and block store are only updated
# in sync.
await self.block_store.begin_transaction()
try:
await self.coin_store.new_block(block)
self.__height_to_hash[uint32(0)] = block.header_hash
self._peak_height = uint32(0)
await self.block_store.set_peak(block.header_hash)
await self.block_store.commit_transaction()
except Exception:
await self.block_store.rollback_transaction()
raise
return uint32(0)
return None
await self.coin_store.new_block(block)
await self.block_store.set_peak(block.header_hash)
return uint32(0), uint32(0), [block_record]
return None, None, []

assert peak is not None
if block_record.weight > peak.weight:
Expand All @@ -286,60 +292,48 @@ async def _reconsider_peak(

# Begins a transaction, because we want to ensure that the coin store and block store are only updated
# in sync.
await self.block_store.begin_transaction()
try:
# Rollback to fork
await self.coin_store.rollback_to_block(fork_height)
# Rollback sub_epoch_summaries
heights_to_delete = []
for ses_included_height in self.__sub_epoch_summaries.keys():
if ses_included_height > fork_height:
heights_to_delete.append(ses_included_height)
for height in heights_to_delete:
log.info(f"delete ses at height {height}")
del self.__sub_epoch_summaries[height]

if len(heights_to_delete) > 0:
# remove segments from prev fork
log.info(f"remove segments for se above {fork_height}")
await self.block_store.delete_sub_epoch_challenge_segments(uint32(fork_height))

# Collect all blocks from fork point to new peak
blocks_to_add: List[Tuple[FullBlock, BlockRecord]] = []
curr = block_record.header_hash

while fork_height < 0 or curr != self.height_to_hash(uint32(fork_height)):
fetched_full_block: Optional[FullBlock] = await self.block_store.get_full_block(curr)
fetched_block_record: Optional[BlockRecord] = await self.block_store.get_block_record(curr)
assert fetched_full_block is not None
assert fetched_block_record is not None
blocks_to_add.append((fetched_full_block, fetched_block_record))
if fetched_full_block.height == 0:
# Doing a full reorg, starting at height 0
break
curr = fetched_block_record.prev_hash

for fetched_full_block, fetched_block_record in reversed(blocks_to_add):
self.__height_to_hash[fetched_block_record.height] = fetched_block_record.header_hash
if fetched_block_record.is_transaction_block:
await self.coin_store.new_block(fetched_full_block)
if fetched_block_record.sub_epoch_summary_included is not None:
self.__sub_epoch_summaries[
fetched_block_record.height
] = fetched_block_record.sub_epoch_summary_included

# Changes the peak to be the new peak
await self.block_store.set_peak(block_record.header_hash)
self._peak_height = block_record.height
await self.block_store.commit_transaction()
except Exception:
await self.block_store.rollback_transaction()
raise

return uint32(max(fork_height, 0))
# Rollback to fork
await self.coin_store.rollback_to_block(fork_height)
# Rollback sub_epoch_summaries
heights_to_delete = []
for ses_included_height in self.__sub_epoch_summaries.keys():
if ses_included_height > fork_height:
heights_to_delete.append(ses_included_height)
for height in heights_to_delete:
log.info(f"delete ses at height {height}")
del self.__sub_epoch_summaries[height]

if len(heights_to_delete) > 0:
# remove segments from prev fork
log.info(f"remove segments for se above {fork_height}")
await self.block_store.delete_sub_epoch_challenge_segments(uint32(fork_height))

# Collect all blocks from fork point to new peak
blocks_to_add: List[Tuple[FullBlock, BlockRecord]] = []
curr = block_record.header_hash

while fork_height < 0 or curr != self.height_to_hash(uint32(fork_height)):
fetched_full_block: Optional[FullBlock] = await self.block_store.get_full_block(curr)
fetched_block_record: Optional[BlockRecord] = await self.block_store.get_block_record(curr)
assert fetched_full_block is not None
assert fetched_block_record is not None
blocks_to_add.append((fetched_full_block, fetched_block_record))
if fetched_full_block.height == 0:
# Doing a full reorg, starting at height 0
break
curr = fetched_block_record.prev_hash
records_to_add = []
for fetched_full_block, fetched_block_record in reversed(blocks_to_add):
records_to_add.append(fetched_block_record)
if fetched_block_record.is_transaction_block:
await self.coin_store.new_block(fetched_full_block)

# Changes the peak to be the new peak
await self.block_store.set_peak(block_record.header_hash)
return uint32(max(fork_height, 0)), block_record.height, records_to_add

# This is not a heavier block than the heaviest we have seen, so we don't change the coin set
return None
return None, None, []

def get_next_difficulty(self, header_hash: bytes32, new_slot: bool) -> uint64:
assert self.contains_block(header_hash)
Expand Down
43 changes: 20 additions & 23 deletions chia/full_node/block_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from chia.types.full_block import FullBlock
from chia.types.header_block import HeaderBlock
from chia.types.weight_proof import SubEpochChallengeSegment, SubEpochSegments
from chia.util.db_wrapper import DBWrapper
from chia.util.ints import uint32
from chia.util.lru_cache import LRUCache

Expand All @@ -19,13 +20,15 @@
class BlockStore:
db: aiosqlite.Connection
block_cache: LRUCache
db_wrapper: DBWrapper

@classmethod
async def create(cls, connection: aiosqlite.Connection):
async def create(cls, db_wrapper: DBWrapper):
self = cls()

# All full blocks which have been added to the blockchain. Header_hash -> block
self.db = connection
self.db_wrapper = db_wrapper
self.db = db_wrapper.db
await self.db.execute("pragma journal_mode=wal")
await self.db.execute("pragma synchronous=2")
await self.db.execute(
Expand Down Expand Up @@ -63,21 +66,12 @@ async def create(cls, connection: aiosqlite.Connection):
self.block_cache = LRUCache(1000)
return self

async def begin_transaction(self) -> None:
# Also locks the coin store, since both stores must be updated at once
cursor = await self.db.execute("BEGIN TRANSACTION")
await cursor.close()

async def commit_transaction(self) -> None:
await self.db.commit()

async def rollback_transaction(self) -> None:
# Also rolls back the coin store, since both stores must be updated at once
cursor = await self.db.execute("ROLLBACK")
await cursor.close()

async def add_full_block(self, block: FullBlock, block_record: BlockRecord) -> None:
self.block_cache.put(block.header_hash, block)
cached = self.block_cache.get(block.header_hash)
if cached is not None:
# Since write to db can fail, we remove from cache here to avoid potential inconsistency
# Adding to cache only from reading
self.block_cache.put(block.header_hash, None)
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO full_blocks VALUES(?, ?, ?, ?, ?)",
(
Expand Down Expand Up @@ -106,16 +100,17 @@ async def add_full_block(self, block: FullBlock, block_record: BlockRecord) -> N
),
)
await cursor_2.close()
await self.db.commit()

async def persist_sub_epoch_challenge_segments(
self, sub_epoch_summary_height: uint32, segments: List[SubEpochChallengeSegment]
) -> None:
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO sub_epoch_segments_v2 VALUES(?, ?)",
(sub_epoch_summary_height, bytes(SubEpochSegments(segments))),
)
await cursor_1.close()
async with self.db_wrapper.lock:
cursor_1 = await self.db.execute(
"INSERT OR REPLACE INTO sub_epoch_segments_v2 VALUES(?, ?)",
(sub_epoch_summary_height, bytes(SubEpochSegments(segments))),
)
await cursor_1.close()
await self.db.commit()

async def get_sub_epoch_challenge_segments(
self,
Expand All @@ -142,7 +137,9 @@ async def get_full_block(self, header_hash: bytes32) -> Optional[FullBlock]:
row = await cursor.fetchone()
await cursor.close()
if row is not None:
return FullBlock.from_bytes(row[0])
block = FullBlock.from_bytes(row[0])
self.block_cache.put(block.header_hash, block)
return block
return None

async def get_full_blocks_at(self, heights: List[uint32]) -> List[FullBlock]:
Expand Down
7 changes: 5 additions & 2 deletions chia/full_node/coin_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.types.coin_record import CoinRecord
from chia.types.full_block import FullBlock
from chia.util.db_wrapper import DBWrapper
from chia.util.ints import uint32, uint64


Expand All @@ -18,13 +19,15 @@ class CoinStore:
coin_record_db: aiosqlite.Connection
coin_record_cache: Dict[str, CoinRecord]
cache_size: uint32
db_wrapper: DBWrapper

@classmethod
async def create(cls, connection: aiosqlite.Connection, cache_size: uint32 = uint32(60000)):
async def create(cls, db_wrapper: DBWrapper, cache_size: uint32 = uint32(60000)):
self = cls()

self.cache_size = cache_size
self.coin_record_db = connection
self.db_wrapper = db_wrapper
self.coin_record_db = db_wrapper.db
await self.coin_record_db.execute("pragma journal_mode=wal")
await self.coin_record_db.execute("pragma synchronous=2")
await self.coin_record_db.execute(
Expand Down
10 changes: 7 additions & 3 deletions chia/full_node/full_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
from chia.types.mempool_inclusion_status import MempoolInclusionStatus
from chia.types.spend_bundle import SpendBundle
from chia.types.unfinished_block import UnfinishedBlock
from chia.util.db_wrapper import DBWrapper
from chia.util.errors import ConsensusError, Err
from chia.util.ints import uint8, uint32, uint64, uint128
from chia.util.path import mkdir, path_from_root
Expand Down Expand Up @@ -104,10 +105,11 @@ async def _start(self):
self.timelord_lock = asyncio.Lock()
# create the store (db) and full node instance
self.connection = await aiosqlite.connect(self.db_path)
self.block_store = await BlockStore.create(self.connection)
self.db_wrapper = DBWrapper(self.connection)
self.block_store = await BlockStore.create(self.db_wrapper)
self.full_node_store = await FullNodeStore.create(self.constants)
self.sync_store = await SyncStore.create()
self.coin_store = await CoinStore.create(self.connection)
self.coin_store = await CoinStore.create(self.db_wrapper)
self.log.info("Initializing blockchain from disk")
start_time = time.time()
self.blockchain = await Blockchain.create(self.coin_store, self.block_store, self.constants)
Expand Down Expand Up @@ -1512,7 +1514,9 @@ async def _replace_proof(
if field_vdf == CompressibleVDFField.CC_IP_VDF:
new_block = dataclasses.replace(block, challenge_chain_ip_proof=vdf_proof)
assert new_block is not None
await self.block_store.add_full_block(new_block, block_record)
async with self.db_wrapper.lock:
await self.block_store.add_full_block(new_block, block_record)
await self.block_store.db_wrapper.commit_transaction()

async def respond_compact_proof_of_time(self, request: timelord_protocol.RespondCompactProofOfTime):
field_vdf = CompressibleVDFField(int(request.field_vdf))
Expand Down
29 changes: 29 additions & 0 deletions chia/util/db_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import asyncio

import aiosqlite


class DBWrapper:
    """
    Wraps a single shared aiosqlite connection, pairing it with an
    asyncio.Lock and explicit transaction control so that multiple stores
    using the same connection can serialize their writes and commit or
    roll back together as one unit.

    Callers are expected to acquire ``lock`` (``async with wrapper.lock:``)
    around multi-store write sequences; the transaction methods below do
    not take the lock themselves.
    """

    # The shared database connection all stores write through.
    db: aiosqlite.Connection
    # Acquired by callers to serialize writes on the shared connection.
    lock: asyncio.Lock

    def __init__(self, connection: aiosqlite.Connection):
        self.db = connection
        self.lock = asyncio.Lock()

    async def begin_transaction(self) -> None:
        # Start an explicit transaction so subsequent writes on this
        # connection are only made durable by commit_transaction(), or
        # discarded by rollback_transaction().
        cursor = await self.db.execute("BEGIN TRANSACTION")
        await cursor.close()

    async def rollback_transaction(self) -> None:
        # Also rolls back the coin store, since both stores must be updated at once
        # Guard on in_transaction: issuing ROLLBACK with no open
        # transaction would itself raise.
        if self.db.in_transaction:
            cursor = await self.db.execute("ROLLBACK")
            await cursor.close()

    async def commit_transaction(self) -> None:
        # Commit everything written since begin_transaction().
        await self.db.commit()
Loading

0 comments on commit 773adfa

Please sign in to comment.