Skip to content

Commit

Permalink
[Aptos Data Poller] Use non-blocking polling.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed May 10, 2022
1 parent b26c315 commit 1571612
Show file tree
Hide file tree
Showing 9 changed files with 671 additions and 293 deletions.
17 changes: 9 additions & 8 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,20 +344,21 @@ fn setup_aptos_data_client(
peer_metadata_storage,
);

// Create the data client
// Create a new runtime for the data client
let aptos_data_client_runtime = Builder::new_multi_thread()
.thread_name("aptos-data-client")
.enable_all()
.build()
.expect("Failed to create aptos data client!");

// Create the data client and spawn the data poller
let (aptos_data_client, data_summary_poller) = AptosNetDataClient::new(
aptos_data_client_config,
storage_service_config,
TimeService::real(),
network_client,
Some(aptos_data_client_runtime.handle().clone()),
);

// Create a new runtime for the data client and spawn the data poller
let aptos_data_client_runtime = Builder::new_multi_thread()
.thread_name("aptos-data-client")
.enable_all()
.build()
.expect("Failed to create aptos data client!");
aptos_data_client_runtime.spawn(data_summary_poller.start_poller());

(aptos_data_client, aptos_data_client_runtime)
Expand Down
8 changes: 6 additions & 2 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl Default for StateSyncDriverConfig {
max_connection_deadline_secs: 10,
max_consecutive_stream_notifications: 10,
max_pending_data_chunks: 100,
max_stream_wait_time_ms: 10_000,
max_stream_wait_time_ms: 5000,
}
}
}
Expand Down Expand Up @@ -173,14 +173,18 @@ impl Default for DataStreamingServiceConfig {
#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AptosDataClientConfig {
pub max_num_in_flight_priority_polls: u64, // Max num of in-flight polls for priority peers
pub max_num_in_flight_regular_polls: u64, // Max num of in-flight polls for regular peers
pub response_timeout_ms: u64, // Timeout (in milliseconds) when waiting for a response
pub summary_poll_interval_ms: u64, // Interval (in milliseconds) between data summary polls
}

impl Default for AptosDataClientConfig {
fn default() -> Self {
Self {
response_timeout_ms: 10000,
max_num_in_flight_priority_polls: 10,
max_num_in_flight_regular_polls: 10,
response_timeout_ms: 5000,
summary_poll_interval_ms: 100,
}
}
Expand Down
1 change: 1 addition & 0 deletions state-sync/aptos-data-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ itertools = "0.10.0"
rand = "0.8.3"
serde = { version = "1.0.137", default-features = false }
thiserror = "1.0.31"
tokio = { version = "1.8.1", features = ["full"] }

aptos-config = { path = "../../config" }
aptos-crypto = { path = "../../crates/aptos-crypto" }
Expand Down
12 changes: 12 additions & 0 deletions state-sync/aptos-data-client/src/aptosnet/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use aptos_metrics::{

/// The special label TOTAL_COUNT stores the sum of all values in the counter.
pub const TOTAL_COUNT_LABEL: &str = "TOTAL_COUNT";
pub const PRIORITIZED_PEER: &str = "prioritized_peer";
pub const REGULAR_PEER: &str = "regular_peer";

/// Counter for tracking sent requests
pub static SENT_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
Expand Down Expand Up @@ -50,6 +52,16 @@ pub static REQUEST_LATENCIES: Lazy<HistogramVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge for tracking the number of in-flight polls
pub static IN_FLIGHT_POLLS: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"aptos_data_client_in_flight_polls",
"Gauge related to the number of in-flight polls",
&["peer_type"]
)
.unwrap()
});

/// Gauge for the highest advertised data
pub static HIGHEST_ADVERTISED_DATA: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
Expand Down
Loading

0 comments on commit 1571612

Please sign in to comment.