Skip to content

Commit

Permalink
Skip batch validation for certified batches (MystenLabs#9193)
Browse files Browse the repository at this point in the history
## Test Plan 

How did you test the new or updated feature?

Unit Tests / Labnet / Benchmark
  • Loading branch information
arun-koshy authored Mar 14, 2023
1 parent ea80d2e commit a51157b
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 12 deletions.
1 change: 1 addition & 0 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ fn get_registry() -> Result<Registry> {
let sync = WorkerSynchronizeMessage {
digests: vec![BatchDigest([0u8; 32])],
target: pk,
is_certified: true,
};

tracer.trace_value(&mut samples, &our_batch)?;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,4 @@ WorkerSynchronizeMessage:
TUPLEARRAY:
CONTENT: U8
SIZE: 96

- is_certified: BOOL
1 change: 1 addition & 0 deletions narwhal/primary/src/block_synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ impl BlockSynchronizer {
let message = WorkerSynchronizeMessage {
digests: batch_ids,
target: primary_peer_name.clone(),
is_certified: true,
};
let _ = self.network.unreliable_send(worker_name, &message);

Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ impl PrimaryReceiverHandler {

// Synchronize all batches referenced in the header.
self.synchronizer
.sync_batches(header, network, /* max_age */ 0)
.sync_header_batches(header, network, /* max_age */ 0)
.await?;

// Check that the time of the header is smaller than the current time. If not but the difference is
Expand Down
20 changes: 17 additions & 3 deletions narwhal/primary/src/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ impl Synchronizer {
let sync_network = network.clone();
let max_age = self.inner.gc_depth.saturating_sub(1);
self.inner.batch_tasks.lock().spawn(async move {
Synchronizer::sync_batches_internal(inner, &header, sync_network, max_age).await
Synchronizer::sync_batches_internal(inner, &header, sync_network, max_age, true).await
});

let (sender, receiver) = oneshot::channel();
Expand Down Expand Up @@ -801,20 +801,33 @@ impl Synchronizer {
/// Blocks until either synchronization is complete, or the current consensus rounds advances
/// past the max allowed age. (`max_age == 0` means the header's round must match current
/// round.)
pub async fn sync_batches(
pub async fn sync_header_batches(
&self,
header: &Header,
network: anemo::Network,
max_age: Round,
) -> DagResult<()> {
Synchronizer::sync_batches_internal(self.inner.clone(), header, network, max_age).await
Synchronizer::sync_batches_internal(self.inner.clone(), header, network, max_age, false)
.await
}

// TODO: Add batching support to synchronizer and use this call from executor.
// pub async fn sync_certificate_batches(
// &self,
// header: &Header,
// network: anemo::Network,
// max_age: Round,
// ) -> DagResult<()> {
// Synchronizer::sync_batches_internal(self.inner.clone(), header, network, max_age, true)
// .await
// }

async fn sync_batches_internal(
inner: Arc<Inner>,
header: &Header,
network: anemo::Network,
max_age: Round,
is_certified: bool,
) -> DagResult<()> {
if header.author == inner.name {
debug!("skipping sync_batches for header {header}: no need to sync payload from own workers");
Expand Down Expand Up @@ -880,6 +893,7 @@ impl Synchronizer {
let message = WorkerSynchronizeMessage {
digests: digests.clone(),
target: header.author.clone(),
is_certified,
};
let peer = network.waiting_peer(anemo::PeerId(worker_name.0.to_bytes()));
let mut client = PrimaryToWorkerClient::new(peer);
Expand Down
2 changes: 1 addition & 1 deletion narwhal/primary/src/tests/synchronizer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ async fn sync_batches_drops_old() {
let _ = tx_consensus_round_updates.send(30);
});
match synchronizer
.sync_batches(&test_header, network.clone(), 10)
.sync_header_batches(&test_header, network.clone(), 10)
.await
{
Err(DagError::TooOld(_, _, _)) => (),
Expand Down
4 changes: 4 additions & 0 deletions narwhal/types/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,10 @@ impl PayloadAvailabilityResponse {
pub struct WorkerSynchronizeMessage {
pub digests: Vec<BatchDigest>,
pub target: PublicKey,
// Used to indicate to the worker that it does not need to fully validate
// the batch it receives because it is part of a certificate. Only digest
// verification is required.
pub is_certified: bool,
}

/// Used by the primary to request that the worker delete the specified batches.
Expand Down
15 changes: 9 additions & 6 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,15 @@ impl<V: TransactionValidator> PrimaryToWorker for PrimaryReceiverHandler<V> {
match result {
Ok(response) => {
if let Some(batch) = response.into_body().batch {
if let Err(err) = self.validator.validate_batch(&batch) {
// The batch is invalid, we don't want to process it.
return Err(anemo::rpc::Status::new_with_message(
StatusCode::BadRequest,
format!("Invalid batch: {err}"),
));
if !message.is_certified {
// This batch is not part of a certificate, so we need to validate it.
if let Err(err) = self.validator.validate_batch(&batch) {
// The batch is invalid, we don't want to process it.
return Err(anemo::rpc::Status::new_with_message(
StatusCode::BadRequest,
format!("Invalid batch: {err}"),
));
}
}
let digest = batch.digest();
if missing.remove(&digest) {
Expand Down
2 changes: 2 additions & 0 deletions narwhal/worker/src/tests/handlers_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ async fn synchronize() {
let message = WorkerSynchronizeMessage {
digests: vec![digest],
target: target_primary.public_key(),
is_certified: false,
};

let mut mock_server = MockWorkerToWorker::new();
Expand Down Expand Up @@ -113,6 +114,7 @@ async fn synchronize_when_batch_exists() {
let message = WorkerSynchronizeMessage {
digests: missing.clone(),
target: target_primary.public_key(),
is_certified: false,
};

// Send a sync request.
Expand Down

0 comments on commit a51157b

Please sign in to comment.