Skip to content

Commit

Permalink
[State Sync] Add a config for the data streaming service.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Nov 13, 2021
1 parent 3c8a594 commit 4e22424
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 63 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 43 additions & 7 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@ pub struct StateSyncConfig {
pub sync_request_timeout_ms: u64,
// interval used for checking state synchronization progress
pub tick_interval_ms: u64,

// TODO(joshlind): plug these in when required.
// Everything above belongs to state sync v1 and will be removed in the future.

// The config for the storage service running on each node. Required by
// state sync v2.
pub storage_service: StorageServiceConfig,
// pub data_streaming_service: DataStreamingServiceConfig,
// pub storage_service: StorageServiceConfig,
}

impl Default for StateSyncConfig {
Expand All @@ -47,12 +45,11 @@ impl Default for StateSyncConfig {
multicast_timeout_ms: 30_000,
sync_request_timeout_ms: 60_000,
tick_interval_ms: 100,
storage_service: StorageServiceConfig::default(),
}
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct StorageServiceConfig {
pub max_account_states_chunk_sizes: u64, // Max num of accounts per chunk
Expand All @@ -73,3 +70,42 @@ impl Default for StorageServiceConfig {
}
}
}

#[derive(Copy, Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataStreamingServiceConfig {
// The interval (milliseconds) at which to refresh the global data summary.
pub global_summary_refresh_interval_ms: u64,

// Maximum number of concurrent data client requests (per stream).
pub max_concurrent_requests: u64,

// Maximum channel sizes for each data stream listener. If messages are not
// consumed, they will be dropped (oldest messages first). The remaining
// messages will be retrieved using FIFO ordering.
pub max_data_stream_channel_sizes: u64,

// Maximum number of retries for a single client request before a data
// stream will terminate.
pub max_request_retry: u64,

// Maximum number of notification ID to response context mappings held in
// memory. Once the number grows beyond this value, garbage collection occurs.
pub max_notification_id_mappings: u64,

// The interval (milliseconds) at which to check the progress of each stream.
pub progress_check_interval_ms: u64,
}

