Skip to content

Commit

Permalink
DL multiple batch updates. (Chia-Network#17414)
Browse files Browse the repository at this point in the history
* DL multiple batch updates.

* Lint.

* Lint.

* Add RPCs.

* Lint

* Lint

* Lint

* Add test.

* Lint.

* Add publish pending root option.

* Improve test.

* Fix cmd.

* Lint.

* Fix test

* Revert

* Add test.

* More tests.

* Improve test.

* Apply suggestions from code review

Co-authored-by: Kyle Altendorf <[email protected]>

* Lint.

* Test.

* Latest local root.

* Change publish to submit.

* More submit instead of publish.

* Coverage.

* Lint.

* Improve coverage.

* Lint.

* Lint.

---------

Co-authored-by: Kyle Altendorf <[email protected]>
  • Loading branch information
fchirica and altendky authored Feb 23, 2024
1 parent cc5a5a9 commit 368fecc
Show file tree
Hide file tree
Showing 9 changed files with 470 additions and 40 deletions.
26 changes: 26 additions & 0 deletions chia/cmds/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,12 +166,14 @@ def get_value(
@create_rpc_port_option()
@create_fee_option()
@options.create_fingerprint()
@click.option("--submit/--no-submit", default=True, help="Submit the result on chain")
def update_data_store(
id: str,
changelist_string: str,
data_rpc_port: int,
fee: str,
fingerprint: Optional[int],
submit: bool,
) -> None:
from chia.cmds.data_funcs import update_data_store_cmd

Expand All @@ -182,6 +184,30 @@ def update_data_store(
changelist=json.loads(changelist_string),
fee=fee,
fingerprint=fingerprint,
submit_on_chain=submit,
)
)


@data_cmd.command("submit_pending_root", help="Submit on chain a locally stored batch")
@create_data_store_id_option()
@create_rpc_port_option()
@create_fee_option()
@options.create_fingerprint()
def submit_pending_root(
id: str,
data_rpc_port: int,
fee: str,
fingerprint: Optional[int],
) -> None:
from chia.cmds.data_funcs import submit_pending_root_cmd

run(
submit_pending_root_cmd(
rpc_port=data_rpc_port,
store_id=id,
fee=fee,
fingerprint=fingerprint,
)
)

Expand Down
37 changes: 34 additions & 3 deletions chia/cmds/data_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,44 @@ async def update_data_store_cmd(
changelist: List[Dict[str, str]],
fee: Optional[str],
fingerprint: Optional[int],
) -> None:
submit_on_chain: bool,
root_path: Optional[Path] = None,
) -> Dict[str, Any]:
store_id_bytes = bytes32.from_hexstr(store_id)
final_fee = None if fee is None else uint64(int(Decimal(fee) * units["chia"]))
async with get_client(rpc_port=rpc_port, fingerprint=fingerprint) as (client, _):
res = await client.update_data_store(store_id=store_id_bytes, changelist=changelist, fee=final_fee)
res = dict()

async with get_client(rpc_port=rpc_port, fingerprint=fingerprint, root_path=root_path) as (client, _):
res = await client.update_data_store(
store_id=store_id_bytes,
changelist=changelist,
fee=final_fee,
submit_on_chain=submit_on_chain,
)
print(json.dumps(res, indent=4, sort_keys=True))

return res


async def submit_pending_root_cmd(
rpc_port: Optional[int],
store_id: str,
fee: Optional[str],
fingerprint: Optional[int],
root_path: Optional[Path] = None,
) -> Dict[str, Any]:
store_id_bytes = bytes32.from_hexstr(store_id)
final_fee = None if fee is None else uint64(int(Decimal(fee) * units["chia"]))
res = dict()
async with get_client(rpc_port=rpc_port, fingerprint=fingerprint, root_path=root_path) as (client, _):
res = await client.submit_pending_root(
store_id=store_id_bytes,
fee=final_fee,
)
print(json.dumps(res, indent=4, sort_keys=True))

return res


