Skip to content

Commit

Permalink
[narwhal] Make the primary re-transmit batch digests in non-committed…
Browse files Browse the repository at this point in the history
… headers (MystenLabs#5825)

* Added a response to the worker when the batch id is processed by primary
* Record Proposed headers
* Send sequenced certificates as a list rather than one by one
* Derive our own round committed
* Added re-transmit of digests

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Nov 8, 2022
1 parent 19fd50c commit a24f88e
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 64 deletions.
22 changes: 16 additions & 6 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub struct Consensus<ConsensusProtocol> {
/// if it already sent us its whole history.
rx_new_certificates: metered_channel::Receiver<Certificate>,
/// Outputs the sequence of ordered certificates to the primary (for cleanup and feedback).
tx_committed_certificates: metered_channel::Sender<Certificate>,
tx_committed_certificates: metered_channel::Sender<(Round, Vec<Certificate>)>,
/// Outputs the sequence of ordered certificates to the application layer.
tx_sequence: metered_channel::Sender<ConsensusOutput>,

Expand Down Expand Up @@ -250,7 +250,7 @@ where
cert_store: CertificateStore,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_new_certificates: metered_channel::Receiver<Certificate>,
tx_committed_certificates: metered_channel::Sender<Certificate>,
tx_committed_certificates: metered_channel::Sender<(Round, Vec<Certificate>)>,
tx_sequence: metered_channel::Sender<ConsensusOutput>,
protocol: Protocol,
metrics: Arc<ConsensusMetrics>,
Expand Down Expand Up @@ -331,13 +331,19 @@ where
}

// Process the certificate using the selected consensus protocol.
let commit_round_leader = certificate.header.round;
let sequence =
self.protocol
.process_certificate(&mut self.state, self.consensus_index, certificate)?;

// Update the consensus index.
self.consensus_index += sequence.len() as u64;

// We extract a list of headers from this specific validator that
// have been agreed upon, and signal this back to the narwhal sub-system
// to be used to re-send batches that have not made it to a commit.
let mut commited_certificates = Vec::new();

// Output the sequence in the right order.
for output in sequence {
let certificate = &output.certificate;
Expand All @@ -363,16 +369,20 @@ where
.set((mysten_util_mem::malloc_size(&self.state.dag) + std::mem::size_of::<Dag>()) as i64);
}

self.tx_committed_certificates
.send(certificate.clone())
.await
.expect("Failed to send certificate to primary");
commited_certificates.push(certificate.clone());

if let Err(e) = self.tx_sequence.send(output).await {
tracing::warn!("Failed to output certificate: {e}");
}
}

if !commited_certificates.is_empty(){
self.tx_committed_certificates
.send((commit_round_leader, commited_certificates))
.await
.expect("Failed to send certificate to primary");
}

self.metrics
.consensus_dag_rounds
.with_label_values(&[])
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ impl Node {
execution_state: State,
tx_reconfigure: &watch::Sender<ReconfigureNotification>,
rx_new_certificates: metered_channel::Receiver<Certificate>,
tx_committed_certificates: metered_channel::Sender<Certificate>,
tx_committed_certificates: metered_channel::Sender<(Round, Vec<Certificate>)>,
registry: &Registry,
) -> SubscriberResult<Vec<JoinHandle<()>>>
where
Expand Down
14 changes: 10 additions & 4 deletions narwhal/primary/src/block_remover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use store::{rocks::TypedStoreError, Store};
use tracing::{debug, instrument, warn};
use types::{
metered_channel::Sender, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest,
Round,
};

#[cfg(test)]
Expand Down Expand Up @@ -50,7 +51,7 @@ pub struct BlockRemover {
worker_network: P2pNetwork,

/// Outputs all the successfully deleted certificates
tx_committed_certificates: Sender<Certificate>,
tx_committed_certificates: Sender<(Round, Vec<Certificate>)>,
}

