Skip to content

Commit

Permalink
[State Sync] Add a new Storage Service implemention (server-side!)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Sep 17, 2021
1 parent a9975df commit 02d86fc
Show file tree
Hide file tree
Showing 7 changed files with 400 additions and 0 deletions.
26 changes: 26 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ members = [
"state-sync/inter-component/mempool-notifications",
"state-sync/state-sync-v1",
"state-sync/state-sync-v2",
"state-sync/storage-service/server",
"state-sync/storage-service/types",
"storage/accumulator",
"storage/backup/backup-cli",
"storage/backup/backup-service",
Expand Down
30 changes: 30 additions & 0 deletions state-sync/storage-service/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "storage-service-server"
version = "0.1.0"
authors = ["Diem Association <[email protected]>"]
description = "The Diem storage service (server-side)"
repository = "https://github.com/diem/diem"
homepage = "https://diem.com"
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
serde = { version = "1.0.124", default-features = false }
thiserror = "1.0.24"

diem-infallible = { path = "../../../common/infallible" }
diem-types = { path = "../../../types" }
diem-workspace-hack = { path = "../../../common/workspace-hack" }
storage-interface = { path = "../../../storage/storage-interface" }
storage-service-types = { path = "../types" }

[dev-dependencies]
anyhow = "1.0.38"
bcs = "0.1.2"
claim = "0.5.0"

diem-crypto = { path = "../../../crypto/crypto" }
diem-types = { path = "../../../types" }
move-core-types = { path = "../../../language/move-core/types" }
storage-interface = { path = "../../../storage/storage-interface" }
242 changes: 242 additions & 0 deletions state-sync/storage-service/server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use diem_infallible::RwLock;
use diem_types::{epoch_change::EpochChangeProof, transaction::TransactionListWithProof};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use storage_interface::DbReaderWriter;
use storage_service_types::{
DataSummary, EpochEndingLedgerInfoRequest, ProtocolMetadata, ServerProtocolVersion,
StorageServerSummary, StorageServiceError, StorageServiceRequest, StorageServiceResponse,
TransactionsWithProofRequest,
};
use thiserror::Error;

#[cfg(test)]
mod tests;

// TODO(joshlind): make these configurable.
/// Storage server constants.
pub const MAX_TRANSACTION_CHUNK_SIZE: u64 = 1000;
pub const STORAGE_SERVER_VERSION: u64 = 1;

#[derive(Clone, Debug, Deserialize, Error, PartialEq, Serialize)]
pub enum Error {
#[error("Storage error encountered: {0}")]
StorageErrorEncountered(String),
#[error("Unexpected error encountered: {0}")]
UnexpectedErrorEncountered(String),
}

/// The server-side implementation of the storage service. This provides all the
/// functionality required to handle storage service requests (i.e., from clients).
pub struct StorageServiceServer<T> {
storage: T,
}

impl<T: StorageReaderInterface> StorageServiceServer<T> {
pub fn new(storage: T) -> Self {
Self { storage }
}

pub fn handle_request(
&self,
request: StorageServiceRequest,
) -> Result<StorageServiceResponse, Error> {
let response = match request {
StorageServiceRequest::GetEpochEndingLedgerInfos(request) => {
self.get_epoch_ending_ledger_infos(request)
}
StorageServiceRequest::GetServerProtocolVersion => self.get_server_protocol_version(),
StorageServiceRequest::GetStorageServerSummary => self.get_storage_server_summary(),
StorageServiceRequest::GetTransactionsWithProof(request) => {
self.get_transactions_with_proof(request)
}
};

// If any requests resulted in an unexpected error, return an InternalStorageError to the
// client and log the actual error.
if let Err(_error) = response {
// TODO(joshlind): add logging support to this library so we can log _error
Ok(StorageServiceResponse::StorageServiceError(
StorageServiceError::InternalError,
))
} else {
response
}
}

fn get_epoch_ending_ledger_infos(
&self,
request: EpochEndingLedgerInfoRequest,
) -> Result<StorageServiceResponse, Error> {
let epoch_change_proof = self
.storage
.get_epoch_ending_ledger_infos(request.start_epoch, request.expected_end_epoch)?;

Ok(StorageServiceResponse::EpochEndingLedgerInfos(
epoch_change_proof,
))
}

fn get_server_protocol_version(&self) -> Result<StorageServiceResponse, Error> {
let server_protocol_version = ServerProtocolVersion {
protocol_version: STORAGE_SERVER_VERSION,
};
Ok(StorageServiceResponse::ServerProtocolVersion(
server_protocol_version,
))
}

fn get_storage_server_summary(&self) -> Result<StorageServiceResponse, Error> {
let storage_server_summary = StorageServerSummary {
protocol_metadata: ProtocolMetadata {
max_transaction_chunk_size: MAX_TRANSACTION_CHUNK_SIZE,
},
data_summary: self.storage.get_data_summary()?,
};

Ok(StorageServiceResponse::StorageServerSummary(
storage_server_summary,
))
}

fn get_transactions_with_proof(
&self,
request: TransactionsWithProofRequest,
) -> Result<StorageServiceResponse, Error> {
let transactions_with_proof = self.storage.get_transactions_with_proof(
request.proof_version,
request.start_version,
request.expected_num_transactions,
request.include_events,
)?;

Ok(StorageServiceResponse::TransactionsWithProof(
transactions_with_proof,
))
}
}

/// The interface into local storage (e.g., the Diem DB) used by the storage
/// server to handle client requests.
pub trait StorageReaderInterface {
/// Returns a data summary of the underlying storage state.
fn get_data_summary(&self) -> Result<DataSummary, Error>;

/// Returns a list of transactions with a proof relative to the
/// `proof_version`. The transaction list is expected to contain *at most*
/// `expected_num_transactions` and start at `start_version`.
/// If `include_events` is true, events are also returned.
fn get_transactions_with_proof(
&self,
proof_version: u64,
start_version: u64,
expected_num_transactions: u64,
include_events: bool,
) -> Result<TransactionListWithProof, Error>;

/// Returns a list of epoch ending ledger infos, starting at `start_epoch`
/// and ending *at most* at the `expected_end_epoch`.
fn get_epoch_ending_ledger_infos(
&self,
start_epoch: u64,
expected_end_epoch: u64,
) -> Result<EpochChangeProof, Error>;

// TODO(joshlind): support me!
//
// Returns a list of transaction outputs with a proof relative to the
// `proof_version`. The transaction output list is expected to contain
// *at most* `expected_num_transaction_outputs` and start at `start_version`.
//fn get_transaction_outputs_with_proof(
// &self,
// proof_version: u64,
// start_version: u64,
// expected_num_transaction_outputs: u64,
//) -> Result<TransactionOutputListWithProof, Error>;

// TODO(joshlind): support me!
//
// Returns an AccountStateChunk holding a list of account states
// starting at the specified account key with *at most*
// `expected_num_account_states`.
//fn get_account_states_chunk(
// version,
// start_account_key,
// expected_num_account_states: u64,
//) -> Result<AccountStateChunk, Error>
}

/// The underlying implementation of the StorageReaderInterface, used by the
/// storage server.
pub struct StorageReader {
storage: Arc<RwLock<DbReaderWriter>>,
}

impl StorageReader {
pub fn new(storage: Arc<RwLock<DbReaderWriter>>) -> Self {
Self { storage }
}
}

impl StorageReaderInterface for StorageReader {
fn get_data_summary(&self) -> Result<DataSummary, Error> {
// Fetch the latest ledger info
let latest_ledger_info_with_sigs = self
.storage
.read()
.reader
.get_latest_ledger_info()
.map_err(|error| Error::StorageErrorEncountered(error.to_string()))?;
let latest_ledger_info = latest_ledger_info_with_sigs.ledger_info();

// Return the relevant data summary
// TODO(joshlind): Update the DiemDB to support fetching the lowest txn version and epoch!
let data_summary = DataSummary {
highest_transaction_version: latest_ledger_info.version(),
lowest_transaction_version: 0,
highest_epoch: latest_ledger_info.epoch(),
lowest_epoch: 0,
};
Ok(data_summary)
}

fn get_transactions_with_proof(
&self,
proof_version: u64,
start_version: u64,
expected_num_transactions: u64,
include_events: bool,
) -> Result<TransactionListWithProof, Error> {
let transaction_list_with_proof = self
.storage
.read()
.reader
.get_transactions(
start_version,
expected_num_transactions,
proof_version,
include_events,
)
.map_err(|error| Error::StorageErrorEncountered(error.to_string()))?;
Ok(transaction_list_with_proof)
}

fn get_epoch_ending_ledger_infos(
&self,
start_epoch: u64,
expected_end_epoch: u64,
) -> Result<EpochChangeProof, Error> {
let epoch_change_proof = self
.storage
.read()
.reader
.get_epoch_ending_ledger_infos(start_epoch, expected_end_epoch)
.map_err(|error| Error::StorageErrorEncountered(error.to_string()))?;
Ok(epoch_change_proof)
}
}
16 changes: 16 additions & 0 deletions state-sync/storage-service/types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "storage-service-types"
version = "0.1.0"
authors = ["Diem Association <[email protected]>"]
description = "Common types offered by the Diem storage service"
repository = "https://github.com/diem/diem"
homepage = "https://diem.com"
license = "Apache-2.0"
publish = false
edition = "2018"

[dependencies]
diem-types = { path = "../../../types" }
diem-workspace-hack = { path = "../../../common/workspace-hack" }

[dev-dependencies]
Loading

0 comments on commit 02d86fc

Please sign in to comment.