Skip to content

Commit

Permalink
[Narwhal] swap typed_store::Store with DBMap (MystenLabs#9387)
Browse files Browse the repository at this point in the history
## Description 

This PR replaces all the `typed_store::Store` instances in Narwhal with
`DBMap`. I've also wrapped each `DBMap` behind a dedicated struct, which
makes it easier to swap `DBMap` for something different in the future
and to add additional methods on each store if needed (e.g. updating
additional secondary index stores in a batch fashion).

## Test Plan 

Existing & new unit and integration tests.

## TODO:
- [ ] wrap the `batch_store` DBMap behind a dedicated struct

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Mar 16, 2023
1 parent f85177c commit f2b24f5
Show file tree
Hide file tree
Showing 20 changed files with 465 additions and 253 deletions.
17 changes: 7 additions & 10 deletions narwhal/primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,12 @@ use futures::future::try_join_all;
use itertools::Either;
use network::PrimaryToWorkerRpc;
use std::{collections::HashMap, sync::Arc};
use storage::{CertificateStore, PayloadToken};
use store::{rocks::TypedStoreError, Store};
use storage::{CertificateStore, HeaderStore, PayloadStore};
use store::rocks::TypedStoreError;

use tracing::{debug, instrument, warn};
use types::{
metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest,
Round,
metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, HeaderDigest, Round,
};

#[cfg(test)]
Expand All @@ -39,10 +38,10 @@ pub struct BlockRemover {
certificate_store: CertificateStore,

/// Storage that keeps the headers by their digest id
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,

/// The persistent storage for payload markers from workers.
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,

/// The Dag structure for managing the stored certificates
dag: Option<Arc<Dag>>,
Expand All @@ -60,8 +59,8 @@ impl BlockRemover {
name: PublicKey,
worker_cache: WorkerCache,
certificate_store: CertificateStore,
header_store: Store<HeaderDigest, Header>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
header_store: HeaderStore,
payload_store: PayloadStore,
dag: Option<Arc<Dag>>,
worker_network: anemo::Network,
tx_committed_certificates: Sender<(Round, Vec<Certificate>)>,
Expand Down Expand Up @@ -133,7 +132,6 @@ impl BlockRemover {

self.header_store
.remove_all(header_digests)
.await
.map_err(Either::Left)?;

// delete batch from the payload store as well
Expand All @@ -145,7 +143,6 @@ impl BlockRemover {
}
self.payload_store
.remove_all(batches_to_cleanup)
.await
.map_err(Either::Left)?;

// NOTE: delete certificates in the end since if we need to repeat the request
Expand Down
20 changes: 7 additions & 13 deletions narwhal/primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use storage::{CertificateStore, PayloadToken};
use store::Store;
use storage::{CertificateStore, PayloadStore};
use thiserror::Error;
use tokio::{sync::mpsc::Sender, task::JoinHandle, time::timeout};
use tracing::{debug, error, info, instrument, trace, warn};
Expand Down Expand Up @@ -171,7 +170,7 @@ pub struct BlockSynchronizer {
certificate_store: CertificateStore,

/// The persistent storage for payload markers from workers
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,

/// Timeout when synchronizing the certificates
certificates_synchronize_timeout: Duration,
Expand All @@ -192,7 +191,7 @@ impl BlockSynchronizer {
rx_shutdown: ConditionalBroadcastReceiver,
rx_block_synchronizer_commands: metered_channel::Receiver<Command>,
network: anemo::Network,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,
certificate_store: CertificateStore,
parameters: Parameters,
) -> JoinHandle<()> {
Expand Down Expand Up @@ -548,7 +547,7 @@ impl BlockSynchronizer {
"Certificate with id {} not our own, checking in storage.",
certificate.digest()
);
match self.payload_store.read_all(payload).await {
match self.payload_store.read_all(payload) {
Ok(payload_result) => {
payload_result.into_iter().all(|x| x.is_some()).to_owned()
}
Expand Down Expand Up @@ -663,26 +662,21 @@ impl BlockSynchronizer {
#[instrument(level = "trace", skip_all, fields(request_id, certificate=?certificate.header.digest()))]
async fn wait_for_block_payload<'a>(
payload_synchronize_timeout: Duration,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,
certificate: Certificate,
) -> State {
let futures = certificate
.header
.payload
.iter()
.map(|(batch_digest, (worker_id, _))| {
payload_store.notify_read((*batch_digest, *worker_id))
payload_store.notify_contains(*batch_digest, *worker_id)
})
.collect::<Vec<_>>();

// Wait for all the items to sync - have a timeout
let result = timeout(payload_synchronize_timeout, join_all(futures)).await;
if result.is_err()
|| result
.unwrap()
.into_iter()
.any(|r| r.map_or_else(|_| true, |f| f.is_none()))
{
if result.is_err() {
return State::PayloadSynchronized {
result: Err(SyncError::Timeout {
digest: certificate.digest(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ async fn test_successful_payload_synchronization() {
// Assume that the request is the correct one and just immediately
// store the batch to the payload store.
for digest in m.digests {
payload_store.async_write((digest, worker), 1).await;
payload_store.write(&digest, &worker).unwrap();
}
}
}
Expand Down Expand Up @@ -637,8 +637,8 @@ async fn test_reply_with_payload_already_in_storage() {
if i > NUM_OF_CERTIFICATES_WITH_MISSING_PAYLOAD {
certificate_store.write(certificate.clone()).unwrap();

for (digest, (worker_id, _)) in certificate.header.payload {
payload_store.async_write((digest, worker_id), 1).await;
for (digest, (worker_id, _)) in &certificate.header.payload {
payload_store.write(digest, worker_id).unwrap();
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions narwhal/primary/src/block_synchronizer/tests/handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ async fn test_synchronize_block_payload() {
.build()
.unwrap();
let cert_stored = fixture.certificate(&header);
for (digest, (worker_id, _)) in cert_stored.clone().header.payload {
payload_store.async_write((digest, worker_id), 1).await;
for (digest, (worker_id, _)) in &cert_stored.clone().header.payload {
payload_store.write(digest, worker_id).unwrap();
}

// AND a certificate with payload NOT available
Expand Down
17 changes: 7 additions & 10 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ use mysten_metrics::{monitored_future, spawn_logged_monitored_task};
use network::anemo_ext::NetworkExt;
use std::sync::Arc;
use std::time::Duration;
use storage::CertificateStore;
use store::Store;
use storage::{CertificateStore, HeaderStore};
use tokio::{
sync::oneshot,
task::{JoinHandle, JoinSet},
Expand All @@ -23,8 +22,8 @@ use types::{
ensure,
error::{DagError, DagResult},
metered_channel::Receiver,
Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, HeaderDigest,
PrimaryToPrimaryClient, RequestVoteRequest, Vote,
Certificate, CertificateDigest, ConditionalBroadcastReceiver, Header, PrimaryToPrimaryClient,
RequestVoteRequest, Vote,
};

#[cfg(test)]
Expand All @@ -37,7 +36,7 @@ pub struct Core {
/// The committee information.
committee: Committee,
/// The persistent storage keyed to headers.
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,
/// The persistent storage keyed to certificates.
certificate_store: CertificateStore,
/// Handles synchronization with other nodes and our workers.
Expand Down Expand Up @@ -68,7 +67,7 @@ impl Core {
pub fn spawn(
name: PublicKey,
committee: Committee,
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,
certificate_store: CertificateStore,
synchronizer: Arc<Synchronizer>,
signature_service: SignatureService<Signature, { crypto::INTENT_MESSAGE_LENGTH }>,
Expand Down Expand Up @@ -230,7 +229,7 @@ impl Core {
async fn propose_header(
name: PublicKey,
committee: Committee,
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,
certificate_store: CertificateStore,
signature_service: SignatureService<Signature, { crypto::INTENT_MESSAGE_LENGTH }>,
metrics: Arc<PrimaryMetrics>,
Expand All @@ -251,9 +250,7 @@ impl Core {
}

// Process the header.
header_store
.async_write(header.digest(), header.clone())
.await;
header_store.write(&header)?;
metrics.proposed_header_round.set(header.round as i64);

// Reset the votes aggregator and sign our own header.
Expand Down
73 changes: 28 additions & 45 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ use std::{
thread::sleep,
time::Duration,
};
use storage::{CertificateStore, PayloadToken, ProposerStore};
use store::Store;
use storage::{CertificateStore, HeaderStore, PayloadStore, ProposerStore, VoteDigestStore};
use tokio::{sync::watch, task::JoinHandle};
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -64,13 +63,12 @@ use types::{
ensure,
error::{DagError, DagResult},
metered_channel::{channel_with_total, Receiver, Sender},
now, BatchDigest, Certificate, CertificateDigest, FetchCertificatesRequest,
FetchCertificatesResponse, GetCertificatesRequest, GetCertificatesResponse, Header,
HeaderDigest, PayloadAvailabilityRequest, PayloadAvailabilityResponse,
PreSubscribedBroadcastSender, PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest,
RequestVoteResponse, Round, SendCertificateRequest, SendCertificateResponse, Vote, VoteInfo,
WorkerInfoResponse, WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerToPrimary,
WorkerToPrimaryServer,
now, Certificate, CertificateDigest, FetchCertificatesRequest, FetchCertificatesResponse,
GetCertificatesRequest, GetCertificatesResponse, PayloadAvailabilityRequest,
PayloadAvailabilityResponse, PreSubscribedBroadcastSender, PrimaryToPrimary,
PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round, SendCertificateRequest,
SendCertificateResponse, Vote, WorkerInfoResponse, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down Expand Up @@ -104,11 +102,11 @@ impl Primary {
committee: Committee,
worker_cache: WorkerCache,
parameters: Parameters,
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,
certificate_store: CertificateStore,
proposer_store: ProposerStore,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
vote_digest_store: Store<PublicKey, VoteInfo>,
payload_store: PayloadStore,
vote_digest_store: VoteDigestStore,
tx_new_certificates: Sender<Certificate>,
rx_committed_certificates: Receiver<(Round, Vec<Certificate>)>,
rx_consensus_round_updates: watch::Receiver<ConsensusRound>,
Expand Down Expand Up @@ -649,11 +647,11 @@ struct PrimaryReceiverHandler {
synchronizer: Arc<Synchronizer>,
/// Service to sign headers.
signature_service: SignatureService<Signature, { crypto::INTENT_MESSAGE_LENGTH }>,
header_store: Store<HeaderDigest, Header>,
header_store: HeaderStore,
certificate_store: CertificateStore,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,
/// The store to persist the last voted round per authority, used to ensure idempotence.
vote_digest_store: Store<PublicKey, VoteInfo>,
vote_digest_store: VoteDigestStore,
/// Get a signal when the round changes.
rx_narwhal_round_updates: watch::Receiver<Round>,
metrics: Arc<PrimaryMetrics>,
Expand Down Expand Up @@ -847,8 +845,8 @@ impl PrimaryReceiverHandler {

// Store the header.
self.header_store
.async_write(header.digest(), header.clone())
.await;
.write(header)
.map_err(DagError::StoreError)?;

// Check if we can vote for this header.
// Send the vote when:
Expand All @@ -861,8 +859,7 @@ impl PrimaryReceiverHandler {
// so we don't.
let result = self
.vote_digest_store
.read(header.author.clone())
.await
.read(&header.author)
.map_err(DagError::StoreError)?;

if let Some(vote_info) = result {
Expand Down Expand Up @@ -897,18 +894,8 @@ impl PrimaryReceiverHandler {
header, header.round
);

// Update the vote digest store with the vote we just sent. We don't need to store the
// vote itself, since it can be reconstructed using the headers.
self.vote_digest_store
.sync_write(
header.author.clone(),
VoteInfo {
epoch: header.epoch,
round: header.round,
vote_digest: vote.digest(),
},
)
.await?;
// Update the vote digest store with the vote we just sent.
self.vote_digest_store.write(&vote)?;

Ok(RequestVoteResponse {
vote: Some(vote),
Expand Down Expand Up @@ -1101,17 +1088,13 @@ impl PrimaryToPrimary for PrimaryReceiverHandler {
for (id, certificate_option) in digests.into_iter().zip(certificates) {
// Find batches only for certificates that exist.
if let Some(certificate) = certificate_option {
let payload_available = match self
.payload_store
.read_all(
certificate
.header
.payload
.into_iter()
.map(|(batch, (worker_id, _))| (batch, worker_id)),
)
.await
{
let payload_available = match self.payload_store.read_all(
certificate
.header
.payload
.into_iter()
.map(|(batch, (worker_id, _))| (batch, worker_id)),
) {
Ok(payload_result) => payload_result.into_iter().all(|x| x.is_some()),
Err(err) => {
// Assume that we don't have the payloads available,
Expand All @@ -1138,7 +1121,7 @@ impl PrimaryToPrimary for PrimaryReceiverHandler {
#[derive(Clone)]
struct WorkerReceiverHandler {
tx_our_digests: Sender<OurDigestMessage>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
payload_store: PayloadStore,
our_workers: BTreeMap<WorkerId, WorkerInfo>,
}

Expand Down Expand Up @@ -1184,8 +1167,8 @@ impl WorkerToPrimary for WorkerReceiverHandler {
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
self.payload_store
.async_write((message.digest, message.worker_id), 0u8)
.await;
.write(&message.digest, &message.worker_id)
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;
Ok(anemo::Response::new(()))
}

Expand Down
Loading

0 comments on commit f2b24f5

Please sign in to comment.