impl Default for DataStreamingServiceConfig {
fn default() -> Self {
Self {
global_summary_refresh_interval_ms: 1000,
max_concurrent_requests: 3,
max_data_stream_channel_sizes: 1000,
max_request_retry: 10,
max_notification_id_mappings: 2000,
progress_check_interval_ms: 100,
}
}
}
13 changes: 4 additions & 9 deletions diem-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,8 @@ fn setup_storage_service_servers(
.expect("Failed to start the DiemNet storage-service runtime.");
let storage_reader = StorageReader::new(Arc::clone(&db_rw.reader));
for events in network_handles {
let service = StorageServiceServer::new(
config.clone(),
rt.handle().clone(),
storage_reader.clone(),
events,
);
let service =
StorageServiceServer::new(config, rt.handle().clone(), storage_reader.clone(), events);
rt.spawn(service.start());
}
rt
Expand Down Expand Up @@ -494,17 +490,16 @@ pub fn setup_environment(node_config: &NodeConfig, logger: Option<Arc<Logger>>)
// TODO set up on-chain discovery network based on UpstreamConfig.fallback_network
// and pass network handles to mempool/state sync

let storage_service_config = node_config.state_sync.storage_service.clone();
let storage_service_rt = setup_storage_service_servers(
storage_service_config.clone(),
StorageServiceConfig::default(),
storage_service_server_network_handles,
&db_rw,
);

let _diemnet_data_client = setup_diemnet_data_client(
// TODO(philiphayes): probably use state-sync-v2 handle here?
storage_service_rt.handle(),
storage_service_config,
StorageServiceConfig::default(),
storage_service_client_network_handles,
peer_metadata_storage.clone(),
);
Expand Down
1 change: 1 addition & 0 deletions state-sync/state-sync-v2/data-streaming-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ tokio = { version = "1.8.1", features = ["full"] }
tokio-stream = "0.1.4"

channel = { path = "../../../common/channel" }
diem-config = { path = "../../../config" }
diem-crypto = { path = "../../../crypto/crypto" }
diem-data-client = { path = "../../diem-data-client" }
diem-id-generator = { path = "../../../common/id-generator" }
Expand Down
41 changes: 19 additions & 22 deletions state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
streaming_client::{NotificationFeedback, StreamRequest},
};
use channel::{diem_channel, message_queues::QueueStyle};
use diem_config::config::DataStreamingServiceConfig;
use diem_data_client::{
AdvertisedData, DiemDataClient, GlobalDataSummary, Response, ResponseContext, ResponseError,
ResponsePayload,
Expand All @@ -26,21 +27,6 @@ use std::{
};
use tokio::task::JoinHandle;

// Maximum channel sizes for each stream listener. If messages are not
// consumed, they will be dropped (oldest messages first). The remaining
// messages will be retrieved using FIFO ordering.
const DATA_STREAM_CHANNEL_SIZE: usize = 1000;

// Maximum number of concurrent data client requests (per stream)
const MAX_CONCURRENT_REQUESTS: u64 = 3;

// Maximum number of retries for a single client request before the stream terminates
pub const MAX_REQUEST_RETRY: u64 = 10;

// Maximum number of notification ID to response ID mappings held in memory.
// Once the number of mappings grow beyond this value, garbage collection occurs.
pub const MAX_NOTIFICATION_ID_MAPPINGS: u64 = 2000;

/// A unique ID used to identify each stream.
pub type DataStreamId = u64;

Expand All @@ -57,6 +43,9 @@ pub type PendingClientResponse = Arc<Mutex<Box<data_notification::PendingClientR
/// proofs must be sent with monotonically increasing versions).
#[derive(Debug)]
pub struct DataStream<T> {
// The configuration for this data stream
config: DataStreamingServiceConfig,

// The unique ID for this data stream. This is useful for logging.
data_stream_id: DataStreamId,

Expand Down Expand Up @@ -95,22 +84,27 @@ pub struct DataStream<T> {

impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
pub fn new(
config: DataStreamingServiceConfig,
data_stream_id: DataStreamId,
stream_request: &StreamRequest,
diem_data_client: T,
notification_id_generator: Arc<U64IdGenerator>,
advertised_data: &AdvertisedData,
) -> Result<(Self, DataStreamListener), Error> {
// Create a new data stream listener
let (notification_sender, notification_receiver) =
diem_channel::new(QueueStyle::KLAST, DATA_STREAM_CHANNEL_SIZE, None);
let (notification_sender, notification_receiver) = diem_channel::new(
QueueStyle::KLAST,
config.max_data_stream_channel_sizes as usize,
None,
);
let data_stream_listener = DataStreamListener::new(notification_receiver);

// Create a new stream engine
let stream_engine = StreamEngine::new(stream_request, advertised_data)?;

// Create a new data stream
let data_stream = Self {
config,
data_stream_id,
diem_data_client,
stream_engine,
Expand Down Expand Up @@ -195,7 +189,9 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
) -> Result<(), Error> {
// Determine how many requests (at most) can be sent to the network
let num_sent_requests = self.get_sent_data_requests().len() as u64;
let max_num_requests_to_send = MAX_CONCURRENT_REQUESTS
let max_num_requests_to_send = self
.config
.max_concurrent_requests
.checked_sub(num_sent_requests)
.ok_or_else(|| {
Error::IntegerOverflow("Max number of requests to send has overflown!".into())
Expand Down Expand Up @@ -332,7 +328,7 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
global_data_summary: GlobalDataSummary,
) -> Result<(), Error> {
if self.stream_engine.is_stream_complete()
|| self.request_failure_count >= MAX_REQUEST_RETRY
|| self.request_failure_count >= self.config.max_request_retry
{
if self.stream_end_notification_id.is_none() {
self.send_end_of_stream_notification()?;
Expand All @@ -341,7 +337,7 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
}

// Process any ready data responses
for _ in 0..MAX_CONCURRENT_REQUESTS {
for _ in 0..self.config.max_concurrent_requests {
if let Some(pending_response) = self.pop_pending_response_queue() {
let mut pending_response = pending_response.lock();
let client_response = pending_response
Expand Down Expand Up @@ -523,10 +519,11 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
}

fn garbage_collect_notification_response_map(&mut self) -> Result<(), Error> {
let max_notification_id_mappings = self.config.max_notification_id_mappings;
let map_length = self.notifications_to_responses.len() as u64;
if map_length > MAX_NOTIFICATION_ID_MAPPINGS {
if map_length > max_notification_id_mappings {
let num_entries_to_remove = map_length
.checked_sub(MAX_NOTIFICATION_ID_MAPPINGS)
.checked_sub(max_notification_id_mappings)
.ok_or_else(|| {
Error::IntegerOverflow("Number of entries to remove has overflown!".into())
})?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
StreamRequest, StreamRequestMessage, StreamingServiceListener, TerminateStreamRequest,
},
};
use diem_config::config::DataStreamingServiceConfig;
use diem_data_client::{DiemDataClient, GlobalDataSummary, OptimalChunkSizes};
use diem_id_generator::{IdGenerator, U64IdGenerator};
use diem_logger::prelude::*;
Expand All @@ -17,12 +18,11 @@ use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::time::interval;
use tokio_stream::wrappers::IntervalStream;

/// Constants for state management frequencies
const DATA_REFRESH_INTERVAL_MS: u64 = 1000;
const PROGRESS_CHECK_INTERVAL_MS: u64 = 100;

/// The data streaming service that responds to data stream requests.
pub struct DataStreamingService<T> {
// The configuration for this streaming service.
config: DataStreamingServiceConfig,

// The data client through which to fetch data from the Diem network
diem_data_client: T,

Expand All @@ -41,8 +41,13 @@ pub struct DataStreamingService<T> {
}

impl<T: DiemDataClient + Send + Clone + 'static> DataStreamingService<T> {
pub fn new(diem_data_client: T, stream_requests: StreamingServiceListener) -> Self {
pub fn new(
config: DataStreamingServiceConfig,
diem_data_client: T,
stream_requests: StreamingServiceListener,
) -> Self {
Self {
config,
diem_data_client,
global_data_summary: GlobalDataSummary::empty(),
data_streams: HashMap::new(),
Expand All @@ -54,10 +59,14 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStreamingService<T> {

/// Starts the dedicated streaming service
pub async fn start_service(mut self) {
let mut data_refresh_interval =
IntervalStream::new(interval(Duration::from_millis(DATA_REFRESH_INTERVAL_MS))).fuse();
let mut progress_check_interval =
IntervalStream::new(interval(Duration::from_millis(PROGRESS_CHECK_INTERVAL_MS))).fuse();
let mut data_refresh_interval = IntervalStream::new(interval(Duration::from_millis(
self.config.global_summary_refresh_interval_ms,
)))
.fuse();
let mut progress_check_interval = IntervalStream::new(interval(Duration::from_millis(
self.config.progress_check_interval_ms,
)))
.fuse();

loop {
::futures::select! {
Expand Down Expand Up @@ -149,6 +158,7 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStreamingService<T> {
// Create a new data stream
let stream_id = self.stream_id_generator.next();
let (data_stream, stream_listener) = DataStream::new(
self.config,
stream_id,
&request_message.stream_request,
self.diem_data_client.clone(),
Expand Down
Loading

0 comments on commit 4e22424

Please sign in to comment.