Skip to content

Commit

Permalink
[State Sync] Move "wait_until_initialized" method into StateSyncClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Jan 19, 2021
1 parent c195d93 commit 749e084
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
6 changes: 4 additions & 2 deletions diem-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -436,12 +436,14 @@ pub fn setup_environment(node_config: &NodeConfig, logger: Option<Arc<Logger>>)
// network provider -> consensus -> state synchronizer -> network provider. This has resulted
// in a deadlock as observed in GitHub issue #749.
if let Some((consensus_network_sender, consensus_network_events)) = consensus_network_handles {
let state_sync_client = state_synchronizer.create_client();

// Make sure that state synchronizer is caught up at least to its waypoint
// (in case it's present). There is no sense to start consensus prior to that.
// TODO: Note that we need the networking layer to be able to discover & connect to the
// peers with potentially outdated network identity public keys.
debug!("Wait until state synchronizer is initialized");
block_on(state_synchronizer.wait_until_initialized())
block_on(state_sync_client.wait_until_initialized())
.expect("State synchronizer initialization failure");
debug!("State synchronizer initialization complete.");

Expand All @@ -451,7 +453,7 @@ pub fn setup_environment(node_config: &NodeConfig, logger: Option<Arc<Logger>>)
node_config,
consensus_network_sender,
consensus_network_events,
state_synchronizer.create_client(),
state_sync_client,
consensus_to_mempool_sender,
diem_db,
consensus_reconfig_events,
Expand Down
24 changes: 13 additions & 11 deletions state-synchronizer/src/state_synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,17 +188,6 @@ impl StateSynchronizer {
pub fn create_client(&self) -> StateSyncClient {
StateSyncClient::new(self.coordinator_sender.clone())
}

/// The function returns a future that is fulfilled when the state synchronizer is
/// caught up with the waypoint specified in the local config.
pub async fn wait_until_initialized(&self) -> Result<()> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();
sender
.send(CoordinatorMessage::WaitInitialize(cb_sender))
.await?;
cb_receiver.await?
}
}

pub struct StateSyncClient {
Expand Down Expand Up @@ -291,4 +280,17 @@ impl StateSyncClient {
Ok(cb_receiver.await?)
}
}

/// Waits until state sync is caught up with the waypoint specified in the local config.
pub fn wait_until_initialized(&self) -> impl Future<Output = Result<()>> {
let mut sender = self.coordinator_sender.clone();
let (cb_sender, cb_receiver) = oneshot::channel();

async move {
sender
.send(CoordinatorMessage::WaitInitialize(cb_sender))
.await?;
cb_receiver.await?
}
}
}
2 changes: 1 addition & 1 deletion state-synchronizer/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ fn catch_up_with_waypoints() {
);
block_on(
env.peers[1]
.synchronizer
.client
.as_ref()
.unwrap()
.wait_until_initialized(),
Expand Down

0 comments on commit 749e084

Please sign in to comment.