Skip to content

Commit

Permalink
[Aptos Data Client] Remove RwLock around peer states.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Sep 22, 2023
1 parent 711d299 commit d686113
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 102 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions state-sync/aptos-data-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ aptos-time-service = { workspace = true }
aptos-types = { workspace = true }
arc-swap = { workspace = true }
async-trait = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
rand = { workspace = true }
Expand Down
54 changes: 25 additions & 29 deletions state-sync/aptos-data-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use aptos_config::{
network_id::PeerNetworkId,
};
use aptos_id_generator::{IdGenerator, U64IdGenerator};
use aptos_infallible::{Mutex, RwLock};
use aptos_infallible::Mutex;
use aptos_logger::{debug, info, sample, sample::SampleRate, trace, warn};
use aptos_network::{application::interface::NetworkClient, protocols::network::RpcError};
use aptos_storage_interface::DbReader;
Expand Down Expand Up @@ -77,13 +77,13 @@ const PEER_LOG_FREQ_SECS: u64 = 10;
#[derive(Clone, Debug)]
pub struct AptosDataClient {
/// Config for AptosNet data client.
data_client_config: AptosDataClientConfig,
data_client_config: Arc<AptosDataClientConfig>,
/// The underlying AptosNet storage service client.
storage_service_client: StorageServiceClient<NetworkClient<StorageServiceMessage>>,
/// The state of the active subscription stream.
active_subscription_state: Arc<Mutex<Option<SubscriptionState>>>,
/// All of the data-client specific data we have on each network peer.
peer_states: Arc<RwLock<PeerStates>>,
peer_states: Arc<PeerStates>,
/// A cached, aggregate data summary of all unbanned peers' data summaries.
global_summary_cache: Arc<ArcSwap<GlobalDataSummary>>,
/// Used for generating the next request/response id.
Expand All @@ -101,16 +101,20 @@ impl AptosDataClient {
storage_service_client: StorageServiceClient<NetworkClient<StorageServiceMessage>>,
runtime: Option<Handle>,
) -> (Self, DataSummaryPoller) {
// Wrap the configs in an Arc (to be shared across components)
let base_config = Arc::new(base_config);
let data_client_config = Arc::new(data_client_config);

// Create the data client
let data_client = Self {
data_client_config,
data_client_config: data_client_config.clone(),
storage_service_client: storage_service_client.clone(),
active_subscription_state: Arc::new(Mutex::new(None)),
peer_states: Arc::new(RwLock::new(PeerStates::new(
peer_states: Arc::new(PeerStates::new(
base_config,
data_client_config,
data_client_config.clone(),
storage_service_client.get_peers_and_metadata(),
))),
)),
global_summary_cache: Arc::new(ArcSwap::from(Arc::new(GlobalDataSummary::empty()))),
response_id_generator: Arc::new(U64IdGenerator::new()),
time_service: time_service.clone(),
Expand Down Expand Up @@ -151,7 +155,7 @@ impl AptosDataClient {

/// Update a peer's data summary.
pub fn update_summary(&self, peer: PeerNetworkId, summary: StorageServerSummary) {
self.peer_states.write().update_summary(peer, summary)
self.peer_states.update_summary(peer, summary)
}

/// Recompute and update the global data summary cache
Expand All @@ -161,7 +165,7 @@ impl AptosDataClient {
self.garbage_collect_peer_states()?;

// Calculate the global data summary
let global_data_summary = self.peer_states.read().calculate_global_data_summary();
let global_data_summary = self.peer_states.calculate_global_data_summary();

// Update the cached data summary
self.global_summary_cache
Expand All @@ -177,7 +181,6 @@ impl AptosDataClient {

// Garbage collect the disconnected peers
self.peer_states
.write()
.garbage_collect_peer_states(all_connected_peers);

Ok(())
Expand Down Expand Up @@ -285,11 +288,8 @@ impl AptosDataClient {
prospective_peers
.into_iter()
.filter(|peer| {
self.peer_states.read().can_service_request(
peer,
self.time_service.clone(),
request,
)
self.peer_states
.can_service_request(peer, self.time_service.clone(), request)
})
.collect::<Vec<_>>()
}
Expand All @@ -299,7 +299,7 @@ impl AptosDataClient {
&self,
) -> crate::error::Result<Option<PeerNetworkId>, Error> {
// Fetch the number of in-flight polls and update the metrics
let num_in_flight_polls = self.peer_states.read().num_in_flight_priority_polls();
let num_in_flight_polls = self.peer_states.num_in_flight_priority_polls();
update_in_flight_metrics(PRIORITIZED_PEER, num_in_flight_polls);

// Ensure we don't go over the maximum number of in-flight polls
Expand All @@ -315,7 +315,7 @@ impl AptosDataClient {
/// Fetches the next regular peer to poll
pub fn fetch_regular_peer_to_poll(&self) -> crate::error::Result<Option<PeerNetworkId>, Error> {
// Fetch the number of in-flight polls and update the metrics
let num_in_flight_polls = self.peer_states.read().num_in_flight_regular_polls();
let num_in_flight_polls = self.peer_states.num_in_flight_regular_polls();
update_in_flight_metrics(REGULAR_PEER, num_in_flight_polls);

// Ensure we don't go over the maximum number of in-flight polls
Expand All @@ -334,7 +334,7 @@ impl AptosDataClient {
mut peers: Vec<PeerNetworkId>,
) -> crate::error::Result<Option<PeerNetworkId>, Error> {
// Identify the peers who do not already have in-flight requests.
peers.retain(|peer| !self.peer_states.read().existing_in_flight_request(peer));
peers.retain(|peer| !self.peer_states.existing_in_flight_request(peer));

// Select a peer at random for polling
let peer_to_poll = peers.choose(&mut rand::thread_rng());
Expand All @@ -343,14 +343,12 @@ impl AptosDataClient {

/// Marks the given peer as having an in-flight poll request
pub fn in_flight_request_started(&self, peer: &PeerNetworkId) {
self.peer_states.write().new_in_flight_request(peer);
self.peer_states.new_in_flight_request(peer);
}

/// Marks the given peer as polled
pub fn in_flight_request_complete(&self, peer: &PeerNetworkId) {
self.peer_states
.write()
.mark_in_flight_request_complete(peer);
self.peer_states.mark_in_flight_request_complete(peer);
}

/// Returns all peers connected to us
Expand All @@ -376,7 +374,7 @@ impl AptosDataClient {
let mut priority_peers = vec![];
let mut regular_peers = vec![];
for peer in all_connected_peers {
if self.peer_states.read().is_priority_peer(&peer) {
if self.peer_states.is_priority_peer(&peer) {
priority_peers.push(peer);
} else {
regular_peers.push(peer);
Expand Down Expand Up @@ -504,7 +502,7 @@ impl AptosDataClient {
// On the one hand, scoring dynamics are simpler when each request
// is successful or failed but not both; on the other hand, this
// feels simpler for the consumer.
self.peer_states.write().update_score_success(peer);
self.peer_states.update_score_success(peer);

// Package up all of the context needed to fully report an error
// with this RPC.
Expand Down Expand Up @@ -569,9 +567,7 @@ impl AptosDataClient {
_request: &StorageServiceRequest,
error_type: ErrorType,
) {
self.peer_states
.write()
.update_score_error(peer, error_type);
self.peer_states.update_score_error(peer, error_type);
}

/// Creates a storage service request using the given data request
Expand All @@ -592,8 +588,8 @@ impl AptosDataClient {

/// Returns the peer states for testing
#[cfg(test)]
pub(crate) fn get_peer_states(&self) -> PeerStates {
self.peer_states.read().clone()
pub(crate) fn get_peer_states(&self) -> Arc<PeerStates> {
self.peer_states.clone()
}
}

Expand Down
6 changes: 3 additions & 3 deletions state-sync/aptos-data-client/src/latency_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct LatencyMonitor {

impl LatencyMonitor {
pub fn new(
data_client_config: AptosDataClientConfig,
data_client_config: Arc<AptosDataClientConfig>,
data_client: Arc<dyn AptosDataClientInterface + Send + Sync>,
storage: Arc<dyn DbReader>,
time_service: TimeService,
Expand Down Expand Up @@ -318,7 +318,7 @@ mod tests {
};
use aptos_config::config::AptosDataClientConfig;
use aptos_time_service::{TimeService, TimeServiceTrait};
use std::time::Duration;
use std::{sync::Arc, time::Duration};

#[test]
fn test_calculate_duration_from_proposal() {
Expand Down Expand Up @@ -626,7 +626,7 @@ mod tests {

/// Creates a latency monitor for testing
fn create_latency_monitor() -> (TimeService, LatencyMonitor) {
let data_client_config = AptosDataClientConfig::default();
let data_client_config = Arc::new(AptosDataClientConfig::default());
let data_client = create_mock_data_client();
let storage = create_mock_db_reader();
let time_service = TimeService::mock();
Expand Down
Loading

0 comments on commit d686113

Please sign in to comment.