Skip to content

Commit

Permalink
[State Sync] Update the storage service to cache the storage summary.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and aptos-bot committed Mar 30, 2022
1 parent 834b7ff commit 3ec1dcc
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 25 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ fn setup_state_sync_storage_service(
config,
storage_service_runtime.handle().clone(),
storage_reader.clone(),
TimeService::real(),
events,
);
storage_service_runtime.spawn(service.start());
Expand Down
2 changes: 2 additions & 0 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub struct StorageServiceConfig {
pub max_network_channel_size: u64, // Max num of pending network messages
pub max_transaction_chunk_size: u64, // Max num of transactions per chunk
pub max_transaction_output_chunk_size: u64, // Max num of transaction outputs per chunk
pub storage_summary_refresh_interval_ms: u64, // The interval (ms) to refresh the storage summary
}

impl Default for StorageServiceConfig {
Expand All @@ -121,6 +122,7 @@ impl Default for StorageServiceConfig {
max_network_channel_size: 1000,
max_transaction_chunk_size: 3000,
max_transaction_output_chunk_size: 3000,
storage_summary_refresh_interval_ms: 1000,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions state-sync/storage-service/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ bounded-executor = { path = "../../../crates/bounded-executor" }
channel = { path = "../../../crates/channel" }
aptos-config = { path = "../../../config" }
aptos-logger = { path = "../../../crates/aptos-logger" }
aptos-infallible = { path = "../../../crates/aptos-infallible" }
aptos-metrics = { path = "../../../crates/aptos-metrics" }
aptos-time-service = { path = "../../../crates/aptos-time-service", features = ["async"] }
aptos-types = { path = "../../../types" }
aptos-workspace-hack = { version = "0.1", path = "../../../crates/aptos-workspace-hack" }
network = { path = "../../../network" }
Expand All @@ -34,6 +36,7 @@ anyhow = "1.0.52"
claim = "0.5.0"

aptos-crypto = { path = "../../../crates/aptos-crypto" }
aptos-time-service = { path = "../../../crates/aptos-time-service", features = ["async", "testing"] }
aptos-types = { path = "../../../types" }
move-core-types = { git = "https://github.com/diem/move", rev = "3fe033b112eae7df2d15ab3467624165ae510caa", features=["address32"] }
storage-interface = { path = "../../../storage/storage-interface" }
111 changes: 97 additions & 14 deletions state-sync/storage-service/server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::{
};
use ::network::ProtocolId;
use aptos_config::config::StorageServiceConfig;
use aptos_infallible::RwLock;
use aptos_logger::prelude::*;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{
epoch_change::EpochChangeProof,
state_store::state_value::StateValueChunkWithProof,
Expand Down Expand Up @@ -71,26 +73,76 @@ pub struct StorageServiceServer<T> {
// TODO(philiphayes): would like a "multi-network" stream here, so we only
// need one service for all networks.
network_requests: StorageServiceNetworkEvents,
time_service: TimeService,

// We maintain a cached storage server summary to avoid hitting the DB for
// every request. This is refreshed periodically.
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,
}

impl<T: StorageReaderInterface> StorageServiceServer<T> {
pub fn new(
config: StorageServiceConfig,
executor: Handle,
storage: T,
time_service: TimeService,
network_requests: StorageServiceNetworkEvents,
) -> Self {
let bounded_executor =
BoundedExecutor::new(config.max_concurrent_requests as usize, executor);
let cached_storage_server_summary = Arc::new(RwLock::new(StorageServerSummary::default()));

Self {
config,
bounded_executor,
storage,
network_requests,
time_service,
cached_storage_server_summary,
}
}

/// Spawns a non-terminating task that refreshes the cached storage server summary
async fn spawn_storage_summary_refresher(&mut self) {
let config = self.config;
let storage = self.storage.clone();
let time_service = self.time_service.clone();
let cached_storage_server_summary = self.cached_storage_server_summary.clone();

// Spawn the task
self.bounded_executor
.spawn(async move {
// Create a ticker for the refresh interval
let duration = Duration::from_millis(config.storage_summary_refresh_interval_ms);
let ticker = time_service.interval(duration);
futures::pin_mut!(ticker);

// Periodically refresh the cache
loop {
ticker.next().await;

if let Err(error) = refresh_cached_storage_summary(
config,
storage.clone(),
cached_storage_server_summary.clone(),
) {
let error = format!(
"Failed to refresh the cached storage summary! Error: {:?}",
error
);
error!(LogSchema::new(LogEntry::StorageServiceError).message(&error));
}
}
})
.await;
}

/// Starts the storage service server thread
pub async fn start(mut self) {
// Spawn the refresher for the cache
self.spawn_storage_summary_refresher().await;

// Handle the storage requests
while let Some(request) = self.network_requests.next().await {
// Log the request
let (peer, protocol, request, response_sender) = request;
Expand All @@ -104,11 +156,13 @@ impl<T: StorageReaderInterface> StorageServiceServer<T> {
// All handler methods are currently CPU-bound and synchronous
// I/O-bound, so we want to spawn on the blocking thread pool to
// avoid starving other async tasks on the same runtime.
let storage = self.storage.clone();
let config = self.config;
let storage = self.storage.clone();
let cached_storage_server_summary = self.cached_storage_server_summary.clone();
self.bounded_executor
.spawn_blocking(move || {
let response = Handler::new(config, storage).call(protocol, request);
let response = Handler::new(config, storage, cached_storage_server_summary)
.call(protocol, request);
log_storage_response(&response);
response_sender.send(response);
})
Expand All @@ -117,18 +171,56 @@ impl<T: StorageReaderInterface> StorageServiceServer<T> {
}
}

/// Refreshes the cached storage server summary
fn refresh_cached_storage_summary<T: StorageReaderInterface>(
storage_config: StorageServiceConfig,
storage: T,
cached_storage_summary: Arc<RwLock<StorageServerSummary>>,
) -> Result<()> {
// Fetch the data summary from storage
let data_summary = storage
.get_data_summary()
.map_err(|error| StorageServiceError::InternalError(error.to_string()))?;

// Initialize the protocol metadata
let protocol_metadata = ProtocolMetadata {
max_epoch_chunk_size: storage_config.max_epoch_chunk_size,
max_transaction_chunk_size: storage_config.max_transaction_chunk_size,
max_transaction_output_chunk_size: storage_config.max_transaction_output_chunk_size,
max_account_states_chunk_size: storage_config.max_account_states_chunk_sizes,
};

// Save the storage server summary
let storage_server_summary = StorageServerSummary {
protocol_metadata,
data_summary,
};
*cached_storage_summary.write() = storage_server_summary;

Ok(())
}

/// The `Handler` is the "pure" inbound request handler. It contains all the
/// necessary context and state needed to construct a response to an inbound
/// request. We usually clone/create a new handler for every request.
#[derive(Clone)]
pub struct Handler<T> {
config: StorageServiceConfig,
storage: T,
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,
}

impl<T: StorageReaderInterface> Handler<T> {
pub fn new(config: StorageServiceConfig, storage: T) -> Self {
Self { config, storage }
pub fn new(
config: StorageServiceConfig,
storage: T,
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,
) -> Self {
Self {
config,
storage,
cached_storage_server_summary,
}
}

pub fn call(
Expand Down Expand Up @@ -244,16 +336,7 @@ impl<T: StorageReaderInterface> Handler<T> {
}

fn get_storage_server_summary(&self) -> Result<StorageServiceResponse, Error> {
let storage_server_summary = StorageServerSummary {
protocol_metadata: ProtocolMetadata {
max_epoch_chunk_size: self.config.max_epoch_chunk_size,
max_transaction_chunk_size: self.config.max_transaction_chunk_size,
max_transaction_output_chunk_size: self.config.max_transaction_output_chunk_size,
max_account_states_chunk_size: self.config.max_account_states_chunk_sizes,
},
data_summary: self.storage.get_data_summary()?,
};

let storage_server_summary = self.cached_storage_server_summary.read().clone();
Ok(StorageServiceResponse::StorageServerSummary(
storage_server_summary,
))
Expand Down
37 changes: 27 additions & 10 deletions state-sync/storage-service/server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use anyhow::Result;
use aptos_config::config::StorageServiceConfig;
use aptos_crypto::{ed25519::Ed25519PrivateKey, HashValue, PrivateKey, SigningKey, Uniform};
use aptos_logger::Level;
use aptos_time_service::{MockTimeService, TimeService};
use aptos_types::{
account_address::AccountAddress,
block_info::BlockInfo,
Expand Down Expand Up @@ -59,7 +60,7 @@ const STATE_PRUNE_WINDOW: u64 = 50;

#[tokio::test]
async fn test_get_server_protocol_version() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Process a request to fetch the protocol version
Expand All @@ -75,7 +76,7 @@ async fn test_get_server_protocol_version() {

#[tokio::test]
async fn test_get_account_states_chunk_with_proof() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Create a request to fetch an account states chunk with a proof
Expand Down Expand Up @@ -112,7 +113,7 @@ async fn test_get_account_states_chunk_with_proof() {

#[tokio::test]
async fn test_get_number_of_accounts_at_version() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Create a request to fetch the number of accounts at the specified version
Expand All @@ -129,13 +130,26 @@ async fn test_get_number_of_accounts_at_version() {

#[tokio::test]
async fn test_get_storage_server_summary() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, mock_time) = MockClient::new();
tokio::spawn(service.start());

// Process a request to fetch the storage summary
let request = StorageServiceRequest::GetStorageServerSummary;
let response = mock_client.send_request(request).await.unwrap();

// Verify the response is the default response (not enough time has passed for the cache to be updated)
assert_eq!(
response,
StorageServiceResponse::StorageServerSummary(StorageServerSummary::default())
);

// Elapse enough time to force the cache to be updated
mock_time.advance_secs(5);

// Process another request to fetch the storage summary
let request = StorageServiceRequest::GetStorageServerSummary;
let response = mock_client.send_request(request).await.unwrap();

// Verify the response is correct
let highest_version = LAST_TXN_VERSION;
let highest_epoch = LAST_EPOCH;
Expand Down Expand Up @@ -170,7 +184,7 @@ async fn test_get_storage_server_summary() {

#[tokio::test]
async fn test_get_transactions_with_proof_events() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Create a request to fetch transactions with a proof
Expand Down Expand Up @@ -205,7 +219,7 @@ async fn test_get_transactions_with_proof_events() {

#[tokio::test]
async fn test_get_transactions_with_proof_no_events() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Create a request to fetch transactions with a proof (excluding events)
Expand Down Expand Up @@ -240,7 +254,7 @@ async fn test_get_transactions_with_proof_no_events() {

#[tokio::test]
async fn test_get_transaction_outputs_with_proof() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

// Create a request to fetch transaction outputs with a proof
Expand Down Expand Up @@ -274,7 +288,7 @@ async fn test_get_transaction_outputs_with_proof() {

#[tokio::test]
async fn test_get_epoch_ending_ledger_infos() {
let (mut mock_client, service) = MockClient::new();
let (mut mock_client, service, _) = MockClient::new();
tokio::spawn(service.start());

let start_epoch = 11;
Expand Down Expand Up @@ -315,7 +329,7 @@ struct MockClient {
}

impl MockClient {
fn new() -> (Self, StorageServiceServer<StorageReader>) {
fn new() -> (Self, StorageServiceServer<StorageReader>, MockTimeService) {
initialize_logger();
let storage = StorageReader::new(Arc::new(MockDbReader));

Expand All @@ -328,14 +342,17 @@ impl MockClient {
StorageServiceNetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx);

let executor = tokio::runtime::Handle::current();
let mock_time_service = TimeService::mock();
let storage_server = StorageServiceServer::new(
StorageServiceConfig::default(),
executor,
storage,
mock_time_service.clone(),
network_requests,
);

(Self { peer_mgr_notifs_tx }, storage_server)
let mock_client = Self { peer_mgr_notifs_tx };
(mock_client, storage_server, mock_time_service.into_mock())
}

async fn send_request(
Expand Down
2 changes: 1 addition & 1 deletion state-sync/storage-service/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub struct ServerProtocolVersion {
/// A storage server summary, containing a summary of the information held
/// by the corresponding server instance. This is useful for identifying the
/// data that a server instance can provide, as well as relevant metadata.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub struct StorageServerSummary {
pub protocol_metadata: ProtocolMetadata,
pub data_summary: DataSummary,
Expand Down

0 comments on commit 3ec1dcc

Please sign in to comment.