Skip to content

Commit

Permalink
[Narwhal] improve batch maker throughput by avoiding yielding per txn
Browse files Browse the repository at this point in the history
  • Loading branch information
mwtian committed Apr 24, 2023
1 parent 6b8a0c8 commit 81cfafc
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 5 deletions.
2 changes: 1 addition & 1 deletion narwhal/types/src/metered_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl<T> Receiver<T> {
}

/// Polls to receive the next message on this channel.
/// Decrements the gauge in case of a successful `poll_recv`.
/// Decrements the gauge in case of a successful `poll_recv`.
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
match self.inner.poll_recv(cx) {
res @ Poll::Ready(Some(_)) => {
Expand Down
10 changes: 6 additions & 4 deletions narwhal/worker/src/batch_maker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use config::WorkerId;
use fastcrypto::hash::Hash;
use futures::stream::FuturesUnordered;
use futures::{Future, StreamExt};
use mysten_metrics::spawn_logged_monitored_task;
use mysten_metrics::{monitored_scope, spawn_logged_monitored_task};
use network::{client::NetworkClient, WorkerToPrimaryClient};
use store::{rocks::DBMap, Map};
use tokio::{
Expand Down Expand Up @@ -114,6 +114,7 @@ impl BatchMaker {
// 'in-flight' are below a certain number (MAX_PARALLEL_BATCH). This
// condition will be met eventually if the store and network are functioning.
Some((transaction, response_sender)) = self.rx_batch_maker.recv(), if batch_pipeline.len() < MAX_PARALLEL_BATCH => {
let _scope = monitored_scope("BatchMaker::recv");
current_batch_size += transaction.len();
current_batch.transactions_mut().push(transaction);
current_responses.push(response_sender);
Expand All @@ -129,11 +130,15 @@ impl BatchMaker {

timer.as_mut().reset(Instant::now() + self.max_batch_delay);
self.batch_start_timestamp = Instant::now();

// Yield once per size threshold to allow other tasks to run.
tokio::task::yield_now().await;
}
},

// If the timer triggers, seal the batch even if it contains few transactions.
() = &mut timer => {
let _scope = monitored_scope("BatchMaker::timer");
if !current_batch.transactions().is_empty() {
if let Some(seal) = self.seal(true, current_batch, current_batch_size, current_responses).await {
batch_pipeline.push(seal);
Expand All @@ -160,9 +165,6 @@ impl BatchMaker {
}

}

// Give the change to schedule other tasks.
tokio::task::yield_now().await;
}
}

Expand Down

0 comments on commit 81cfafc

Please sign in to comment.