From f2b24f5de6ade39d608b00ecb22e1b8a79d93f38 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Thu, 16 Mar 2023 23:35:46 +0000 Subject: [PATCH] [Narwhal] swap typed_store::Store with DBMap (#9387) ## Description This PR swaps for Narwhal all the `typed_store::Store` instances with `DBMap`. Also, I've wrapped the `DBMaps` behind dedicated structs so we can make things easier if we consider to swap later on `DBMaps` with something different in the future + make it easier to add additional methods on each store if needed in the future (ex update additional secondary index stores in batch fashion etc). ## Test Plan Existing & new unit and integration tests. ## TODO: - [ ] wrap the `batch_store` DBMap behind a dedicated struct --- If your changes are not user-facing and not a breaking change, you can skip the following section. Otherwise, please indicate what changed, and then add to the Release Notes section as highlighted during the release process. ### Type of Change (Check all that apply) - [ ] user-visible impact - [ ] breaking change for a client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- narwhal/primary/src/block_remover.rs | 17 +- narwhal/primary/src/block_synchronizer/mod.rs | 20 +-- .../tests/block_synchronizer_tests.rs | 6 +- .../block_synchronizer/tests/handler_tests.rs | 4 +- narwhal/primary/src/core.rs | 17 +- narwhal/primary/src/primary.rs | 73 ++++---- narwhal/primary/src/synchronizer.rs | 29 ++-- .../primary/src/tests/block_remover_tests.rs | 38 ++--- .../src/tests/certificate_fetcher_tests.rs | 2 +- narwhal/primary/src/tests/common.rs | 28 +--- narwhal/primary/src/tests/primary_tests.rs | 47 +++--- .../primary/src/tests/synchronizer_tests.rs | 4 +- .../tests/integration_tests_validator_api.rs | 40 ++--- narwhal/storage/src/certificate_store.rs | 46 ++--- narwhal/storage/src/header_store.rs | 48 ++++++ narwhal/storage/src/lib.rs | 47 ++++++ narwhal/storage/src/node_store.rs | 38 +++-- narwhal/storage/src/payload_store.rs | 158 ++++++++++++++++++ narwhal/storage/src/vote_digest_store.rs | 46 +++++ narwhal/types/src/primary.rs | 10 ++ 20 files changed, 465 insertions(+), 253 deletions(-) create mode 100644 narwhal/storage/src/header_store.rs create mode 100644 narwhal/storage/src/payload_store.rs create mode 100644 narwhal/storage/src/vote_digest_store.rs diff --git a/narwhal/primary/src/block_remover.rs b/narwhal/primary/src/block_remover.rs index 9579190ee9df9..9fc755f2c1f52 100644 --- a/narwhal/primary/src/block_remover.rs +++ b/narwhal/primary/src/block_remover.rs @@ -10,13 +10,12 @@ use futures::future::try_join_all; use itertools::Either; use network::PrimaryToWorkerRpc; use std::{collections::HashMap, sync::Arc}; -use storage::{CertificateStore, PayloadToken}; -use store::{rocks::TypedStoreError, Store}; +use storage::{CertificateStore, HeaderStore, PayloadStore}; +use store::rocks::TypedStoreError; use tracing::{debug, instrument, warn}; use types::{ - metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, - Round, + metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, HeaderDigest, Round, }; #[cfg(test)] @@ -39,10 +38,10 @@ pub struct BlockRemover { certificate_store: CertificateStore, /// Storage that keeps the headers by their digest id - header_store: Store, + header_store: HeaderStore, /// 
The persistent storage for payload markers from workers. - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, /// The Dag structure for managing the stored certificates dag: Option>, @@ -60,8 +59,8 @@ impl BlockRemover { name: PublicKey, worker_cache: WorkerCache, certificate_store: CertificateStore, - header_store: Store, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + header_store: HeaderStore, + payload_store: PayloadStore, dag: Option>, worker_network: anemo::Network, tx_committed_certificates: Sender<(Round, Vec)>, @@ -133,7 +132,6 @@ impl BlockRemover { self.header_store .remove_all(header_digests) - .await .map_err(Either::Left)?; // delete batch from the payload store as well @@ -145,7 +143,6 @@ impl BlockRemover { } self.payload_store .remove_all(batches_to_cleanup) - .await .map_err(Either::Left)?; // NOTE: delete certificates in the end since if we need to repeat the request diff --git a/narwhal/primary/src/block_synchronizer/mod.rs b/narwhal/primary/src/block_synchronizer/mod.rs index 4a57b1c067b56..b9ba005fac457 100644 --- a/narwhal/primary/src/block_synchronizer/mod.rs +++ b/narwhal/primary/src/block_synchronizer/mod.rs @@ -26,8 +26,7 @@ use std::{ collections::{HashMap, HashSet}, time::Duration, }; -use storage::{CertificateStore, PayloadToken}; -use store::Store; +use storage::{CertificateStore, PayloadStore}; use thiserror::Error; use tokio::{sync::mpsc::Sender, task::JoinHandle, time::timeout}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -171,7 +170,7 @@ pub struct BlockSynchronizer { certificate_store: CertificateStore, /// The persistent storage for payload markers from workers - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, /// Timeout when synchronizing the certificates certificates_synchronize_timeout: Duration, @@ -192,7 +191,7 @@ impl BlockSynchronizer { rx_shutdown: ConditionalBroadcastReceiver, rx_block_synchronizer_commands: metered_channel::Receiver, network: anemo::Network, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, certificate_store: CertificateStore, parameters: Parameters, ) -> JoinHandle<()> { @@ -548,7 +547,7 @@ impl BlockSynchronizer { "Certificate with id {} not our own, checking in storage.", certificate.digest() ); - match self.payload_store.read_all(payload).await { + match self.payload_store.read_all(payload) { Ok(payload_result) => { payload_result.into_iter().all(|x| x.is_some()).to_owned() } @@ -663,7 +662,7 @@ impl BlockSynchronizer { #[instrument(level = "trace", skip_all, fields(request_id, certificate=?certificate.header.digest()))] async fn wait_for_block_payload<'a>( payload_synchronize_timeout: Duration, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, certificate: Certificate, ) -> State { let futures = certificate @@ -671,18 +670,13 @@ impl BlockSynchronizer { .payload .iter() .map(|(batch_digest, (worker_id, _))| { - payload_store.notify_read((*batch_digest, *worker_id)) + payload_store.notify_contains(*batch_digest, *worker_id) }) .collect::>(); // Wait for all the items to sync - have a timeout let result = timeout(payload_synchronize_timeout, join_all(futures)).await; - if result.is_err() - || result - .unwrap() - .into_iter() - .any(|r| r.map_or_else(|_| true, |f| f.is_none())) - { + if result.is_err() { return State::PayloadSynchronized { result: Err(SyncError::Timeout { digest: certificate.digest(), diff --git 
a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs index 5cb147e6fe87a..0c531e813f27a 100644 --- a/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/block_synchronizer_tests.rs @@ -315,7 +315,7 @@ async fn test_successful_payload_synchronization() { // Assume that the request is the correct one and just immediately // store the batch to the payload store. for digest in m.digests { - payload_store.async_write((digest, worker), 1).await; + payload_store.write(&digest, &worker).unwrap(); } } } @@ -637,8 +637,8 @@ async fn test_reply_with_payload_already_in_storage() { if i > NUM_OF_CERTIFICATES_WITH_MISSING_PAYLOAD { certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } } diff --git a/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs b/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs index c876c4bd910cb..c12b16952df87 100644 --- a/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs +++ b/narwhal/primary/src/block_synchronizer/tests/handler_tests.rs @@ -277,8 +277,8 @@ async fn test_synchronize_block_payload() { .build() .unwrap(); let cert_stored = fixture.certificate(&header); - for (digest, (worker_id, _)) in cert_stored.clone().header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &cert_stored.clone().header.payload { + payload_store.write(digest, worker_id).unwrap(); } // AND a certificate with payload NOT available diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index 7169fdd4f1762..803f06c863b4f 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -12,8 +12,7 @@ use mysten_metrics::{monitored_future, spawn_logged_monitored_task}; use network::anemo_ext::NetworkExt; use std::sync::Arc; use std::time::Duration; -use storage::CertificateStore; -use store::Store; +use storage::{CertificateStore, HeaderStore}; use tokio::{ sync::oneshot, task::{JoinHandle, JoinSet}, @@ -23,8 +22,8 @@ use types::{ ensure, error::{DagError, DagResult}, metered_channel::Receiver, - Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, HeaderDigest, - PrimaryToPrimaryClient, RequestVoteRequest, Vote, + Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, PrimaryToPrimaryClient, + RequestVoteRequest, Vote, }; #[cfg(test)] @@ -37,7 +36,7 @@ pub struct Core { /// The committee information. committee: Committee, /// The persistent storage keyed to headers. - header_store: Store, + header_store: HeaderStore, /// The persistent storage keyed to certificates. certificate_store: CertificateStore, /// Handles synchronization with other nodes and our workers. 
@@ -68,7 +67,7 @@ impl Core { pub fn spawn( name: PublicKey, committee: Committee, - header_store: Store, + header_store: HeaderStore, certificate_store: CertificateStore, synchronizer: Arc, signature_service: SignatureService, @@ -230,7 +229,7 @@ impl Core { async fn propose_header( name: PublicKey, committee: Committee, - header_store: Store, + header_store: HeaderStore, certificate_store: CertificateStore, signature_service: SignatureService, metrics: Arc, @@ -251,9 +250,7 @@ impl Core { } // Process the header. - header_store - .async_write(header.digest(), header.clone()) - .await; + header_store.write(&header)?; metrics.proposed_header_round.set(header.round as i64); // Reset the votes aggregator and sign our own header. diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 4452049e90987..450edf6e0cd72 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -50,8 +50,7 @@ use std::{ thread::sleep, time::Duration, }; -use storage::{CertificateStore, PayloadToken, ProposerStore}; -use store::Store; +use storage::{CertificateStore, HeaderStore, PayloadStore, ProposerStore, VoteDigestStore}; use tokio::{sync::watch, task::JoinHandle}; use tokio::{ sync::{mpsc, oneshot}, @@ -64,13 +63,12 @@ use types::{ ensure, error::{DagError, DagResult}, metered_channel::{channel_with_total, Receiver, Sender}, - now, BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest, - FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header, - HeaderDigest, PayloadAvailabilityRequest, PayloadAvailabilityResponse, - PreSubscribedBroadcastSender, PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, - RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, Vote, VoteInfo, - WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary, - WorkerToPrimaryServer, + now, Certificate, CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse, + GetCertificatesRequest, GetCertificatesResponse, PayloadAvailabilityRequest, + PayloadAvailabilityResponse, PreSubscribedBroadcastSender, PrimaryToPrimary, + PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round, SendCertificateRequest, + SendCertificateResponse, Vote, WorkerInfoResponse, WorkerOthersBatchMessage, + WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimaryServer, }; #[cfg(any(test))] @@ -104,11 +102,11 @@ impl Primary { committee: Committee, worker_cache: WorkerCache, parameters: Parameters, - header_store: Store, + header_store: HeaderStore, certificate_store: CertificateStore, proposer_store: ProposerStore, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, - vote_digest_store: Store, + payload_store: PayloadStore, + vote_digest_store: VoteDigestStore, tx_new_certificates: Sender, rx_committed_certificates: Receiver<(Round, Vec)>, rx_consensus_round_updates: watch::Receiver, @@ -649,11 +647,11 @@ struct PrimaryReceiverHandler { synchronizer: Arc, /// Service to sign headers. signature_service: SignatureService, - header_store: Store, + header_store: HeaderStore, certificate_store: CertificateStore, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, /// The store to persist the last voted round per authority, used to ensure idempotence. - vote_digest_store: Store, + vote_digest_store: VoteDigestStore, /// Get a signal when the round changes. 
rx_narwhal_round_updates: watch::Receiver, metrics: Arc, @@ -847,8 +845,8 @@ impl PrimaryReceiverHandler { // Store the header. self.header_store - .async_write(header.digest(), header.clone()) - .await; + .write(header) + .map_err(DagError::StoreError)?; // Check if we can vote for this header. // Send the vote when: @@ -861,8 +859,7 @@ impl PrimaryReceiverHandler { // so we don't. let result = self .vote_digest_store - .read(header.author.clone()) - .await + .read(&header.author) .map_err(DagError::StoreError)?; if let Some(vote_info) = result { @@ -897,18 +894,8 @@ impl PrimaryReceiverHandler { header, header.round ); - // Update the vote digest store with the vote we just sent. We don't need to store the - // vote itself, since it can be reconstructed using the headers. - self.vote_digest_store - .sync_write( - header.author.clone(), - VoteInfo { - epoch: header.epoch, - round: header.round, - vote_digest: vote.digest(), - }, - ) - .await?; + // Update the vote digest store with the vote we just sent. + self.vote_digest_store.write(&vote)?; Ok(RequestVoteResponse { vote: Some(vote), @@ -1101,17 +1088,13 @@ impl PrimaryToPrimary for PrimaryReceiverHandler { for (id, certificate_option) in digests.into_iter().zip(certificates) { // Find batches only for certificates that exist. if let Some(certificate) = certificate_option { - let payload_available = match self - .payload_store - .read_all( - certificate - .header - .payload - .into_iter() - .map(|(batch, (worker_id, _))| (batch, worker_id)), - ) - .await - { + let payload_available = match self.payload_store.read_all( + certificate + .header + .payload + .into_iter() + .map(|(batch, (worker_id, _))| (batch, worker_id)), + ) { Ok(payload_result) => payload_result.into_iter().all(|x| x.is_some()), Err(err) => { // Assume that we don't have the payloads available, @@ -1138,7 +1121,7 @@ impl PrimaryToPrimary for PrimaryReceiverHandler { #[derive(Clone)] struct WorkerReceiverHandler { tx_our_digests: Sender, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, our_workers: BTreeMap, } @@ -1184,8 +1167,8 @@ impl WorkerToPrimary for WorkerReceiverHandler { ) -> Result, anemo::rpc::Status> { let message = request.into_body(); self.payload_store - .async_write((message.digest, message.worker_id), 0u8) - .await; + .write(&message.digest, &message.worker_id) + .map_err(|e| anemo::rpc::Status::internal(e.to_string()))?; Ok(anemo::Response::new(())) } diff --git a/narwhal/primary/src/synchronizer.rs b/narwhal/primary/src/synchronizer.rs index e05006982d83f..a8c18355b5b99 100644 --- a/narwhal/primary/src/synchronizer.rs +++ b/narwhal/primary/src/synchronizer.rs @@ -2,7 +2,7 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 use anemo::{rpc::Status, Network, Request, Response}; -use config::{Committee, Epoch, WorkerCache, WorkerId}; +use config::{Committee, Epoch, WorkerCache}; use consensus::consensus::ConsensusRound; use consensus::dag::Dag; use crypto::{NetworkPublicKey, PublicKey}; @@ -25,8 +25,7 @@ use std::{ }, time::Duration, }; -use storage::{CertificateStore, PayloadToken}; -use store::Store; +use storage::{CertificateStore, PayloadStore}; use tokio::{ sync::{broadcast, mpsc, oneshot, watch, MutexGuard}, task::JoinSet, @@ -37,9 +36,8 @@ use types::{ ensure, error::{AcceptNotification, DagError, DagResult}, metered_channel::Sender, - BatchDigest, Certificate, CertificateDigest, Header, PrimaryToPrimaryClient, - PrimaryToWorkerClient, Round, SendCertificateRequest, SendCertificateResponse, - WorkerSynchronizeMessage, + Certificate, CertificateDigest, Header, PrimaryToPrimaryClient, PrimaryToWorkerClient, Round, + SendCertificateRequest, SendCertificateResponse, WorkerSynchronizeMessage, }; use crate::{aggregators::CertificatesAggregator, metrics::PrimaryMetrics, CHANNEL_CAPACITY}; @@ -65,7 +63,9 @@ struct Inner { highest_received_round: AtomicU64, /// The persistent storage tables. certificate_store: CertificateStore, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + /// The persistent store of the available batch digests produced either via our own workers + /// or others workers. + payload_store: PayloadStore, /// Send missing certificates to the `CertificateFetcher`. tx_certificate_fetcher: Sender, /// Send certificates to be accepted into a separate task that runs @@ -263,7 +263,7 @@ impl Synchronizer { worker_cache: WorkerCache, gc_depth: Round, certificate_store: CertificateStore, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, tx_certificate_fetcher: Sender, tx_new_certificates: Sender, tx_parents: Sender<(Vec, Round, Epoch)>, @@ -861,12 +861,7 @@ impl Synchronizer { // 4. The last good node will never be able to sync as it will keep sending its sync requests // to workers #1 (rather than workers #0). Also, clients will never be able to retrieve batch // X as they will be querying worker #1. - if inner - .payload_store - .read((*digest, *worker_id)) - .await? - .is_none() - { + if !inner.payload_store.contains(*digest, *worker_id)? { missing .entry(*worker_id) .or_insert_with(Vec::new) @@ -904,11 +899,11 @@ impl Synchronizer { backoff::Error::transient(DagError::NetworkError(format!("{e:?}"))) }); if result.is_ok() { - for digest in digests.clone() { + for digest in &digests { inner .payload_store - .async_write((digest, worker_id), 0u8) - .await; + .write(digest, &worker_id) + .map_err(|e| backoff::Error::permanent(DagError::StoreError(e)))? 
} } result diff --git a/narwhal/primary/src/tests/block_remover_tests.rs b/narwhal/primary/src/tests/block_remover_tests.rs index a4e7af9bcf7e3..28a7cd5fea0fe 100644 --- a/narwhal/primary/src/tests/block_remover_tests.rs +++ b/narwhal/primary/src/tests/block_remover_tests.rs @@ -85,19 +85,16 @@ async fn test_successful_blocks_delete() { dag.insert(certificate).await.unwrap(); // write the header - header_store - .async_write(header.clone().digest(), header.clone()) - .await; + header_store.write(&header).unwrap(); header_ids.push(header.clone().digest()); // write the batches to payload store payload_store - .sync_write_all(vec![ - ((batch_1.clone().digest(), worker_id_0), 0), - ((batch_2.clone().digest(), worker_id_1), 0), + .write_all(vec![ + (batch_1.clone().digest(), worker_id_0), + (batch_2.clone().digest(), worker_id_1), ]) - .await .expect("couldn't store batches"); digests.push(digest); @@ -153,7 +150,7 @@ async fn test_successful_blocks_delete() { // ensure that headers have been deleted from store for header_id in header_ids { assert!( - header_store.read(header_id).await.unwrap().is_none(), + header_store.read(&header_id).unwrap().is_none(), "Header shouldn't exist" ); } @@ -162,11 +159,7 @@ async fn test_successful_blocks_delete() { for (worker_id, batch_digests) in worker_batches { for digest in batch_digests { assert!( - payload_store - .read((digest, worker_id)) - .await - .unwrap() - .is_none(), + !payload_store.contains(digest, worker_id).unwrap(), "Payload shouldn't exist" ); } @@ -258,19 +251,16 @@ async fn test_failed_blocks_delete() { dag.insert(certificate).await.unwrap(); // write the header - header_store - .async_write(header.clone().digest(), header.clone()) - .await; + header_store.write(&header).unwrap(); header_ids.push(header.clone().digest()); // write the batches to payload store payload_store - .sync_write_all(vec![ - ((batch_1.clone().digest(), worker_id_0), 0), - ((batch_2.clone().digest(), worker_id_1), 0), + .write_all(vec![ + (batch_1.clone().digest(), worker_id_0), + (batch_2.clone().digest(), worker_id_1), ]) - .await .expect("couldn't store batches"); digests.push(digest); @@ -326,15 +316,11 @@ async fn test_failed_blocks_delete() { assert!(certificate_store.read(digest).unwrap().is_some()); } for header_id in header_ids { - assert!(header_store.read(header_id).await.unwrap().is_some()); + assert!(header_store.read(&header_id).unwrap().is_some()); } for (worker_id, batch_digests) in worker_batches { for digest in batch_digests { - assert!(payload_store - .read((digest, worker_id)) - .await - .unwrap() - .is_some()); + assert!(payload_store.contains(digest, worker_id).unwrap()); } } let mut total_deleted = 0; diff --git a/narwhal/primary/src/tests/certificate_fetcher_tests.rs b/narwhal/primary/src/tests/certificate_fetcher_tests.rs index d40143b7d8b74..d5ee533b399b2 100644 --- a/narwhal/primary/src/tests/certificate_fetcher_tests.rs +++ b/narwhal/primary/src/tests/certificate_fetcher_tests.rs @@ -237,7 +237,7 @@ async fn fetch_certificates_basic() { // Avoid any sort of missing payload by pre-populating the batch for (digest, (worker_id, _)) in headers.iter().flat_map(|h| h.payload.iter()) { - payload_store.async_write((*digest, *worker_id), 0u8).await; + payload_store.write(digest, worker_id).unwrap(); } let total_certificates = fixture.authorities().count() * rounds as usize; diff --git a/narwhal/primary/src/tests/common.rs b/narwhal/primary/src/tests/common.rs index 8c494afb5401b..f90b4e37998bd 100644 --- a/narwhal/primary/src/tests/common.rs +++ 
b/narwhal/primary/src/tests/common.rs @@ -3,27 +3,23 @@ use config::WorkerId; use crypto::NetworkKeyPair; use std::time::Duration; -use storage::CertificateStore; -use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions, Store}; +use storage::{CertificateStore, HeaderStore, PayloadStore}; +use store::{reopen, rocks, rocks::DBMap, rocks::ReadWriteOptions}; use test_utils::{ temp_dir, PrimaryToWorkerMockServer, CERTIFICATES_CF, CERTIFICATE_DIGEST_BY_ORIGIN_CF, - CERTIFICATE_DIGEST_BY_ROUND_CF, HEADERS_CF, PAYLOAD_CF, VOTES_CF, + CERTIFICATE_DIGEST_BY_ROUND_CF, HEADERS_CF, PAYLOAD_CF, }; use types::{ - BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, Round, VoteInfo, + BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, Round, WorkerSynchronizeMessage, }; -use crypto::{PublicKey, PublicKeyBytes}; +use crypto::PublicKeyBytes; use storage::PayloadToken; use store::rocks::MetricConf; use tokio::{task::JoinHandle, time::Instant}; -pub fn create_db_stores() -> ( - Store, - CertificateStore, - Store<(BatchDigest, WorkerId), PayloadToken>, -) { +pub fn create_db_stores() -> (HeaderStore, CertificateStore, PayloadStore) { // Create a new test store. let rocksdb = rocks::open_cf( temp_dir(), @@ -53,24 +49,16 @@ pub fn create_db_stores() -> ( PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>); ( - Store::new(header_map), + HeaderStore::new(header_map), CertificateStore::new( certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, ), - Store::new(payload_map), + PayloadStore::new(payload_map), ) } -pub fn create_test_vote_store() -> Store { - // Create a new test store. - let rocksdb = rocks::open_cf(temp_dir(), None, MetricConf::default(), &[VOTES_CF]) - .expect("Failed creating database"); - let votes_map = reopen!(&rocksdb, VOTES_CF;); - Store::new(votes_map) -} - #[must_use] pub fn worker_listener( // -1 means receive unlimited messages until timeout expires diff --git a/narwhal/primary/src/tests/primary_tests.rs b/narwhal/primary/src/tests/primary_tests.rs index b5a0913d48f64..99fe6866b4b8c 100644 --- a/narwhal/primary/src/tests/primary_tests.rs +++ b/narwhal/primary/src/tests/primary_tests.rs @@ -27,11 +27,10 @@ use std::{ sync::Arc, time::Duration, }; -use storage::CertificateStore; -use storage::NodeStorage; use storage::PayloadToken; +use storage::{CertificateStore, VoteDigestStore}; +use storage::{NodeStorage, PayloadStore}; use store::rocks::{DBMap, MetricConf, ReadWriteOptions}; -use store::Store; use test_utils::{make_optimal_signed_certificates, temp_dir, CommitteeFixture}; use tokio::{ sync::{oneshot, watch}, @@ -341,7 +340,7 @@ async fn test_request_vote_send_missing_parents() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -375,7 +374,7 @@ async fn test_request_vote_send_missing_parents() { // into the storage as parents of round 2 certificates. But to test phase 2 they are left out. 
for cert in round_2_parents { for (digest, (worker_id, _)) in &cert.header.payload { - payload_store.async_write((*digest, *worker_id), 1).await; + payload_store.write(digest, worker_id).unwrap(); } certificate_store.write(cert.clone()).unwrap(); } @@ -485,7 +484,7 @@ async fn test_request_vote_accept_missing_parents() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -520,19 +519,19 @@ async fn test_request_vote_accept_missing_parents() { // should be able to get accepted. for cert in round_1_certs { for (digest, (worker_id, _)) in &cert.header.payload { - payload_store.async_write((*digest, *worker_id), 1).await; + payload_store.write(digest, worker_id).unwrap(); } certificate_store.write(cert.clone()).unwrap(); } for cert in round_2_parents { for (digest, (worker_id, _)) in &cert.header.payload { - payload_store.async_write((*digest, *worker_id), 1).await; + payload_store.write(digest, worker_id).unwrap(); } certificate_store.write(cert.clone()).unwrap(); } // Populate new header payload so they don't have to be retrieved. for (digest, (worker_id, _)) in &test_header.payload { - payload_store.async_write((*digest, *worker_id), 1).await; + payload_store.write(digest, worker_id).unwrap(); } // TEST PHASE 1: Handler should report missing parent certificates to caller. @@ -621,7 +620,7 @@ async fn test_request_vote_missing_batches() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -640,8 +639,8 @@ async fn test_request_vote_missing_batches() { certificates.insert(digest, certificate.clone()); certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } let test_header = author @@ -746,7 +745,7 @@ async fn test_request_vote_already_voted() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -765,8 +764,8 @@ async fn test_request_vote_already_voted() { certificates.insert(digest, certificate.clone()); certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } @@ -904,7 +903,7 @@ async fn test_fetch_certificates_handler() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -1073,7 +1072,7 @@ async fn test_process_payload_availability_success() { header_store: 
header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -1101,8 +1100,8 @@ async fn test_process_payload_availability_success() { // write the certificate certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } else { missing_certificates.insert(digest); @@ -1177,7 +1176,7 @@ async fn test_process_payload_availability_when_failures() { certificate_digest_by_round_map, certificate_digest_by_origin_map, ); - let payload_store: Store<(BatchDigest, WorkerId), PayloadToken> = Store::new(payload_map); + let payload_store = PayloadStore::new(payload_map); let fixture = CommitteeFixture::builder() .randomize_ports(true) @@ -1224,7 +1223,7 @@ async fn test_process_payload_availability_when_failures() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -1323,7 +1322,7 @@ async fn test_request_vote_created_at_in_future() { header_store: header_store.clone(), certificate_store: certificate_store.clone(), payload_store: payload_store.clone(), - vote_digest_store: crate::common::create_test_vote_store(), + vote_digest_store: VoteDigestStore::new_for_tests(), rx_narwhal_round_updates, metrics: metrics.clone(), }; @@ -1342,8 +1341,8 @@ async fn test_request_vote_created_at_in_future() { certificates.insert(digest, certificate.clone()); certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } diff --git a/narwhal/primary/src/tests/synchronizer_tests.rs b/narwhal/primary/src/tests/synchronizer_tests.rs index c39b93a1c1889..942486fad17cf 100644 --- a/narwhal/primary/src/tests/synchronizer_tests.rs +++ b/narwhal/primary/src/tests/synchronizer_tests.rs @@ -813,8 +813,8 @@ async fn sync_batches_drops_old() { certificates.insert(digest, certificate.clone()); certificate_store.write(certificate.clone()).unwrap(); - for (digest, (worker_id, _)) in certificate.header.payload { - payload_store.async_write((digest, worker_id), 1).await; + for (digest, (worker_id, _)) in &certificate.header.payload { + payload_store.write(digest, worker_id).unwrap(); } } let test_header = author diff --git a/narwhal/primary/tests/integration_tests_validator_api.rs b/narwhal/primary/tests/integration_tests_validator_api.rs index 36d783ea38dd7..3945dc96c5ae8 100644 --- a/narwhal/primary/tests/integration_tests_validator_api.rs +++ b/narwhal/primary/tests/integration_tests_validator_api.rs @@ -1,7 +1,7 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 -use config::{BlockSynchronizerParameters, Committee, Parameters, WorkerId}; +use config::{BlockSynchronizerParameters, Committee, Parameters}; use consensus::consensus::ConsensusRound; use consensus::{dag::Dag, metrics::ConsensusMetrics}; use crypto::PublicKey; @@ -16,9 +16,9 @@ use std::{ sync::Arc, time::Duration, }; -use storage::NodeStorage; -use storage::{CertificateStore, PayloadToken}; -use store::{rocks::DBMap, Map, Store}; +use storage::{CertificateStore, HeaderStore}; +use storage::{NodeStorage, PayloadStore}; +use store::{rocks::DBMap, Map}; use test_utils::{ fixture_batch_with_transactions, make_optimal_certificates, make_optimal_signed_certificates, temp_dir, AuthorityFixture, CommitteeFixture, @@ -27,9 +27,8 @@ use tokio::sync::watch; use tonic::transport::Channel; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, CertificateDigestProto, - CollectionRetrievalResult, Empty, GetCollectionsRequest, Header, HeaderDigest, - PreSubscribedBroadcastSender, ReadCausalRequest, RemoveCollectionsRequest, RetrievalResult, - Transaction, ValidatorClient, + CollectionRetrievalResult, Empty, GetCollectionsRequest, PreSubscribedBroadcastSender, + ReadCausalRequest, RemoveCollectionsRequest, RetrievalResult, Transaction, ValidatorClient, }; use worker::{metrics::initialise_metrics, TrivialTransactionValidator, Worker}; @@ -77,18 +76,14 @@ async fn test_get_collections() { store.certificate_store.write(certificate.clone()).unwrap(); // Write the header - store - .header_store - .async_write(header.clone().digest(), header.clone()) - .await; + store.header_store.write(&header).unwrap(); header_digests.push(header.clone().digest()); // Write the batches to payload store store .payload_store - .sync_write_all(vec![((batch.clone().digest(), worker_id), 0)]) - .await + .write_all(vec![(batch.clone().digest(), worker_id)]) .expect("couldn't store batches"); if n != 4 { // Add batches to the workers store @@ -294,18 +289,14 @@ async fn test_remove_collections() { dag.insert(certificate.clone()).await.unwrap(); // Write the header - store - .header_store - .async_write(header.clone().digest(), header.clone()) - .await; + store.header_store.write(&header).unwrap(); header_digests.push(header.clone().digest()); // Write the batches to payload store store .payload_store - .sync_write_all(vec![((batch.clone().digest(), worker_id), 0)]) - .await + .write_all(vec![(batch.clone().digest(), worker_id)]) .expect("couldn't store batches"); if n != 4 { // Add batches to the workers store @@ -1179,9 +1170,9 @@ async fn fixture_certificate( authority: &AuthorityFixture, committee: &Committee, fixture: &CommitteeFixture, - header_store: Store, + header_store: HeaderStore, certificate_store: CertificateStore, - payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + payload_store: PayloadStore, batch_store: DBMap, ) -> (Certificate, Batch) { let batch = fixture_batch_with_transactions(10); @@ -1204,14 +1195,11 @@ async fn fixture_certificate( certificate_store.write(certificate.clone()).unwrap(); // Write the header - header_store - .async_write(header.clone().digest(), header.clone()) - .await; + header_store.write(&header).unwrap(); // Write the batches to payload store payload_store - .sync_write_all(vec![((batch_digest, worker_id), 0)]) - .await + .write_all(vec![(batch_digest, worker_id)]) .expect("couldn't store batches"); // Add a batch to the workers store diff --git a/narwhal/storage/src/certificate_store.rs 
b/narwhal/storage/src/certificate_store.rs index b4a7c3d410223..754d38271e311 100644 --- a/narwhal/storage/src/certificate_store.rs +++ b/narwhal/storage/src/certificate_store.rs @@ -1,21 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crypto::{traits::InsecureDefault, PublicKey, PublicKeyBytes}; -use dashmap::DashMap; use fastcrypto::hash::Hash; -use std::{ - cmp::Ordering, - collections::{BTreeMap, VecDeque}, - iter, - sync::Arc, -}; +use std::{cmp::Ordering, collections::BTreeMap, iter}; +use crate::NotifySubscribers; use store::{ rocks::{DBMap, TypedStoreError::RocksDBError}, Map, }; -use tokio::sync::{oneshot, oneshot::Sender}; -use tracing::warn; use types::{Certificate, CertificateDigest, Round, StoreResult}; /// The main storage when we have to deal with certificates. It maintains @@ -39,9 +32,8 @@ pub struct CertificateStore { /// This helps us to perform range requests based on rounds. We avoid storing again the /// certificate here to not waste space. To dereference we use the certificates_by_id storage. certificate_id_by_origin: DBMap<(PublicKeyBytes, Round), CertificateDigest>, - /// Senders to notify for a write that happened for - /// the specified certificate digest id - notify_on_write_subscribers: Arc>>>, + /// The pub/sub to notify for a write that happened for a certificate digest id + notify_subscribers: NotifySubscribers, } impl CertificateStore { @@ -54,7 +46,7 @@ impl CertificateStore { certificates_by_id, certificate_id_by_round, certificate_id_by_origin, - notify_on_write_subscribers: Arc::new(DashMap::new()), + notify_subscribers: NotifySubscribers::new(), } } @@ -102,7 +94,7 @@ impl CertificateStore { let result = batch.write(); if result.is_ok() { - self.notify_subscribers(id, certificate); + self.notify_subscribers.notify(&id, &certificate); } result @@ -152,7 +144,8 @@ impl CertificateStore { if result.is_ok() { for (_id, certificate) in certificates { - self.notify_subscribers(certificate.digest(), certificate); + self.notify_subscribers + .notify(&certificate.digest(), &certificate); } } @@ -220,17 +213,13 @@ impl CertificateStore { /// Waits to get notified until the requested certificate becomes available pub async fn notify_read(&self, id: CertificateDigest) -> StoreResult { // we register our interest to be notified with the value - let (sender, receiver) = oneshot::channel(); - self.notify_on_write_subscribers - .entry(id) - .or_insert_with(VecDeque::new) - .push_back(sender); + let receiver = self.notify_subscribers.subscribe(&id); // let's read the value because we might have missed the opportunity // to get notified about it if let Ok(Some(cert)) = self.read(id) { // notify any obligations - and remove the entries - self.notify_subscribers(id, cert.clone()); + self.notify_subscribers.notify(&id, &cert); // reply directly return Ok(cert); @@ -472,21 +461,6 @@ impl CertificateStore { pub fn is_empty(&self) -> bool { self.certificates_by_id.is_empty() } - - /// Notifies the subscribed ones that listen on updates for the - /// certificate with the provided id. The obligations are notified - /// with the provided value. The obligation entries under the certificate id - /// are removed completely. If we fail to notify an obligation we don't - /// fail and we rather print a warn message. 
- fn notify_subscribers(&self, id: CertificateDigest, value: Certificate) { - if let Some((_, mut senders)) = self.notify_on_write_subscribers.remove(&id) { - while let Some(s) = senders.pop_front() { - if s.send(value.clone()).is_err() { - warn!("Couldn't notify obligation for certificate with id {id}"); - } - } - } - } } #[cfg(test)] diff --git a/narwhal/storage/src/header_store.rs b/narwhal/storage/src/header_store.rs new file mode 100644 index 0000000000000..373bc5e616a9a --- /dev/null +++ b/narwhal/storage/src/header_store.rs @@ -0,0 +1,48 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::NodeStorage; +use store::rocks::ReadWriteOptions; +use store::rocks::{open_cf, DBMap, MetricConf}; +use store::{reopen, Map, TypedStoreError}; +use types::{Header, HeaderDigest}; + +#[derive(Clone)] +pub struct HeaderStore { + store: DBMap, +} + +impl HeaderStore { + pub fn new(header_store: DBMap) -> Self { + Self { + store: header_store, + } + } + + pub fn new_for_tests() -> Self { + let rocksdb = open_cf( + tempfile::tempdir().unwrap(), + None, + MetricConf::default(), + &[NodeStorage::HEADERS_CF], + ) + .expect("Cannot open database"); + let map = reopen!(&rocksdb, NodeStorage::HEADERS_CF;); + Self::new(map) + } + + pub fn read(&self, id: &HeaderDigest) -> Result, TypedStoreError> { + self.store.get(id) + } + + pub fn write(&self, header: &Header) -> Result<(), TypedStoreError> { + self.store.insert(&header.digest(), header) + } + + pub fn remove_all( + &self, + keys: impl IntoIterator, + ) -> Result<(), TypedStoreError> { + self.store.multi_remove(keys) + } +} diff --git a/narwhal/storage/src/lib.rs b/narwhal/storage/src/lib.rs index ea80b5cd8cc19..35817d5139ec6 100644 --- a/narwhal/storage/src/lib.rs +++ b/narwhal/storage/src/lib.rs @@ -2,9 +2,56 @@ // SPDX-License-Identifier: Apache-2.0 mod certificate_store; +mod header_store; mod node_store; +mod payload_store; mod proposer_store; +mod vote_digest_store; pub use certificate_store::*; +use dashmap::DashMap; +pub use header_store::*; pub use node_store::*; +pub use payload_store::*; pub use proposer_store::*; +use std::hash::Hash; +use std::sync::Arc; +use tokio::sync::oneshot; +use tokio::sync::oneshot::{Receiver, Sender}; +use tracing::warn; +pub use vote_digest_store::*; + +// A simple pub/sub to notify subscribers when a value becomes available. +#[derive(Clone)] +struct NotifySubscribers { + notify_subscribers: Arc>>>, +} + +impl NotifySubscribers { + fn new() -> Self { + Self { + notify_subscribers: Arc::new(DashMap::new()), + } + } + + // Subscribe in order to be notified once the value for the corresponding key becomes available. + fn subscribe(&self, key: &K) -> Receiver { + let (sender, receiver) = oneshot::channel(); + self.notify_subscribers + .entry(key.clone()) + .or_insert_with(Vec::new) + .push(sender); + receiver + } + + // Notify the subscribers that are waiting on the value for the corresponding key. + fn notify(&self, key: &K, value: &V) { + if let Some((_, mut senders)) = self.notify_subscribers.remove(key) { + while let Some(s) = senders.pop() { + if s.send(value.clone()).is_err() { + warn!("Couldn't notify subscriber"); + } + } + } + } +} diff --git a/narwhal/storage/src/node_store.rs b/narwhal/storage/src/node_store.rs index eadc52af4711b..175864a20509b 100644 --- a/narwhal/storage/src/node_store.rs +++ b/narwhal/storage/src/node_store.rs @@ -1,15 +1,17 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use crate::payload_store::PayloadStore; use crate::proposer_store::ProposerKey; -use crate::{CertificateStore, ProposerStore}; +use crate::vote_digest_store::VoteDigestStore; +use crate::{CertificateStore, HeaderStore, ProposerStore}; use config::WorkerId; use crypto::{PublicKey, PublicKeyBytes}; use std::sync::Arc; use std::time::Duration; use store::metrics::SamplingInterval; +use store::reopen; use store::rocks::DBMap; use store::rocks::{open_cf, MetricConf, ReadWriteOptions}; -use store::{reopen, Store}; use types::{ Batch, BatchDigest, Certificate, CertificateDigest, CommittedSubDagShell, ConsensusStore, Header, HeaderDigest, Round, SequenceNumber, VoteInfo, @@ -22,26 +24,26 @@ pub type PayloadToken = u8; #[derive(Clone)] pub struct NodeStorage { pub proposer_store: ProposerStore, - pub vote_digest_store: Store, - pub header_store: Store, + pub vote_digest_store: VoteDigestStore, + pub header_store: HeaderStore, pub certificate_store: CertificateStore, - pub payload_store: Store<(BatchDigest, WorkerId), PayloadToken>, + pub payload_store: PayloadStore, pub batch_store: DBMap, pub consensus_store: Arc, } impl NodeStorage { /// The datastore column family names. - const LAST_PROPOSED_CF: &'static str = "last_proposed"; - const VOTES_CF: &'static str = "votes"; - const HEADERS_CF: &'static str = "headers"; - const CERTIFICATES_CF: &'static str = "certificates"; - const CERTIFICATE_DIGEST_BY_ROUND_CF: &'static str = "certificate_digest_by_round"; - const CERTIFICATE_DIGEST_BY_ORIGIN_CF: &'static str = "certificate_digest_by_origin"; - const PAYLOAD_CF: &'static str = "payload"; - const BATCHES_CF: &'static str = "batches"; - const LAST_COMMITTED_CF: &'static str = "last_committed"; - const SUB_DAG_INDEX_CF: &'static str = "sub_dag"; + pub(crate) const LAST_PROPOSED_CF: &'static str = "last_proposed"; + pub(crate) const VOTES_CF: &'static str = "votes"; + pub(crate) const HEADERS_CF: &'static str = "headers"; + pub(crate) const CERTIFICATES_CF: &'static str = "certificates"; + pub(crate) const CERTIFICATE_DIGEST_BY_ROUND_CF: &'static str = "certificate_digest_by_round"; + pub(crate) const CERTIFICATE_DIGEST_BY_ORIGIN_CF: &'static str = "certificate_digest_by_origin"; + pub(crate) const PAYLOAD_CF: &'static str = "payload"; + pub(crate) const BATCHES_CF: &'static str = "batches"; + pub(crate) const LAST_COMMITTED_CF: &'static str = "last_committed"; + pub(crate) const SUB_DAG_INDEX_CF: &'static str = "sub_dag"; /// Open or reopen all the storage of the node. pub fn reopen + Send>(store_path: Path) -> Self { @@ -91,14 +93,14 @@ impl NodeStorage { ); let proposer_store = ProposerStore::new(last_proposed_map); - let vote_digest_store = Store::new(votes_map); - let header_store = Store::new(header_map); + let vote_digest_store = VoteDigestStore::new(votes_map); + let header_store = HeaderStore::new(header_map); let certificate_store = CertificateStore::new( certificate_map, certificate_digest_by_round_map, certificate_digest_by_origin_map, ); - let payload_store = Store::new(payload_map); + let payload_store = PayloadStore::new(payload_map); let batch_store = batch_map; let consensus_store = Arc::new(ConsensusStore::new(last_committed_map, sub_dag_index_map)); diff --git a/narwhal/storage/src/payload_store.rs b/narwhal/storage/src/payload_store.rs new file mode 100644 index 0000000000000..488cb881f1e24 --- /dev/null +++ b/narwhal/storage/src/payload_store.rs @@ -0,0 +1,158 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{NodeStorage, NotifySubscribers, PayloadToken}; +use config::WorkerId; +use store::reopen; +use store::rocks::{open_cf, MetricConf, ReadWriteOptions}; +use store::{rocks::DBMap, Map, TypedStoreError}; +use types::BatchDigest; + +/// Store of the batch digests for the primary node for the own created batches. +#[derive(Clone)] +pub struct PayloadStore { + store: DBMap<(BatchDigest, WorkerId), PayloadToken>, + + /// Senders to notify for a write that happened for the specified batch digest and worker id + notify_subscribers: NotifySubscribers<(BatchDigest, WorkerId), ()>, +} + +impl PayloadStore { + pub fn new(payload_store: DBMap<(BatchDigest, WorkerId), PayloadToken>) -> Self { + Self { + store: payload_store, + notify_subscribers: NotifySubscribers::new(), + } + } + + pub fn new_for_tests() -> Self { + let rocksdb = open_cf( + tempfile::tempdir().unwrap(), + None, + MetricConf::default(), + &[NodeStorage::PAYLOAD_CF], + ) + .expect("Cannot open database"); + let map = + reopen!(&rocksdb, NodeStorage::PAYLOAD_CF;<(BatchDigest, WorkerId), PayloadToken>); + PayloadStore::new(map) + } + + pub fn write(&self, digest: &BatchDigest, worker_id: &WorkerId) -> Result<(), TypedStoreError> { + self.store.insert(&(*digest, *worker_id), &0u8)?; + self.notify_subscribers.notify(&(*digest, *worker_id), &()); + Ok(()) + } + + /// Writes all the provided values atomically in store - either all will succeed or nothing will + /// be stored. + pub fn write_all( + &self, + keys: impl IntoIterator + Clone, + ) -> Result<(), TypedStoreError> { + self.store + .multi_insert(keys.clone().into_iter().map(|e| (e, 0u8)))?; + + keys.into_iter().for_each(|(digest, worker_id)| { + self.notify_subscribers.notify(&(digest, worker_id), &()); + }); + Ok(()) + } + + /// Queries the store whether the batch with provided `digest` and `worker_id` exists. It returns + /// `true` if exists, `false` otherwise. + pub fn contains( + &self, + digest: BatchDigest, + worker_id: WorkerId, + ) -> Result { + self.store + .get(&(digest, worker_id)) + .map(|result| result.is_some()) + } + + /// When called the method will wait until the entry of batch with `digest` and `worker_id` + /// becomes available. + pub async fn notify_contains( + &self, + digest: BatchDigest, + worker_id: WorkerId, + ) -> Result<(), TypedStoreError> { + let receiver = self.notify_subscribers.subscribe(&(digest, worker_id)); + + // let's read the value because we might have missed the opportunity + // to get notified about it + if self.contains(digest, worker_id)? 
{ + // notify any obligations - and remove the entries (including ours) + self.notify_subscribers.notify(&(digest, worker_id), &()); + + // reply directly + return Ok(()); + } + + // now wait to hear back the result + receiver + .await + .expect("Irrecoverable error while waiting to receive the notify_contains result"); + + Ok(()) + } + + pub fn read_all( + &self, + keys: impl IntoIterator, + ) -> Result>, TypedStoreError> { + self.store.multi_get(keys) + } + + pub fn remove_all( + &self, + keys: impl IntoIterator, + ) -> Result<(), TypedStoreError> { + self.store.multi_remove(keys) + } +} + +#[cfg(test)] +mod tests { + use crate::PayloadStore; + use fastcrypto::hash::Hash; + use futures::future::join_all; + use types::Batch; + + #[tokio::test] + async fn test_notify_read() { + let store = PayloadStore::new_for_tests(); + + // run the tests a few times + let batch: Batch = test_utils::fixture_batch_with_transactions(10); + let id = batch.digest(); + let worker_id = 0; + + // now populate a batch + store.write(&id, &worker_id).unwrap(); + + // now spawn a series of tasks before writing anything in store + let mut handles = vec![]; + for _i in 0..5 { + let cloned_store = store.clone(); + let handle = + tokio::spawn(async move { cloned_store.notify_contains(id, worker_id).await }); + + handles.push(handle) + } + + // and populate the rest with a write_all + store.write_all(vec![(id, worker_id)]).unwrap(); + + // now asset the notify reads return with the result + let result = join_all(handles).await; + + assert_eq!(result.len(), 5); + + for r in result { + let token = r.unwrap(); + assert!(token.is_ok()); + } + } +} diff --git a/narwhal/storage/src/vote_digest_store.rs b/narwhal/storage/src/vote_digest_store.rs new file mode 100644 index 0000000000000..e2b0b8713e094 --- /dev/null +++ b/narwhal/storage/src/vote_digest_store.rs @@ -0,0 +1,46 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use crate::NodeStorage; +use crypto::PublicKey; +use store::reopen; +use store::rocks::{open_cf, MetricConf, ReadWriteOptions}; +use store::{rocks::DBMap, Map, TypedStoreError}; +use types::{Vote, VoteInfo}; + +/// The storage for the last votes digests per authority +#[derive(Clone)] +pub struct VoteDigestStore { + store: DBMap, +} + +impl VoteDigestStore { + pub fn new(vote_digest_store: DBMap) -> VoteDigestStore { + Self { + store: vote_digest_store, + } + } + + pub fn new_for_tests() -> VoteDigestStore { + let rocksdb = open_cf( + tempfile::tempdir().unwrap(), + None, + MetricConf::default(), + &[NodeStorage::VOTES_CF], + ) + .expect("Cannot open database"); + let map = reopen!(&rocksdb, NodeStorage::VOTES_CF;); + VoteDigestStore::new(map) + } + + /// Insert the vote's basic details into the database for the corresponding + /// header author key. 
+ pub fn write(&self, vote: &Vote) -> Result<(), TypedStoreError> { + self.store.insert(&vote.origin, &vote.into()) + } + + /// Read the vote info based on the provided corresponding header author key + pub fn read(&self, header_author: &PublicKey) -> Result, TypedStoreError> { + self.store.get(header_author) + } +} diff --git a/narwhal/types/src/primary.rs b/narwhal/types/src/primary.rs index 7bf36fae9f2ad..e3859897130ff 100644 --- a/narwhal/types/src/primary.rs +++ b/narwhal/types/src/primary.rs @@ -1020,6 +1020,16 @@ pub struct VoteInfo { pub vote_digest: VoteDigest, } +impl From<&Vote> for VoteInfo { + fn from(vote: &Vote) -> Self { + VoteInfo { + epoch: vote.epoch, + round: vote.round, + vote_digest: vote.digest(), + } + } +} + #[cfg(test)] mod tests { use crate::{Batch, Metadata, Timestamp};
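As a quick reference for reviewers, here is a minimal sketch of the call-site shape this swap moves us to, exercising only the `PayloadStore` methods introduced above (`new_for_tests`, `write`, `write_all`, `contains`, `notify_contains`). It assumes the usual Narwhal workspace crates (`storage`, `config`, `types`, `test_utils`, `fastcrypto`, `tokio`) and mirrors the unit test added in `payload_store.rs`, so treat it as an illustration rather than part of the diff.

```rust
// Illustrative sketch of the new synchronous store API (assumes the Narwhal
// workspace crates; mirrors the unit test added in payload_store.rs).
use config::WorkerId;
use fastcrypto::hash::Hash;
use storage::PayloadStore;
use types::Batch;

#[tokio::main]
async fn main() {
    // Temp-dir backed store, as provided for tests by this patch.
    let store = PayloadStore::new_for_tests();

    let batch: Batch = test_utils::fixture_batch_with_transactions(10);
    let digest = batch.digest();
    let worker_id: WorkerId = 0;

    // Old: payload_store.async_write((digest, worker_id), 0u8).await;
    // New: a plain synchronous write that returns a Result.
    store.write(&digest, &worker_id).unwrap();

    // Old: payload_store.read((digest, worker_id)).await?.is_some()
    // New: an explicit existence check.
    assert!(store.contains(digest, worker_id).unwrap());

    // Old: payload_store.notify_read((digest, worker_id)).await
    // New: wait (via the NotifySubscribers pub/sub) until the entry exists.
    store.notify_contains(digest, worker_id).await.unwrap();

    // Atomic multi-key write, as used by the block remover and validator API tests.
    store.write_all(vec![(digest, worker_id)]).unwrap();
}
```

The same pattern applies to `HeaderStore` and `VoteDigestStore`: reads and writes are now plain synchronous calls, and the only remaining `await` points are the explicit notification helpers (`notify_read`, `notify_contains`). That is what wrapping the `DBMap`s behind dedicated structs buys us if we later swap the backing store or add secondary index updates to these methods.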