async def get_keys_cmd(
rpc_port: Optional[int],
Expand Down
36 changes: 31 additions & 5 deletions chia/data_layer/data_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,20 +250,43 @@ async def batch_update(
tree_id: bytes32,
changelist: List[Dict[str, Any]],
fee: uint64,
submit_on_chain: bool = True,
) -> Optional[TransactionRecord]:
status = Status.PENDING if submit_on_chain else Status.PENDING_BATCH
await self.batch_insert(tree_id=tree_id, changelist=changelist, status=status)

if submit_on_chain:
return await self.publish_update(tree_id=tree_id, fee=fee)
else:
return None

async def submit_pending_root(
self,
tree_id: bytes32,
fee: uint64,
) -> TransactionRecord:
await self.batch_insert(tree_id=tree_id, changelist=changelist)
return await self.publish_update(tree_id=tree_id, fee=fee)
await self._update_confirmation_status(tree_id=tree_id)

pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is None:
raise Exception("Latest root is already confirmed.")
if pending_root.status == Status.PENDING:
raise Exception("Pending root is already submitted.")

await self.data_store.change_root_status(pending_root, Status.PENDING)
return await self.publish_update(tree_id, fee)

async def batch_insert(
self,
tree_id: bytes32,
changelist: List[Dict[str, Any]],
status: Status = Status.PENDING,
) -> bytes32:
await self._update_confirmation_status(tree_id=tree_id)

async with self.data_store.transaction():
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is not None:
if pending_root is not None and pending_root.status == Status.PENDING:
raise Exception("Already have a pending root waiting for confirmation.")

# check before any DL changes that this singleton is currently owned by this wallet
Expand All @@ -272,7 +295,7 @@ async def batch_insert(
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")

t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist)
batch_hash = await self.data_store.insert_batch(tree_id, changelist, status)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
Expand All @@ -293,6 +316,8 @@ async def publish_update(
pending_root: Optional[Root] = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is None:
raise Exception("Latest root is already confirmed.")
if pending_root.status == Status.PENDING_BATCH:
raise Exception("Unable to publish on chain, batch update set still open.")

root_hash = self.none_bytes if pending_root.node_hash is None else pending_root.node_hash

Expand Down Expand Up @@ -411,7 +436,7 @@ async def _update_confirmation_status(self, tree_id: bytes32) -> None:
return
if root is None:
pending_root = await self.data_store.get_pending_root(tree_id=tree_id)
if pending_root is not None:
if pending_root is not None and pending_root.status == Status.PENDING:
if pending_root.generation == 0 and pending_root.node_hash is None:
await self.data_store.change_root_status(pending_root, Status.COMMITTED)
await self.data_store.clear_pending_roots(tree_id=tree_id)
Expand Down Expand Up @@ -450,6 +475,7 @@ async def _update_confirmation_status(self, tree_id: bytes32) -> None:
pending_root is not None
and pending_root.generation == root.generation + 1
and pending_root.node_hash == expected_root_hash
and pending_root.status == Status.PENDING
):
await self.data_store.change_root_status(pending_root, Status.COMMITTED)
await self.data_store.build_ancestor_table_for_latest_root(tree_id=tree_id)
Expand Down
1 change: 1 addition & 0 deletions chia/data_layer/data_layer_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ def row_to_node(row: aiosqlite.Row) -> Node:
class Status(IntEnum):
PENDING = 1
COMMITTED = 2
PENDING_BATCH = 3


class NodeType(IntEnum):
Expand Down
72 changes: 52 additions & 20 deletions chia/data_layer/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,15 @@ async def _insert_terminal_node(self, key: bytes, value: bytes) -> bytes32:
async def get_pending_root(self, tree_id: bytes32) -> Optional[Root]:
async with self.db_wrapper.reader() as reader:
cursor = await reader.execute(
"SELECT * FROM root WHERE tree_id == :tree_id AND status == :status LIMIT 2",
{"tree_id": tree_id, "status": Status.PENDING.value},
"""
SELECT * FROM root WHERE tree_id == :tree_id
AND status IN (:pending_status, :pending_batch_status) LIMIT 2
""",
{
"tree_id": tree_id,
"pending_status": Status.PENDING.value,
"pending_batch_status": Status.PENDING_BATCH.value,
},
)

row = await cursor.fetchone()
Expand All @@ -395,8 +402,12 @@ async def clear_pending_roots(self, tree_id: bytes32) -> Optional[Root]:

if pending_root is not None:
await writer.execute(
"DELETE FROM root WHERE tree_id == :tree_id AND status == :status",
{"tree_id": tree_id, "status": Status.PENDING.value},
"DELETE FROM root WHERE tree_id == :tree_id AND status IN (:pending_status, :pending_batch_status)",
{
"tree_id": tree_id,
"pending_status": Status.PENDING.value,
"pending_batch_status": Status.PENDING_BATCH.value,
},
)

return pending_root
Expand Down Expand Up @@ -1314,7 +1325,7 @@ async def clean_node_table(self, writer: aiosqlite.Connection) -> None:
"""
WITH RECURSIVE pending_nodes AS (
SELECT node_hash AS hash FROM root
WHERE status = ?
WHERE status IN (:pending_status, :pending_batch_status)
UNION ALL
SELECT n.left FROM node n
INNER JOIN pending_nodes pn ON n.hash = pn.hash
Expand All @@ -1328,7 +1339,10 @@ async def clean_node_table(self, writer: aiosqlite.Connection) -> None:
WHERE hash NOT IN (SELECT hash FROM ancestors)
AND hash NOT IN (SELECT hash FROM pending_nodes)
""",
(Status.PENDING.value,),
{
"pending_status": Status.PENDING.value,
"pending_batch_status": Status.PENDING_BATCH.value,
},
)

