Skip to content

Commit

Permalink
[Storage] Make the state commit to be fully async. (aptos-labs#6351)
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Jan 27, 2023
1 parent 969d036 commit b5ad725
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 67 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions storage/aptosdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
rayon = { workspace = true }
serde = { workspace = true }
static_assertions = { workspace = true }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
41 changes: 9 additions & 32 deletions storage/aptosdb/src/state_store/buffered_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ use aptos_types::{
transaction::Version,
};
use std::{
collections::{HashMap, VecDeque},
collections::HashMap,
mem::swap,
sync::{
mpsc,
mpsc::{Receiver, Sender, SyncSender},
mpsc::{Sender, SyncSender},
Arc,
},
thread::JoinHandle,
Expand All @@ -42,16 +42,11 @@ pub struct BufferedState {
state_after_checkpoint: StateDelta,
state_commit_sender: SyncSender<CommitMessage<Arc<StateDelta>>>,
target_items: usize,
snapshot_ready_receivers: VecDeque<Receiver<()>>,
join_handle: Option<JoinHandle<()>>,
}

pub(crate) enum CommitMessage<T> {
Data {
data: T,
prev_snapshot_ready_receiver: Option<Receiver<()>>,
snapshot_ready_sender: Sender<()>,
},
Data(T),
Sync(Sender<()>),
Exit,
}
Expand All @@ -65,22 +60,18 @@ impl BufferedState {
let (state_commit_sender, state_commit_receiver) =
mpsc::sync_channel(ASYNC_COMMIT_CHANNEL_BUFFER_SIZE as usize);
let arc_state_db = Arc::clone(state_db);
let (initial_snapshot_ready_sender, initial_snapshot_ready_receiver) = mpsc::channel();
let join_handle = std::thread::Builder::new()
.name("state-committer".to_string())
.spawn(move || {
let committer = StateSnapshotCommitter::new(arc_state_db, state_commit_receiver);
committer.run();
})
.expect("Failed to spawn state committer thread.");
// The initial snapshot is always already persisted in db.
initial_snapshot_ready_sender.send(()).unwrap();
let myself = Self {
state_until_checkpoint: None,
state_after_checkpoint,
state_commit_sender,
target_items,
snapshot_ready_receivers: VecDeque::from([initial_snapshot_ready_receiver]),
// The join handle of the async state commit thread for graceful drop.
join_handle: Some(join_handle),
};
Expand All @@ -96,29 +87,13 @@ impl BufferedState {
self.state_after_checkpoint.base_version
}

fn send_to_commit(&mut self, to_commit: Arc<StateDelta>) {
let prev_snapshot_ready_receiver = self
.snapshot_ready_receivers
.pop_front()
.expect("receivers should never be empty");
assert!(self.snapshot_ready_receivers.is_empty());
let (snapshot_ready_sender, snapshot_ready_receiver) = mpsc::channel();
self.snapshot_ready_receivers
.push_back(snapshot_ready_receiver);
self.state_commit_sender
.send(CommitMessage::Data {
data: to_commit,
prev_snapshot_ready_receiver: Some(prev_snapshot_ready_receiver),
snapshot_ready_sender,
})
.unwrap();
}

fn maybe_commit(&mut self, sync_commit: bool) {
if sync_commit {
let (commit_sync_sender, commit_sync_receiver) = mpsc::channel();
if let Some(to_commit) = self.state_until_checkpoint.take().map(Arc::from) {
self.send_to_commit(to_commit);
self.state_commit_sender
.send(CommitMessage::Data(to_commit))
.unwrap();
}
self.state_commit_sender
.send(CommitMessage::Sync(commit_sync_sender))
Expand All @@ -144,7 +119,9 @@ impl BufferedState {
version = to_commit.current_version,
"Sent StateDelta to async commit thread."
);
self.send_to_commit(to_commit);
self.state_commit_sender
.send(CommitMessage::Data(to_commit))
.unwrap();
}
}
}
Expand Down
11 changes: 2 additions & 9 deletions storage/aptosdb/src/state_store/state_merkle_batch_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,12 @@ impl StateMerkleBatchCommitter {
pub fn run(self) {
while let Ok(msg) = self.state_merkle_batch_receiver.recv() {
match msg {
CommitMessage::Data {
data,
snapshot_ready_sender,
..
} => {
CommitMessage::Data(state_merkle_batch) => {
let StateMerkleBatch {
batch,
root_hash,
state_delta,
} = data;
} = state_merkle_batch;
// commit jellyfish merkle nodes
let _timer = OTHER_TIMERS_SECONDS
.with_label_values(&["commit_jellyfish_merkle_nodes"])
Expand All @@ -68,9 +64,6 @@ impl StateMerkleBatchCommitter {
.version_cache()
.maybe_evict_version(self.state_db.state_merkle_db.lru_cache());
}
// TODO(grao): Consider removing the following sender once we have verified that the
// version cache correctly caches all the nodes we need.
snapshot_ready_sender.send(()).unwrap();
info!(
version = state_delta.current_version,
base_version = state_delta.base_version,
Expand Down
46 changes: 21 additions & 25 deletions storage/aptosdb/src/state_store/state_snapshot_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@

//! This file defines the state snapshot committer, which runs in a background thread within StateStore.
use crate::state_store::{
buffered_state::CommitMessage,
state_merkle_batch_committer::{StateMerkleBatch, StateMerkleBatchCommitter},
StateDb,
use crate::{
state_store::{
buffered_state::CommitMessage,
state_merkle_batch_committer::{StateMerkleBatch, StateMerkleBatchCommitter},
StateDb,
},
versioned_node_cache::VersionedNodeCache,
};
use aptos_logger::trace;
use aptos_storage_interface::{jmt_update_refs, jmt_updates, state_delta::StateDelta};
use static_assertions::const_assert;
use std::{
sync::{
mpsc,
Expand All @@ -27,13 +31,19 @@ pub(crate) struct StateSnapshotCommitter {
}

impl StateSnapshotCommitter {
const CHANNEL_SIZE: usize = 0;

pub fn new(
state_db: Arc<StateDb>,
state_snapshot_commit_receiver: Receiver<CommitMessage<Arc<StateDelta>>>,
) -> Self {
// Note: This is to ensure we cache nodes in memory from previous batches before they get committed to DB.
const_assert!(
StateSnapshotCommitter::CHANNEL_SIZE < VersionedNodeCache::NUM_VERSIONS_TO_CACHE
);
// Rendezvous channel
let (state_merkle_batch_commit_sender, state_merkle_batch_commit_receiver) =
mpsc::sync_channel(0);
mpsc::sync_channel(Self::CHANNEL_SIZE);
let arc_state_db = Arc::clone(&state_db);
let join_handle = std::thread::Builder::new()
.name("state_batch_committer".to_string())
Expand All @@ -56,11 +66,7 @@ impl StateSnapshotCommitter {
pub fn run(self) {
while let Ok(msg) = self.state_snapshot_commit_receiver.recv() {
match msg {
CommitMessage::Data {
data: delta_to_commit,
prev_snapshot_ready_receiver,
snapshot_ready_sender,
} => {
CommitMessage::Data(delta_to_commit) => {
let node_hashes = delta_to_commit
.current
.clone()
Expand All @@ -69,12 +75,6 @@ impl StateSnapshotCommitter {
let version = delta_to_commit.current_version.expect("Cannot be empty");
let base_version = delta_to_commit.base_version;

// Wait for the previous batch to commit before reading the snapshot from db.
prev_snapshot_ready_receiver
.expect("prev_snapshot_ready_receiver cannot be None")
.recv()
.unwrap();

let (batch, root_hash) = self
.state_db
.state_merkle_db
Expand All @@ -90,15 +90,11 @@ impl StateSnapshotCommitter {
)
.expect("Error writing snapshot");
self.state_merkle_batch_commit_sender
.send(CommitMessage::Data {
data: StateMerkleBatch {
batch,
root_hash,
state_delta: delta_to_commit,
},
prev_snapshot_ready_receiver: None,
snapshot_ready_sender,
})
.send(CommitMessage::Data(StateMerkleBatch {
batch,
root_hash,
state_delta: delta_to_commit,
}))
.unwrap();
},
CommitMessage::Sync(finish_sender) => {
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/versioned_node_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) struct VersionedNodeCache {
}

impl VersionedNodeCache {
const NUM_VERSIONS_TO_CACHE: usize = 2;
pub(crate) const NUM_VERSIONS_TO_CACHE: usize = 2;

pub fn new() -> Self {
Self {
Expand Down

0 comments on commit b5ad725

Please sign in to comment.