Skip to content

Commit

Permalink
[Narwhal] switch to DBMap for batch store (MystenLabs#9255)
Browse files Browse the repository at this point in the history
## Description 

Payload writes and reads can take significant time transferring
non-trivial amount of bytes. It does not make sense to do this in a
single threaded loop in `typed_store::Store`. Use `DBMap` interface for
multithreaded processing instead.

## Test Plan 

Existing unit tests.

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Mar 14, 2023
1 parent b1fcc7c commit 17c7ed9
Show file tree
Hide file tree
Showing 9 changed files with 52 additions and 66 deletions.
16 changes: 5 additions & 11 deletions narwhal/primary/tests/integration_tests_validator_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::{
};
use storage::NodeStorage;
use storage::{CertificateStore, PayloadToken};
use store::Store;
use store::{rocks::DBMap, Map, Store};
use test_utils::{
fixture_batch_with_transactions, make_optimal_certificates, make_optimal_signed_certificates,
temp_dir, AuthorityFixture, CommitteeFixture,
Expand Down Expand Up @@ -92,10 +92,7 @@ async fn test_get_collections() {
.expect("couldn't store batches");
if n != 4 {
// Add batches to the workers store
store
.batch_store
.async_write(batch.digest(), batch.clone())
.await;
store.batch_store.insert(&batch.digest(), &batch).unwrap();
} else {
missing_certificate = digest;
}
Expand Down Expand Up @@ -312,10 +309,7 @@ async fn test_remove_collections() {
.expect("couldn't store batches");
if n != 4 {
// Add batches to the workers store
store
.batch_store
.async_write(batch.digest(), batch.clone())
.await;
store.batch_store.insert(&batch.digest(), &batch).unwrap();
}
}

Expand Down Expand Up @@ -1188,7 +1182,7 @@ async fn fixture_certificate(
header_store: Store<HeaderDigest, Header>,
certificate_store: CertificateStore,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
batch_store: Store<BatchDigest, Batch>,
batch_store: DBMap<BatchDigest, Batch>,
) -> (Certificate, Batch) {
let batch = fixture_batch_with_transactions(10);
let worker_id = 0;
Expand Down Expand Up @@ -1221,7 +1215,7 @@ async fn fixture_certificate(
.expect("couldn't store batches");

// Add a batch to the workers store
batch_store.async_write(batch_digest, batch.clone()).await;
batch_store.insert(&batch_digest, &batch).unwrap();

(certificate, batch)
}
Expand Down
4 changes: 2 additions & 2 deletions narwhal/storage/src/node_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct NodeStorage {
pub header_store: Store<HeaderDigest, Header>,
pub certificate_store: CertificateStore,
pub payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
pub batch_store: Store<BatchDigest, Batch>,
pub batch_store: DBMap<BatchDigest, Batch>,
pub consensus_store: Arc<ConsensusStore>,
}

Expand Down Expand Up @@ -95,7 +95,7 @@ impl NodeStorage {
certificate_digest_by_origin_map,
);
let payload_store = Store::new(payload_map);
let batch_store = Store::new(batch_map);
let batch_store = batch_map;
let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sub_dag_index_map));

Self {
Expand Down
9 changes: 4 additions & 5 deletions narwhal/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
};
use store::rocks::MetricConf;
use store::rocks::ReadWriteOptions;
use store::{reopen, rocks, rocks::DBMap, Store};
use store::{reopen, rocks, rocks::DBMap};
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::info;
use types::{
Expand Down Expand Up @@ -362,16 +362,15 @@ pub fn batch_with_transactions(num_of_transactions: usize) -> Batch {

const BATCHES_CF: &str = "batches";

pub fn open_batch_store() -> Store<BatchDigest, Batch> {
let db = DBMap::<BatchDigest, Batch>::open(
pub fn open_batch_store() -> DBMap<BatchDigest, Batch> {
DBMap::<BatchDigest, Batch>::open(
temp_dir(),
MetricConf::default(),
None,
Some(BATCHES_CF),
&ReadWriteOptions::default(),
)
.unwrap();
Store::new(db)
.unwrap()
}

// Creates one certificate per authority starting and finishing at the specified rounds (inclusive).
Expand Down
8 changes: 4 additions & 4 deletions narwhal/worker/src/batch_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::metrics::WorkerMetrics;
use byteorder::{BigEndian, ReadBytesExt};
use fastcrypto::hash::Hash;
use futures::stream::FuturesUnordered;
use store::Store;
use store::{rocks::DBMap, Map};

use config::WorkerId;
use tracing::{debug, error};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub struct BatchMaker {
/// Average resident time in the batch would be ~ (batch seal time - creation time) / 2
batch_start_timestamp: Instant,
/// The batch store to store our own batches.
store: Store<BatchDigest, Batch>,
store: DBMap<BatchDigest, Batch>,
// Output channel to send out batches' digests.
tx_our_batch: Sender<(WorkerOurBatchMessage, PrimaryResponse)>,
}
Expand All @@ -71,7 +71,7 @@ impl BatchMaker {
rx_batch_maker: Receiver<(Transaction, TxResponse)>,
tx_quorum_waiter: Sender<(Batch, tokio::sync::oneshot::Sender<()>)>,
node_metrics: Arc<WorkerMetrics>,
store: Store<BatchDigest, Batch>,
store: DBMap<BatchDigest, Batch>,
tx_our_batch: Sender<(WorkerOurBatchMessage, PrimaryResponse)>,
) -> JoinHandle<()> {
spawn_logged_monitored_task!(
Expand Down Expand Up @@ -272,7 +272,7 @@ impl BatchMaker {
// Now save it to disk
let digest = batch.digest();

if let Err(e) = store.sync_write(digest, batch).await {
if let Err(e) = store.insert(&digest, &batch) {
error!("Store failed with error: {:?}", e);
return;
}
Expand Down
40 changes: 20 additions & 20 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ use futures::{stream::FuturesUnordered, StreamExt};

use rand::seq::SliceRandom;
use std::{collections::HashSet, time::Duration};
use store::Store;
use store::{rocks::DBMap, Map};
use tokio::time::sleep;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, info, trace, warn};
use types::{
metered_channel::Sender, Batch, BatchDigest, PrimaryToWorker, RequestBatchRequest,
RequestBatchResponse, WorkerBatchMessage, WorkerDeleteBatchesMessage, WorkerOthersBatchMessage,
Expand All @@ -33,7 +33,7 @@ pub mod handlers_tests;
pub struct WorkerReceiverHandler<V> {
pub id: WorkerId,
pub tx_others_batch: Sender<WorkerOthersBatchMessage>,
pub store: Store<BatchDigest, Batch>,
pub store: DBMap<BatchDigest, Batch>,
pub validator: V,
}

Expand All @@ -52,7 +52,9 @@ impl<V: TransactionValidator> WorkerToWorker for WorkerReceiverHandler<V> {
));
}
let digest = message.batch.digest();
self.store.async_write(digest, message.batch).await;
self.store.insert(&digest, &message.batch).map_err(|e| {
anemo::rpc::Status::internal(format!("failed to write to batch store: {e:?}"))
})?;
self.tx_others_batch
.send(WorkerOthersBatchMessage {
digest,
Expand All @@ -69,11 +71,9 @@ impl<V: TransactionValidator> WorkerToWorker for WorkerReceiverHandler<V> {
) -> Result<anemo::Response<RequestBatchResponse>, anemo::rpc::Status> {
// TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources
let batch = request.into_body().batch;
let batch = self
.store
.read(batch)
.await
.map_err(|e| anemo::rpc::Status::from_error(Box::new(e)))?;
let batch = self.store.get(&batch).map_err(|e| {
anemo::rpc::Status::internal(format!("failed to read from batch store: {e:?}"))
})?;

Ok(anemo::Response::new(RequestBatchResponse { batch }))
}
Expand All @@ -90,7 +90,7 @@ pub struct PrimaryReceiverHandler<V> {
// The worker information cache.
pub worker_cache: WorkerCache,
// The batch store
pub store: Store<BatchDigest, Batch>,
pub store: DBMap<BatchDigest, Batch>,
// Timeout on RequestBatch RPC.
pub request_batch_timeout: Duration,
// Number of random nodes to query when retrying batch requests.
Expand All @@ -110,7 +110,7 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
let mut missing = HashSet::new();
for digest in message.digests.iter() {
// Check if we already have the batch.
match self.store.read(*digest).await {
match self.store.get(digest) {
Ok(None) => {
missing.insert(*digest);
debug!("Requesting sync for batch {digest}");
Expand All @@ -119,8 +119,9 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
trace!("Digest {digest} already in store, nothing to sync");
}
Err(e) => {
error!("Failed to read from batch store: {e}");
return Err(anemo::rpc::Status::from_error(Box::new(e)));
return Err(anemo::rpc::Status::internal(format!(
"failed to read from batch store: {e:?}"
)));
}
};
}
Expand Down Expand Up @@ -229,7 +230,7 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
}
let digest = batch.digest();
if missing.remove(&digest) {
self.store.sync_write(digest, batch).await.map_err(|e| {
self.store.insert(&digest, &batch).map_err(|e| {
anemo::rpc::Status::internal(format!(
"failed to write to batch store: {e:?}"
))
Expand Down Expand Up @@ -259,12 +260,11 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
&self,
request: anemo::Request<WorkerDeleteBatchesMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let digests = request.into_body().digests;
self.store
.remove_all(digests)
.await
.map_err(|e| anemo::rpc::Status::from_error(Box::new(e)))?;

for digest in request.into_body().digests {
self.store.remove(&digest).map_err(|e| {
anemo::rpc::Status::internal(format!("failed to remove from batch store: {e:?}"))
})?;
}
Ok(anemo::Response::new(()))
}
}
15 changes: 5 additions & 10 deletions narwhal/worker/src/tests/batch_maker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ use store::rocks::ReadWriteOptions;
use test_utils::{temp_dir, transaction};
use types::PreSubscribedBroadcastSender;

fn create_batches_store() -> Store<BatchDigest, Batch> {
let db = rocks::DBMap::<BatchDigest, Batch>::open(
fn create_batches_store() -> DBMap<BatchDigest, Batch> {
rocks::DBMap::<BatchDigest, Batch>::open(
temp_dir(),
MetricConf::default(),
None,
Some("batches"),
&ReadWriteOptions::default(),
)
.unwrap();
Store::new(db)
.unwrap()
}

#[tokio::test]
Expand Down Expand Up @@ -71,11 +70,7 @@ async fn make_batch() {
assert!(r1.await.is_ok());

// Ensure the batch is stored
assert!(store
.notify_read(expected_batch.digest())
.await
.unwrap()
.is_some());
assert!(store.get(&expected_batch.digest()).unwrap().is_some());
}

#[tokio::test]
Expand Down Expand Up @@ -122,5 +117,5 @@ async fn batch_timeout() {
assert!(r0.await.is_ok());

// Ensure the batch is stored
assert!(store.notify_read(batch.digest()).await.unwrap().is_some());
assert!(store.get(&batch.digest()).unwrap().is_some());
}
10 changes: 5 additions & 5 deletions narwhal/worker/src/tests/handlers_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn synchronize() {
let _recv_network = target_worker.new_network(routes);

// Check not in store
assert!(store.read(digest).await.unwrap().is_none());
assert!(store.get(&digest).unwrap().is_none());

// Send a sync request.
let mut request = anemo::Request::new(message);
Expand All @@ -76,7 +76,7 @@ async fn synchronize() {
handler.synchronize(request).await.unwrap();

// Check its now stored
assert!(store.notify_read(digest).await.unwrap().is_some())
assert!(store.get(&digest).unwrap().is_some())
}

#[tokio::test]
Expand Down Expand Up @@ -107,7 +107,7 @@ async fn synchronize_when_batch_exists() {
let batch = test_utils::batch();
let batch_id = batch.digest();
let missing = vec![batch_id];
store.async_write(batch_id, batch).await;
store.insert(&batch_id, &batch).unwrap();

// Set up mock behavior for child RequestBatches RPC.
let target_primary = fixture.authorities().nth(1).unwrap();
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn delete_batches() {
let store = test_utils::open_batch_store();
let batch = test_utils::batch();
let digest = batch.digest();
store.async_write(digest, batch.clone()).await;
store.insert(&digest, &batch).unwrap();

// Send a delete request.
let handler = PrimaryReceiverHandler {
Expand All @@ -160,5 +160,5 @@ async fn delete_batches() {
.await
.unwrap();

assert!(store.read(digest).await.unwrap().is_none());
assert!(store.get(&digest).unwrap().is_none());
}
10 changes: 4 additions & 6 deletions narwhal/worker/src/tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,14 @@ async fn reject_invalid_clients_transactions() {
};

// Create a new test store.
let db = rocks::DBMap::<BatchDigest, Batch>::open(
let batch_store = rocks::DBMap::<BatchDigest, Batch>::open(
temp_dir(),
MetricConf::default(),
None,
Some("batches"),
&ReadWriteOptions::default(),
)
.unwrap();
let store = Store::new(db);

let registry = Registry::new();
let metrics = initialise_metrics(&registry);
Expand All @@ -81,7 +80,7 @@ async fn reject_invalid_clients_transactions() {
worker_cache.clone(),
parameters,
NilTxValidator,
store,
batch_store,
metrics,
&mut tx_shutdown,
);
Expand Down Expand Up @@ -147,15 +146,14 @@ async fn handle_clients_transactions() {
};

// Create a new test store.
let db = rocks::DBMap::<BatchDigest, Batch>::open(
let batch_store = rocks::DBMap::<BatchDigest, Batch>::open(
temp_dir(),
MetricConf::default(),
None,
Some("batches"),
&ReadWriteOptions::default(),
)
.unwrap();
let store = Store::new(db);

let registry = Registry::new();
let metrics = initialise_metrics(&registry);
Expand All @@ -171,7 +169,7 @@ async fn handle_clients_transactions() {
worker_cache.clone(),
parameters,
TrivialTransactionValidator::default(),
store,
batch_store,
metrics,
&mut tx_shutdown,
);
Expand Down
6 changes: 3 additions & 3 deletions narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use network::metrics::MetricsMakeCallbackHandler;
use std::collections::HashMap;
use std::time::Duration;
use std::{net::Ipv4Addr, sync::Arc, thread::sleep};
use store::Store;
use store::rocks::DBMap;
use tap::TapFallible;
use tokio::task::JoinHandle;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -63,7 +63,7 @@ pub struct Worker {
/// The configuration parameters
parameters: Parameters,
/// The persistent storage.
store: Store<BatchDigest, Batch>,
store: DBMap<BatchDigest, Batch>,
}

impl Worker {
Expand All @@ -75,7 +75,7 @@ impl Worker {
worker_cache: WorkerCache,
parameters: Parameters,
validator: impl TransactionValidator,
store: Store<BatchDigest, Batch>,
store: DBMap<BatchDigest, Batch>,
metrics: Metrics,
tx_shutdown: &mut PreSubscribedBroadcastSender,
) -> Vec<JoinHandle<()>> {
Expand Down

0 comments on commit 17c7ed9

Please sign in to comment.