async def insert_batch(
Expand All @@ -1339,14 +1353,28 @@ async def insert_batch(
) -> Optional[bytes32]:
async with self.db_wrapper.writer() as writer:
old_root = await self.get_tree_root(tree_id)
root_hash = old_root.node_hash
if old_root.node_hash is None:
pending_root = await self.get_pending_root(tree_id=tree_id)
if pending_root is None:
latest_local_root: Optional[Root] = old_root
else:
if pending_root.status == Status.PENDING_BATCH:
# We have an unfinished batch, continue the current batch on top of it.
if pending_root.generation != old_root.generation + 1:
raise Exception("Internal error")
await self.change_root_status(pending_root, Status.COMMITTED)
await self.build_ancestor_table_for_latest_root(tree_id=tree_id)
latest_local_root = pending_root
else:
raise Exception("Internal error")

assert latest_local_root is not None
root_hash = latest_local_root.node_hash
if latest_local_root.node_hash is None:
hint_keys_values = {}
else:
kv_compressed = await self.get_keys_values_compressed(tree_id, root_hash=root_hash)
hint_keys_values = kv_compressed.keys_values_hashed

intermediate_root: Optional[Root] = old_root
for change in changelist:
if change["action"] == "insert":
key = change["key"]
Expand All @@ -1355,9 +1383,9 @@ async def insert_batch(
side = change.get("side", None)
if reference_node_hash is None and side is None:
insert_result = await self.autoinsert(
key, value, tree_id, hint_keys_values, True, Status.COMMITTED, root=intermediate_root
key, value, tree_id, hint_keys_values, True, Status.COMMITTED, root=latest_local_root
)
intermediate_root = insert_result.root
latest_local_root = insert_result.root
else:
if reference_node_hash is None or side is None:
raise Exception("Provide both reference_node_hash and side or neither.")
Expand All @@ -1370,21 +1398,21 @@ async def insert_batch(
hint_keys_values,
True,
Status.COMMITTED,
root=intermediate_root,
root=latest_local_root,
)
intermediate_root = insert_result.root
latest_local_root = insert_result.root
elif change["action"] == "delete":
key = change["key"]
intermediate_root = await self.delete(
key, tree_id, hint_keys_values, True, Status.COMMITTED, root=intermediate_root
latest_local_root = await self.delete(
key, tree_id, hint_keys_values, True, Status.COMMITTED, root=latest_local_root
)
elif change["action"] == "upsert":
key = change["key"]
new_value = change["value"]
insert_result = await self.upsert(
key, new_value, tree_id, hint_keys_values, True, Status.COMMITTED, root=intermediate_root
key, new_value, tree_id, hint_keys_values, True, Status.COMMITTED, root=latest_local_root
)
intermediate_root = insert_result.root
latest_local_root = insert_result.root
else:
raise Exception(f"Operation in batch is not insert or delete: {change}")

Expand All @@ -1396,7 +1424,7 @@ async def insert_batch(
# We delete all "temporary" records stored in root and ancestor tables and store only the final result.
await self.rollback_to_generation(tree_id, old_root.generation)
await self.insert_root_with_ancestor_table(tree_id=tree_id, node_hash=root.node_hash, status=status)
if status == Status.PENDING:
if status in (Status.PENDING, Status.PENDING_BATCH):
new_root = await self.get_pending_root(tree_id=tree_id)
assert new_root is not None
elif status == Status.COMMITTED:
Expand Down Expand Up @@ -1724,7 +1752,7 @@ async def delete_store_data(self, tree_id: bytes32) -> None:
),
pending_nodes AS (
SELECT node_hash AS hash FROM root
WHERE status = :status
WHERE status IN (:pending_status, :pending_batch_status)
UNION ALL
SELECT n.left FROM node n
INNER JOIN pending_nodes pn ON n.hash = pn.hash
Expand All @@ -1740,7 +1768,11 @@ async def delete_store_data(self, tree_id: bytes32) -> None:
WHERE hash NOT IN (SELECT hash FROM ancestors WHERE tree_id != :tree_id)
AND hash NOT IN (SELECT hash from pending_nodes)
""",
{"tree_id": tree_id, "status": Status.PENDING.value},
{
"tree_id": tree_id,
"pending_status": Status.PENDING.value,
"pending_batch_status": Status.PENDING_BATCH.value,
},
)
to_delete: Dict[bytes, Tuple[bytes, bytes]] = {}
ref_counts: Dict[bytes, int] = {}
Expand Down
21 changes: 18 additions & 3 deletions chia/rpc/data_layer_rpc_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def get_routes(self) -> Dict[str, Endpoint]:
"/create_data_store": self.create_data_store,
"/get_owned_stores": self.get_owned_stores,
"/batch_update": self.batch_update,
"/submit_pending_root": self.submit_pending_root,
"/get_value": self.get_value,
"/get_keys": self.get_keys,
"/get_keys_values": self.get_keys_values,
Expand Down Expand Up @@ -240,12 +241,24 @@ async def batch_update(self, request: Dict[str, Any]) -> EndpointResult:
fee = get_fee(self.service.config, request)
changelist = [process_change(change) for change in request["changelist"]]
store_id = bytes32(hexstr_to_bytes(request["id"]))
submit_on_chain = request.get("submit_on_chain", True)
# todo input checks
if self.service is None:
raise Exception("Data layer not created")
transaction_record = await self.service.batch_update(store_id, changelist, uint64(fee))
if transaction_record is None:
raise Exception(f"Batch update failed for: {store_id}")
transaction_record = await self.service.batch_update(store_id, changelist, uint64(fee), submit_on_chain)
if submit_on_chain:
if transaction_record is None:
raise Exception(f"Batch update failed for: {store_id}")
return {"tx_id": transaction_record.name}
else:
if transaction_record is not None:
raise Exception("Transaction submitted on chain, but submit_on_chain set to False")
return {}

async def submit_pending_root(self, request: Dict[str, Any]) -> EndpointResult:
store_id = bytes32(hexstr_to_bytes(request["id"]))
fee = get_fee(self.service.config, request)
transaction_record = await self.service.submit_pending_root(store_id, uint64(fee))
return {"tx_id": transaction_record.name}

async def insert(self, request: Dict[str, Any]) -> EndpointResult:
Expand All @@ -262,6 +275,7 @@ async def insert(self, request: Dict[str, Any]) -> EndpointResult:
raise Exception("Data layer not created")
changelist = [{"action": "insert", "key": key, "value": value}]
transaction_record = await self.service.batch_update(store_id, changelist, uint64(fee))
assert transaction_record is not None
return {"tx_id": transaction_record.name}

async def delete_key(self, request: Dict[str, Any]) -> EndpointResult:
Expand All @@ -277,6 +291,7 @@ async def delete_key(self, request: Dict[str, Any]) -> EndpointResult:
raise Exception("Data layer not created")
changelist = [{"action": "delete", "key": key}]
transaction_record = await self.service.batch_update(store_id, changelist, uint64(fee))
assert transaction_record is not None
return {"tx_id": transaction_record.name}

async def get_root(self, request: Dict[str, Any]) -> EndpointResult:
Expand Down
Loading

0 comments on commit 368fecc

Please sign in to comment.