[Storage Service] Implement an LRU cache in front of storage.
JoshLind authored and aptos-bot committed May 4, 2022
1 parent b28a4f1 commit b71475c
Showing 7 changed files with 107 additions and 41 deletions.
17 changes: 16 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions config/src/config/state_sync_config.rs
@@ -109,6 +109,7 @@ pub struct StorageServiceConfig {
pub max_account_states_chunk_sizes: u64, // Max num of accounts per chunk
pub max_concurrent_requests: u64, // Max num of concurrent storage server tasks
pub max_epoch_chunk_size: u64, // Max num of epoch ending ledger infos per chunk
pub max_lru_cache_size: u64, // Max num of items in the lru cache before eviction
pub max_network_channel_size: u64, // Max num of pending network messages
pub max_transaction_chunk_size: u64, // Max num of transactions per chunk
pub max_transaction_output_chunk_size: u64, // Max num of transaction outputs per chunk
@@ -121,6 +122,7 @@ impl Default for StorageServiceConfig {
max_account_states_chunk_sizes: 1000,
max_concurrent_requests: 4000,
max_epoch_chunk_size: 100,
max_lru_cache_size: 100,
max_network_channel_size: 4000,
max_transaction_chunk_size: 1000,
max_transaction_output_chunk_size: 1000,
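The new `max_lru_cache_size` option caps how many responses the storage server keeps before evicting the least recently used entry (default 100, per the diff above). As a minimal sketch, assuming the public fields and `Default` impl shown in this commit, the bound could be raised when constructing the config; the value 500 below is purely illustrative:

```rust
use aptos_config::config::StorageServiceConfig;

// Keep the storage service defaults but allow more cached responses.
// 500 is an illustrative value, not a recommended setting.
fn tuned_storage_service_config() -> StorageServiceConfig {
    StorageServiceConfig {
        max_lru_cache_size: 500,
        ..StorageServiceConfig::default()
    }
}
```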
16 changes: 4 additions & 12 deletions crates/aptos-workspace-hack/Cargo.toml
@@ -12,6 +12,7 @@ edition = "2018"
### BEGIN HAKARI SECTION
[dependencies]
Inflector = { version = "0.11.4", features = ["heavyweight", "lazy_static", "regex"] }
ahash = { version = "0.7.6", features = ["std"] }
anyhow = { version = "1.0.57", features = ["backtrace", "std"] }
arrayvec = { version = "0.5.2", features = ["array-sizes-33-128", "std"] }
backtrace = { version = "0.3.58", features = ["addr2line", "gimli-symbolize", "miniz_oxide", "object", "serde", "std"] }
@@ -34,6 +35,7 @@ futures-sink = { version = "0.3.21", features = ["alloc", "std"] }
futures-util = { version = "0.3.17", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "proc-macro-hack", "proc-macro-nested", "sink", "slab", "std"] }
generic-array = { version = "0.14.5", default-features = false, features = ["more_lengths"] }
getrandom = { version = "0.2.6", default-features = false, features = ["std"] }
hashbrown = { version = "0.11.2", features = ["ahash", "inline-more", "raw"] }
hyper = { version = "0.14.18", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] }
include_dir = { version = "0.7.2", features = ["glob"] }
indexmap = { version = "1.8.1", default-features = false, features = ["std"] }
@@ -66,6 +68,7 @@ zeroize = { version = "1.5.4", features = ["alloc", "zeroize_derive"] }

[build-dependencies]
Inflector = { version = "0.11.4", features = ["heavyweight", "lazy_static", "regex"] }
ahash = { version = "0.7.6", features = ["std"] }
anyhow = { version = "1.0.57", features = ["backtrace", "std"] }
arrayvec = { version = "0.5.2", features = ["array-sizes-33-128", "std"] }
backtrace = { version = "0.3.58", features = ["addr2line", "gimli-symbolize", "miniz_oxide", "object", "serde", "std"] }
@@ -89,6 +92,7 @@ futures-sink = { version = "0.3.21", features = ["alloc", "std"] }
futures-util = { version = "0.3.17", features = ["alloc", "async-await", "async-await-macro", "channel", "futures-channel", "futures-io", "futures-macro", "futures-sink", "io", "memchr", "proc-macro-hack", "proc-macro-nested", "sink", "slab", "std"] }
generic-array = { version = "0.14.5", default-features = false, features = ["more_lengths"] }
getrandom = { version = "0.2.6", default-features = false, features = ["std"] }
hashbrown = { version = "0.11.2", features = ["ahash", "inline-more", "raw"] }
hyper = { version = "0.14.18", features = ["client", "full", "h2", "http1", "http2", "runtime", "server", "socket2", "stream", "tcp"] }
include_dir = { version = "0.7.2", features = ["glob"] }
indexmap = { version = "1.8.1", default-features = false, features = ["std"] }
@@ -120,16 +124,4 @@ tracing-core = { version = "0.1.26", features = ["lazy_static", "std"] }
warp = { version = "0.3.2", features = ["multipart", "tls", "tokio-rustls", "tokio-tungstenite", "websocket"] }
zeroize = { version = "1.5.4", features = ["alloc", "zeroize_derive"] }

[target.x86_64-unknown-linux-gnu.dependencies]
hashbrown = { version = "0.11.2", default-features = false, features = ["inline-more", "raw"] }

[target.x86_64-unknown-linux-gnu.build-dependencies]
hashbrown = { version = "0.11.2", default-features = false, features = ["inline-more", "raw"] }

[target.x86_64-apple-darwin.dependencies]
hashbrown = { version = "0.11.2", default-features = false, features = ["inline-more", "raw"] }

[target.x86_64-apple-darwin.build-dependencies]
hashbrown = { version = "0.11.2", default-features = false, features = ["inline-more", "raw"] }

### END HAKARI SECTION
2 changes: 2 additions & 0 deletions state-sync/storage-service/server/Cargo.toml
@@ -13,6 +13,7 @@ edition = "2018"
bcs = "0.1.2"
bytes = "1.0.1"
futures = "0.3.12"
lru = "0.7.5"
once_cell = "1.7.2"
serde = { version = "1.0.124", default-features = false }
thiserror = "1.0.24"
@@ -34,6 +35,7 @@ storage-service-types = { path = "../types" }
[dev-dependencies]
anyhow = "1.0.52"
claim = "0.5.0"
mockall = "0.11.0"

aptos-crypto = { path = "../../../crates/aptos-crypto" }
aptos-time-service = { path = "../../../crates/aptos-time-service", features = ["async", "testing"] }
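The server gains a dependency on the `lru` crate (0.7.5), which provides a capacity-bounded map with least-recently-used eviction. A minimal sketch of the API the server relies on, with placeholder key and value types; note that in `lru` 0.7, `LruCache::new` takes a plain `usize` capacity and `get` also refreshes an entry's recency:

```rust
use lru::LruCache;

fn main() {
    // Capacity of 2: inserting a third entry evicts the least recently used one.
    let mut cache: LruCache<u64, String> = LruCache::new(2);
    cache.put(1, "first".to_string());
    cache.put(2, "second".to_string());
    cache.put(3, "third".to_string()); // evicts key 1
    assert!(cache.get(&1).is_none());
    assert_eq!(cache.get(&3).map(String::as_str), Some("third"));
}
```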
87 changes: 64 additions & 23 deletions state-sync/storage-service/server/src/lib.rs
@@ -5,12 +5,12 @@

use crate::{
logging::{LogEntry, LogSchema},
metrics::{increment_counter, start_timer},
metrics::{increment_counter, start_timer, LRU_CACHE_HIT, LRU_CACHE_PROBE},
network::StorageServiceNetworkEvents,
};
use ::network::ProtocolId;
use aptos_config::config::StorageServiceConfig;
use aptos_infallible::RwLock;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{
@@ -20,6 +20,7 @@ use aptos_types::{
};
use bounded_executor::BoundedExecutor;
use futures::stream::StreamExt;
use lru::LruCache;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use storage_interface::DbReader;
@@ -67,17 +68,20 @@ impl Error {
/// The server-side actor for the storage service. Handles inbound storage
/// service requests from clients.
pub struct StorageServiceServer<T> {
config: StorageServiceConfig,
bounded_executor: BoundedExecutor,
storage: T,
// TODO(philiphayes): would like a "multi-network" stream here, so we only
// need one service for all networks.
config: StorageServiceConfig,
network_requests: StorageServiceNetworkEvents,
storage: T,
time_service: TimeService,

// We maintain a cached storage server summary to avoid hitting the DB for
// every request. This is refreshed periodically.
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,

// We maintain an LRU cache for commonly requested data items. This is
// separate from the cached storage summary because these responses
// should never change (the storage summary changes over time).
lru_storage_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
}

impl<T: StorageReaderInterface> StorageServiceServer<T> {
@@ -91,6 +95,9 @@ impl<T: StorageReaderInterface> StorageServiceServer<T> {
let bounded_executor =
BoundedExecutor::new(config.max_concurrent_requests as usize, executor);
let cached_storage_server_summary = Arc::new(RwLock::new(StorageServerSummary::default()));
let lru_storage_cache = Arc::new(Mutex::new(LruCache::new(
config.max_lru_cache_size as usize,
)));

Self {
config,
@@ -99,6 +106,7 @@ impl<T: StorageReaderInterface> StorageServiceServer<T> {
network_requests,
time_service,
cached_storage_server_summary,
lru_storage_cache,
}
}

@@ -158,10 +166,12 @@
// avoid starving other async tasks on the same runtime.
let storage = self.storage.clone();
let cached_storage_server_summary = self.cached_storage_server_summary.clone();
let lru_storage_cache = self.lru_storage_cache.clone();
self.bounded_executor
.spawn_blocking(move || {
let response = Handler::new(storage, cached_storage_server_summary)
.call(protocol, request);
let response =
Handler::new(storage, cached_storage_server_summary, lru_storage_cache)
.call(protocol, request);
log_storage_response(&response);
response_sender.send(response);
})
@@ -206,16 +216,19 @@ fn refresh_cached_storage_summary<T: StorageReaderInterface>(
pub struct Handler<T> {
storage: T,
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,
lru_storage_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
}

impl<T: StorageReaderInterface> Handler<T> {
pub fn new(
storage: T,
cached_storage_server_summary: Arc<RwLock<StorageServerSummary>>,
lru_storage_cache: Arc<Mutex<LruCache<StorageServiceRequest, StorageServiceResponse>>>,
) -> Self {
Self {
storage,
cached_storage_server_summary,
lru_storage_cache,
}
}

@@ -240,23 +253,9 @@ impl<T: StorageReaderInterface> Handler<T> {

// Process the request
let response = match &request {
StorageServiceRequest::GetAccountStatesChunkWithProof(request) => {
self.get_account_states_chunk_with_proof(request)
}
StorageServiceRequest::GetEpochEndingLedgerInfos(request) => {
self.get_epoch_ending_ledger_infos(request)
}
StorageServiceRequest::GetNumberOfAccountsAtVersion(version) => {
self.get_number_of_accounts_at_version(*version)
}
StorageServiceRequest::GetServerProtocolVersion => self.get_server_protocol_version(),
StorageServiceRequest::GetStorageServerSummary => self.get_storage_server_summary(),
StorageServiceRequest::GetTransactionOutputsWithProof(request) => {
self.get_transaction_outputs_with_proof(request)
}
StorageServiceRequest::GetTransactionsWithProof(request) => {
self.get_transactions_with_proof(request)
}
_ => self.process_cachable_request(protocol, &request),
};

// Process the response and handle any errors
@@ -290,6 +289,48 @@ impl<T: StorageReaderInterface> Handler<T> {
}
}

fn process_cachable_request(
&self,
protocol: ProtocolId,
request: &StorageServiceRequest,
) -> Result<StorageServiceResponse, Error> {
increment_counter(&metrics::LRU_CACHE_EVENT, protocol, LRU_CACHE_PROBE.into());

// Check if the response is already in the cache
if let Some(response) = self.lru_storage_cache.lock().get(request) {
increment_counter(&metrics::LRU_CACHE_EVENT, protocol, LRU_CACHE_HIT.into());
return Ok(response.clone());
}

// Fetch the response from storage
let response = match request {
StorageServiceRequest::GetAccountStatesChunkWithProof(request) => {
self.get_account_states_chunk_with_proof(request)
}
StorageServiceRequest::GetEpochEndingLedgerInfos(request) => {
self.get_epoch_ending_ledger_infos(request)
}
StorageServiceRequest::GetNumberOfAccountsAtVersion(version) => {
self.get_number_of_accounts_at_version(*version)
}
StorageServiceRequest::GetTransactionOutputsWithProof(request) => {
self.get_transaction_outputs_with_proof(request)
}
StorageServiceRequest::GetTransactionsWithProof(request) => {
self.get_transactions_with_proof(request)
}
_ => unreachable!("Received an unexpected request: {:?}", request),
}?;

// Cache the response before returning
let _ = self
.lru_storage_cache
.lock()
.put(request.clone(), response.clone());

Ok(response)
}

fn get_account_states_chunk_with_proof(
&self,
request: &AccountStatesChunkWithProofRequest,
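The `process_cachable_request` method above follows a cache-aside pattern: probe the LRU cache, fall back to storage on a miss, then populate the cache with the fetched response. Here is a standalone sketch of that flow under simplifying assumptions: it uses `std::sync::Mutex` (the server uses `aptos_infallible::Mutex`, whose `lock()` cannot fail), and the helper name and generic fetch closure are illustrative rather than part of the commit:

```rust
use std::sync::{Arc, Mutex};

use lru::LruCache;

/// Return the cached value for `key`, or compute it with `fetch` and cache it.
fn get_or_fetch<K, V, F>(cache: &Arc<Mutex<LruCache<K, V>>>, key: K, fetch: F) -> V
where
    K: std::hash::Hash + Eq,
    V: Clone,
    F: FnOnce() -> V,
{
    // Probe the cache first. LruCache::get takes &mut self, hence the lock.
    if let Some(response) = cache.lock().unwrap().get(&key) {
        return response.clone();
    }

    // Cache miss: fetch from the backing store and cache the result.
    let response = fetch();
    cache.lock().unwrap().put(key, response.clone());
    response
}
```

The LRU cache sits behind a `Mutex` rather than the `RwLock` used for the storage summary because `LruCache::get` needs exclusive access even on a hit (it updates recency metadata), so a read-write lock would buy nothing here.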
14 changes: 14 additions & 0 deletions state-sync/storage-service/server/src/metrics.rs
@@ -7,6 +7,20 @@ use aptos_metrics::{
use network::ProtocolId;
use once_cell::sync::Lazy;

/// Useful metric constants for the storage service
pub const LRU_CACHE_HIT: &str = "lru_cache_hit";
pub const LRU_CACHE_PROBE: &str = "lru_cache_probe";

/// Counter for lru cache events in the storage service (server-side)
pub static LRU_CACHE_EVENT: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"aptos_storage_service_server_lru_cache",
"Counters for lru cache events in the storage server",
&["protocol", "event"]
)
.unwrap()
});

/// Counter for pending network events to the storage service (server-side)
pub static PENDING_STORAGE_SERVER_NETWORK_EVENTS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
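The new `LRU_CACHE_EVENT` counter is labeled by protocol and event, and the handler bumps it once per probe and once more per hit, so a hit rate can be derived as hits over probes. The `increment_counter` helper itself is not part of this diff; a hedged sketch of what it plausibly does with an `IntCounterVec`, assuming the standard Prometheus-style label API and an arbitrary `Debug` formatting of `ProtocolId`:

```rust
use aptos_metrics::IntCounterVec;
use network::ProtocolId;

// Assumed shape only: resolve the (protocol, event) label pair and bump the
// counter by one. The real aptos_metrics helper may format labels differently.
pub fn increment_counter(counter: &IntCounterVec, protocol: ProtocolId, event: String) {
    let protocol_label = format!("{:?}", protocol);
    counter
        .with_label_values(&[protocol_label.as_str(), event.as_str()])
        .inc();
}
```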
10 changes: 5 additions & 5 deletions state-sync/storage-service/types/src/lib.rs
@@ -51,7 +51,7 @@ pub enum StorageServiceMessage {
}

/// A storage service request.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub enum StorageServiceRequest {
GetAccountStatesChunkWithProof(AccountStatesChunkWithProofRequest), // Fetches a list of account states with a proof
GetEpochEndingLedgerInfos(EpochEndingLedgerInfoRequest), // Fetches a list of epoch ending ledger infos
@@ -229,7 +229,7 @@ impl TryFrom<StorageServiceResponse> for TransactionListWithProof {

/// A storage service request for fetching a list of account states at a
/// specified version.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct AccountStatesChunkWithProofRequest {
pub version: u64, // The version to fetch the account states at
pub start_account_index: u64, // The account index to start fetching account states
@@ -238,7 +238,7 @@ pub struct AccountStatesChunkWithProofRequest {

/// A storage service request for fetching a transaction output list with a
/// corresponding proof.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct TransactionOutputsWithProofRequest {
pub proof_version: u64, // The version the proof should be relative to
pub start_version: u64, // The starting version of the transaction output list
@@ -247,7 +247,7 @@ pub struct TransactionOutputsWithProofRequest {

/// A storage service request for fetching a transaction list with a
/// corresponding proof.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct TransactionsWithProofRequest {
pub proof_version: u64, // The version the proof should be relative to
pub start_version: u64, // The starting version of the transaction list
Expand All @@ -256,7 +256,7 @@ pub struct TransactionsWithProofRequest {
}

/// A storage service request for fetching a list of epoch ending ledger infos.
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Clone, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct EpochEndingLedgerInfoRequest {
pub start_epoch: u64,
pub expected_end_epoch: u64,
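The `Hash` derives added throughout this file are what allow requests to serve as cache keys: `lru::LruCache` requires `K: Hash + Eq`, so the server can key its cache by the `StorageServiceRequest` itself. A simplified illustration with a stand-in request type (the real enum and its variants are defined above):

```rust
use lru::LruCache;

// Stand-in for StorageServiceRequest: deriving Hash + Eq lets it act as a key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum DemoRequest {
    GetServerProtocolVersion,
    GetNumberOfAccountsAtVersion(u64),
}

fn main() {
    let mut cache: LruCache<DemoRequest, &'static str> = LruCache::new(10);
    cache.put(DemoRequest::GetServerProtocolVersion, "version response");
    cache.put(DemoRequest::GetNumberOfAccountsAtVersion(5), "cached response");
    assert!(cache
        .get(&DemoRequest::GetNumberOfAccountsAtVersion(5))
        .is_some());
}
```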
