Skip to content

Commit

Permalink
[State Sync] Make the data summary poller more intelligent.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 28, 2022
1 parent 0fe55ae commit 54d2b5d
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 106 deletions.
2 changes: 1 addition & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ fn setup_aptos_data_client(
.enable_all()
.build()
.expect("Failed to create aptos data client!");
aptos_data_client_runtime.spawn(data_summary_poller.start());
aptos_data_client_runtime.spawn(data_summary_poller.start_poller());

(aptos_data_client, aptos_data_client_runtime)
}
Expand Down
2 changes: 1 addition & 1 deletion state-sync/aptos-data-client/src/aptosnet/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl<'a> LogSchema<'a> {
#[derive(Clone, Copy, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum LogEntry {
DataSummaryPollerStart,
DataSummaryPoller,
PeerStates,
StorageServiceRequest,
StorageServiceResponse,
Expand Down
220 changes: 133 additions & 87 deletions state-sync/aptos-data-client/src/aptosnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ impl AptosNetDataClient {
(client, poller)
}

/// Generates a new response id by asking `response_id_generator` for its
/// next value.
fn next_response_id(&self) -> u64 {
self.response_id_generator.next()
}
Expand All @@ -120,45 +121,79 @@ impl AptosNetDataClient {

/// Choose a connected peer that can service the given request. Returns an
/// error if no such peer can be found.
fn choose_peer(&self, request: &StorageServiceRequest) -> Result<PeerNetworkId, Error> {
let all_connected = {
let network_peer_metadata = self.network_client.peer_metadata_storage();
network_peer_metadata
.networks()
.flat_map(|network_id| {
network_peer_metadata
.read_filtered(network_id, |(_, peer_metadata)| {
peer_metadata.is_connected()
&& peer_metadata.supports_protocol(ProtocolId::StorageServiceRpc)
})
.into_keys()
})
.collect::<Vec<_>>()
};

if all_connected.is_empty() {
return Err(Error::DataIsUnavailable(
"No connected aptosnet peers".to_owned(),
));
}
fn choose_peer_for_request(
&self,
request: &StorageServiceRequest,
) -> Result<PeerNetworkId, Error> {
let all_connected_peers = self.get_all_connected_peers()?;

// Identify the peers that can service this request
let internal_peer_states = self.peer_states.read();
let all_serviceable = all_connected
let serviceable_peers = all_connected_peers
.into_iter()
.filter(|peer| internal_peer_states.can_service_request(peer, request))
.collect::<Vec<_>>();

all_serviceable
// Choose a random peer from those that can service the request
serviceable_peers
.choose(&mut rand::thread_rng())
.copied()
.ok_or_else(|| {
Error::DataIsUnavailable(
"No connected peers are advertising that they can serve this data range"
.to_owned(),
"No connected peers are advertising that they can serve this data!".to_owned(),
)
})
}

/// Fetches the next group of peers to poll. The group contains: (i) every
/// connected peer that has not been polled yet (i.e., peers that connected
/// since the last time this method was called); and (ii) the peer returned by
/// `oldest_polled_peer`, if any (presumably the peer whose summary is the
/// most stale -- confirm against the peer states implementation).
///
/// Every returned peer is marked as polled before this method returns.
///
/// # Errors
///
/// Propagates the error from `get_all_connected_peers` (e.g., when there are
/// no connected peers).
fn fetch_peers_to_poll(&self) -> Result<Vec<PeerNetworkId>, Error> {
    // Query the network layer first so the peer state lock is not held while
    // the connected peers are being collected.
    let connected_peers = self.get_all_connected_peers()?;

    // Acquire the peer state lock exactly once. The unpolled check, the
    // selection of the oldest polled peer and the marking below then all
    // observe (and update) the same consistent state, instead of
    // re-acquiring the lock for every single peer.
    let mut peer_states = self.peer_states.write();

    // Fetch all (new) unpolled peers
    let mut peers_to_poll = connected_peers
        .into_iter()
        .filter(|peer| !peer_states.already_polled_peer(peer))
        .collect::<Vec<_>>();

    // Fetch the oldest polled peer
    if let Some(peer) = peer_states.oldest_polled_peer() {
        peers_to_poll.push(peer);
    }

    // Mark these peers as now polled
    for peer in &peers_to_poll {
        peer_states.add_polled_peer(*peer);
    }

    Ok(peers_to_poll)
}

/// Returns all peers connected to us
fn get_all_connected_peers(&self) -> Result<Vec<PeerNetworkId>, Error> {
let network_peer_metadata = self.network_client.peer_metadata_storage();
let connected_peers = network_peer_metadata
.networks()
.flat_map(|network_id| {
network_peer_metadata
.read_filtered(network_id, |(_, peer_metadata)| {
peer_metadata.is_connected()
&& peer_metadata.supports_protocol(ProtocolId::StorageServiceRpc)
})
.into_keys()
})
.collect::<Vec<_>>();

// Ensure connected peers is not empty
if connected_peers.is_empty() {
return Err(Error::DataIsUnavailable(
"No connected AptosNet peers!".to_owned(),
));
}
Ok(connected_peers)
}

/// Sends a request (to an undecided peer) and decodes the response
async fn send_request_and_decode<T, E>(
&self,
request: StorageServiceRequest,
Expand All @@ -167,7 +202,7 @@ impl AptosNetDataClient {
T: TryFrom<StorageServiceResponse, Error = E>,
E: Into<Error>,
{
let peer = self.choose_peer(&request).map_err(|error| {
let peer = self.choose_peer_for_request(&request).map_err(|error| {
error!(
(LogSchema::new(LogEntry::StorageServiceRequest)
.event(LogEvent::PeerSelectionError)
Expand All @@ -180,6 +215,7 @@ impl AptosNetDataClient {
self.send_request_to_peer_and_decode(peer, request).await
}

/// Sends a request to a specific peer and decodes the response
async fn send_request_to_peer_and_decode<T, E>(
&self,
peer: PeerNetworkId,
Expand All @@ -206,6 +242,7 @@ impl AptosNetDataClient {
}
}

/// Sends a request to a specific peer
async fn send_request_to_peer(
&self,
peer: PeerNetworkId,
Expand Down Expand Up @@ -299,6 +336,7 @@ impl AptosNetDataClient {
}
}

/// Updates the score of the peer who sent the response with the specified id
fn notify_bad_response(
&self,
_id: ResponseId,
Expand Down Expand Up @@ -414,14 +452,6 @@ impl fmt::Debug for AptosNetResponseCallback {
}
}

// TODO(philiphayes):
// + ownership b/w poller and data client a bit murky
// + how to stop poller loop? ideally all data client refs get dropped and it
// just works.
// + would need to make data client contain weak refs somehow when in poller...
// + or maybe poller needs to not depend on data client?
// + an explicit close method seems unsafe / easy to forget...
// + ofc, in prod we will never cancel lol
pub struct DataSummaryPoller {
time_service: TimeService,
data_client: AptosNetDataClient,
Expand All @@ -441,85 +471,101 @@ impl DataSummaryPoller {
}
}

pub async fn start(self) {
/// Runs the poller that continuously updates the global data summary
pub async fn start_poller(self) {
info!(
(LogSchema::new(LogEntry::DataSummaryPollerStart)
(LogSchema::new(LogEntry::DataSummaryPoller)
.message("Starting the Aptos data poller!"))
);
let ticker = self.time_service.interval(self.poll_interval);
futures::pin_mut!(ticker);

// TODO(philiphayes): rather than polling one at a time, maybe do
// round-robin with a few concurrent polls.
loop {
// wait for next round to poll
// Wait for next round before polling
ticker.next().await;

// just sample a random peer for now. do something smarter here in
// the future.
let peer = match self
.data_client
.choose_peer(&StorageServiceRequest::GetStorageServerSummary)
{
Ok(peer) => peer,
// Fetch the peers to poll
let peers_to_poll = match self.data_client.fetch_peers_to_poll() {
Ok(peers_to_poll) => peers_to_poll,
Err(error) => {
sample!(
SampleRate::Duration(Duration::from_secs(POLLER_ERROR_LOG_FREQ_SECS)),
error!(
(LogSchema::new(LogEntry::StorageSummaryRequest)
.event(LogEvent::NoPeersToPoll)
.message("Unable to select next peer")
.message("Unable to fetch any peers to poll!")
.error(&error))
);
);
continue;
}
};

let timer = start_timer(
&metrics::REQUEST_LATENCIES,
StorageServiceRequest::GetStorageServerSummary
.get_label()
.into(),
);
let result: Result<StorageServerSummary> = self
.data_client
.send_request_to_peer_and_decode(
peer,
StorageServiceRequest::GetStorageServerSummary,
)
.await
.map(Response::into_payload);
drop(timer);

let storage_summary = match result {
Ok(storage_summary) => storage_summary,
Err(error) => {
// Ensure peers to poll is not empty
if peers_to_poll.is_empty() {
sample!(
SampleRate::Duration(Duration::from_secs(POLLER_ERROR_LOG_FREQ_SECS)),
error!(
(LogSchema::new(LogEntry::StorageSummaryResponse)
.event(LogEvent::PeerPollingError)
.message("Error encountered when polling peer")
.error(&error)
.peer(&peer))
(LogSchema::new(LogEntry::StorageSummaryRequest)
.event(LogEvent::NoPeersToPoll)
.message("Peers to poll is empty!"))
);
continue;
}
};
);
}

self.data_client.update_summary(peer, storage_summary);
self.data_client.update_global_summary_cache();
// Go through each peer and poll individually
for peer in peers_to_poll {
// Start the peer polling timer
let timer = start_timer(
&metrics::REQUEST_LATENCIES,
StorageServiceRequest::GetStorageServerSummary
.get_label()
.into(),
);

sample!(
SampleRate::Duration(Duration::from_secs(GLOBAL_DATA_LOG_FREQ_SECS)),
debug!(
(LogSchema::new(LogEntry::PeerStates)
.event(LogEvent::AggregateSummary)
.message(&format!(
"Global data summary: {:?}",
self.data_client.get_global_data_summary()
)))
)
);
// Fetch the storage summary for the peer
let result: Result<StorageServerSummary> = self
.data_client
.send_request_to_peer_and_decode(
peer,
StorageServiceRequest::GetStorageServerSummary,
)
.await
.map(Response::into_payload);
drop(timer);

// Check the storage summary response
let storage_summary = match result {
Ok(storage_summary) => storage_summary,
Err(error) => {
error!(
(LogSchema::new(LogEntry::StorageSummaryResponse)
.event(LogEvent::PeerPollingError)
.message("Error encountered when polling peer!")
.error(&error)
.peer(&peer))
);
continue;
}
};

// Update the global storage summary and the summary for the peer
self.data_client.update_summary(peer, storage_summary);
self.data_client.update_global_summary_cache();

// Log the new global data summary
sample!(
SampleRate::Duration(Duration::from_secs(GLOBAL_DATA_LOG_FREQ_SECS)),
debug!(
(LogSchema::new(LogEntry::PeerStates)
.event(LogEvent::AggregateSummary)
.message(&format!(
"Global data summary: {:?}",
self.data_client.get_global_data_summary()
)))
)
);
}
}
}
}
Loading

0 comments on commit 54d2b5d

Please sign in to comment.