Skip to content

Commit

Permalink
Dedupe worker cache update code in worker Reconfigure handler
Browse files Browse the repository at this point in the history
  • Loading branch information
aschran committed Oct 12, 2022
1 parent 567dc87 commit a8ce206
Showing 1 changed file with 33 additions and 38 deletions.
71 changes: 33 additions & 38 deletions narwhal/worker/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use async_trait::async_trait;
use config::{SharedCommittee, SharedWorkerCache, WorkerCache, WorkerId, WorkerIndex};
use config::{Committee, SharedCommittee, SharedWorkerCache, WorkerCache, WorkerId, WorkerIndex};
use crypto::PublicKey;
use fastcrypto::Hash;
use futures::{stream::FuturesUnordered, StreamExt};
Expand Down Expand Up @@ -105,48 +105,15 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
match &message {
ReconfigureNotification::NewEpoch(new_committee) => {
self.committee.swap(Arc::new(new_committee.clone()));

// TODO: duplicated code in this file.
// Update the worker cache.
self.worker_cache.swap(Arc::new(WorkerCache {
epoch: new_committee.epoch,
workers: new_committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));
self.update_worker_cache(new_committee);
tracing::debug!("Committee updated to {}", self.committee);
}
ReconfigureNotification::UpdateCommittee(new_committee) => {
self.committee.swap(Arc::new(new_committee.clone()));

// Update the worker cache.
self.worker_cache.swap(Arc::new(WorkerCache {
epoch: new_committee.epoch,
workers: new_committee.keys().iter().map(|key|
(
(*key).clone(),
self.worker_cache
.load()
.workers
.get(key)
.tap_none(||
warn!("Worker cache does not have a key for the new committee member"))
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone()
)).collect(),
}));

self.update_worker_cache(new_committee);
tracing::debug!("Committee updated to {}", self.committee);
}
ReconfigureNotification::Shutdown => {} // no-op
ReconfigureNotification::Shutdown => (), // no-op
};

// Notify all other tasks.
Expand Down Expand Up @@ -347,3 +314,31 @@ impl PrimaryToWorker for PrimaryReceiverHandler {
Ok(anemo::Response::new(()))
}
}

impl PrimaryReceiverHandler {
fn update_worker_cache(&self, new_committee: &Committee) {
self.worker_cache.swap(Arc::new(WorkerCache {
epoch: new_committee.epoch,
workers: new_committee
.keys()
.iter()
.map(|key| {
(
(*key).clone(),
self.worker_cache
.load()
.workers
.get(key)
.tap_none(|| {
warn!(
"Worker cache does not have a key for the new committee member"
)
})
.unwrap_or(&WorkerIndex(BTreeMap::new()))
.clone(),
)
})
.collect(),
}));
}
}

0 comments on commit a8ce206

Please sign in to comment.