impl BlockRemover {
Expand All @@ -63,7 +64,7 @@ impl BlockRemover {
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
dag: Option<Arc<Dag>>,
worker_network: P2pNetwork,
tx_committed_certificates: Sender<Certificate>,
tx_committed_certificates: Sender<(Round, Vec<Certificate>)>,
) -> BlockRemover {
Self {
name,
Expand Down Expand Up @@ -163,9 +164,14 @@ impl BlockRemover {
.map_err(Either::Left)?;

// Now output all the removed certificates
for certificate in certificates.clone() {
if !certificates.is_empty() {
let all_certs = certificates.clone();
// Unwrap safe since list is not empty.
let highest_round = certificates.iter().map(|c| c.header.round).max().unwrap();

// We signal that these certificates must have been committed by the external consensus
self.tx_committed_certificates
.send(certificate)
.send((highest_round, all_certs))
.await
.expect("Couldn't forward removed certificates to channel");
}
Expand Down
15 changes: 15 additions & 0 deletions narwhal/primary/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub struct PrimaryChannelMetrics {
pub tx_committed_certificates: IntGauge,
/// occupancy of the channel from the `primary::Core` to the `Consensus`
pub tx_new_certificates: IntGauge,
/// occupancy of the channel signaling own committed headers
pub tx_commited_own_headers: IntGauge,

// totals
/// total received on channel from the `primary::WorkerReceiverHandler` to the `primary::PayloadReceiver`
Expand Down Expand Up @@ -116,6 +118,8 @@ pub struct PrimaryChannelMetrics {
pub tx_committed_certificates_total: IntCounter,
/// total received on channel from the `primary::Core` to the `Consensus`
pub tx_new_certificates_total: IntCounter,
/// total received on the channel signaling own committed headers
pub tx_commited_own_headers_total: IntCounter,
}

impl PrimaryChannelMetrics {
Expand Down Expand Up @@ -213,6 +217,11 @@ impl PrimaryChannelMetrics {
Self::DESC_NEW_CERTS,
registry
).unwrap(),
tx_commited_own_headers: register_int_gauge_with_registry!(
"tx_commited_own_headers",
"occupancy of the channel signaling own committed headers.",
registry
).unwrap(),

// totals
tx_others_digests_total: register_int_counter_with_registry!(
Expand Down Expand Up @@ -285,6 +294,12 @@ impl PrimaryChannelMetrics {
Self::DESC_NEW_CERTS_TOTAL,
registry
).unwrap(),
tx_commited_own_headers_total: register_int_counter_with_registry!(
"tx_commited_own_headers_total",
"total received on channel signaling own committed headers.",
registry
).unwrap(),




Expand Down
39 changes: 28 additions & 11 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
grpc_server::ConsensusAPIGrpc,
header_waiter::HeaderWaiter,
metrics::initialise_metrics,
proposer::Proposer,
proposer::{OurDigestMessage, Proposer},
state_handler::StateHandler,
synchronizer::Synchronizer,
BlockRemover,
Expand Down Expand Up @@ -92,11 +92,11 @@ impl Primary {
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
vote_digest_store: Store<PublicKey, RoundVoteDigestPair>,
tx_new_certificates: Sender<Certificate>,
rx_committed_certificates: Receiver<Certificate>,
rx_committed_certificates: Receiver<(Round, Vec<Certificate>)>,
dag: Option<Arc<Dag>>,
network_model: NetworkModel,
tx_reconfigure: watch::Sender<ReconfigureNotification>,
tx_committed_certificates: Sender<Certificate>,
tx_committed_certificates: Sender<(Round, Vec<Certificate>)>,
registry: &Registry,
// See comments in Subscriber::spawn
rx_executor_network: Option<oneshot::Sender<P2pNetwork>>,
Expand Down Expand Up @@ -163,6 +163,11 @@ impl Primary {
&primary_channel_metrics.tx_state_handler,
&primary_channel_metrics.tx_state_handler_total,
);
let (tx_commited_own_headers, rx_commited_own_headers) = channel_with_total(
CHANNEL_CAPACITY,
&primary_channel_metrics.tx_commited_own_headers,
&primary_channel_metrics.tx_commited_own_headers_total,
);

// we need to hack the gauge from this consensus channel into the primary registry
// This avoids a cyclic dependency in the initialization of consensus and primary
Expand Down Expand Up @@ -427,6 +432,7 @@ impl Primary {
rx_parents,
rx_our_digests,
tx_headers,
rx_commited_own_headers,
node_metrics,
);

Expand All @@ -439,6 +445,7 @@ impl Primary {
tx_consensus_round_updates,
rx_state_handler,
tx_reconfigure,
Some(tx_commited_own_headers),
P2pNetwork::new(network.clone()),
);

Expand Down Expand Up @@ -693,7 +700,7 @@ impl PrimaryToPrimary for PrimaryReceiverHandler {
/// Defines how the network receiver handles incoming workers messages.
#[derive(Clone)]
struct WorkerReceiverHandler {
tx_our_digests: Sender<(BatchDigest, WorkerId, u64)>,
tx_our_digests: Sender<OurDigestMessage>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
our_workers: BTreeMap<WorkerId, WorkerInfo>,
}
Expand All @@ -705,15 +712,25 @@ impl WorkerToPrimary for WorkerReceiverHandler {
request: anemo::Request<WorkerOurBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();
self.tx_our_digests
.send((
message.digest,
message.worker_id,
message.metadata.created_at,
))
let (tx_ack, rx_ack) = oneshot::channel();
let response = self
.tx_our_digests
.send(OurDigestMessage {
digest: message.digest,
worker_id: message.worker_id,
timestamp: message.metadata.created_at,
ack_channel: tx_ack,
})
.await
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

// If we are ok, then wait for the ack
rx_ack
.await
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

Ok(response)
}

async fn report_others_batch(
Expand Down
Loading

0 comments on commit a24f88e

Please sign in to comment.