Skip to content

Commit

Permalink
Migrate worker Synchronize handler to native anemo timeouts (MystenLa…
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran authored Oct 7, 2022
1 parent 92fbd80 commit aac626c
Showing 1 changed file with 17 additions and 24 deletions.
41 changes: 17 additions & 24 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use rand::seq::SliceRandom;
use std::{collections::HashSet, time::Duration};
use store::Store;
use tap::TapFallible;
use tokio::time::{self};
use tracing::{debug, error, info, trace};
use types::{
error::DagError, metered_channel::Sender, Batch, BatchDigest, PrimaryToWorker,
Expand Down Expand Up @@ -161,9 +160,10 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
)));
}
};
let message = WorkerBatchRequest {
let request = anemo::Request::new(WorkerBatchRequest {
digests: missing.iter().cloned().collect(),
};
})
.with_timeout(self.request_batches_timeout);
debug!(
"Sending WorkerBatchRequest message to {worker_name} for missing batches {:?}",
message.digests
Expand All @@ -172,21 +172,18 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
let network = request
.extensions()
.get::<anemo::NetworkRef>()
.unwrap()
.upgrade()
.and_then(anemo::NetworkRef::upgrade)
.ok_or_else(|| {
anemo::rpc::Status::internal("Unable to access network to send child RPCs")
})?;
let peer_id = anemo::PeerId(worker_name.0.to_bytes());
if let Some(peer) = network.peer(peer_id) {
match time::timeout(
self.request_batches_timeout,
WorkerToWorkerClient::new(peer).request_batches(message),
)
.await
match WorkerToWorkerClient::new(peer)
.request_batches(request)
.await
// TODO: duplicated code in the same file.
{
Ok(Ok(response)) => {
Ok(response) => {
for batch in response.into_body().batches {
let digest = &batch.digest();
if missing.remove(digest) {
Expand All @@ -199,12 +196,9 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
}
}
}
Ok(Err(e)) => {
Err(e) => {
info!("WorkerBatchRequest to first target {worker_name} failed: {e:?}");
}
Err(_) => {
debug!("WorkerBatchRequest to first target {worker_name} timed out");
}
}
} else {
info!("Unable to reach primary peer {worker_name} on the network");
Expand Down Expand Up @@ -236,15 +230,14 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
let mut handles: FuturesUnordered<_> = clients
.iter_mut()
.map(|client| {
time::timeout(
self.request_batches_timeout,
client.request_batches(message.clone()),
client.request_batches(
anemo::Request::new(message.clone()).with_timeout(self.request_batches_timeout),
)
})
.collect();
while let Some(result) = handles.next().await {
match result {
Ok(Ok(response)) => {
Ok(response) => {
for batch in response.into_body().batches {
let digest = &batch.digest();
if missing.remove(digest) {
Expand All @@ -256,12 +249,12 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
}
}
}
if missing.is_empty() {
break;
}
}
Ok(Err(e)) => {
info!("WorkerBatchRequest to first target {worker_name} failed: {e:?}");
}
Err(_) => {
debug!("WorkerBatchRequest to first target {worker_name} timed out");
Err(e) => {
info!("WorkerBatchRequest to retry target {worker_name} failed: {e:?}");
}
}
if missing.is_empty() {
Expand Down

0 comments on commit aac626c

Please sign in to comment.