Skip to content

Commit

Permalink
[data-client] basic DiemNetDataClient skeleton
Browse files Browse the repository at this point in the history
  • Loading branch information
phlip9 authored and bors-libra committed Oct 29, 2021
1 parent ff6de48 commit 8612ceb
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 1 deletion.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions network/src/application/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl PeerMetadataStorage {
Arc::new(peer_metadata_storage)
}

pub fn networks(&self) -> impl Iterator<Item = NetworkId> + '_ {
self.storage.keys().copied()
}

/// Handle common logic of getting a network
fn get_network(&self, network_id: NetworkId) -> &LockingHashMap<AccountAddress, PeerInfo> {
self.storage
Expand Down
4 changes: 4 additions & 0 deletions network/src/application/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ impl PeerInfo {
active_connection: connection_metadata,
}
}

pub fn is_connected(&self) -> bool {
self.status == PeerState::Connected
}
}

/// The current state of a `Peer` at any one time
Expand Down
4 changes: 4 additions & 0 deletions state-sync/diem-data-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@ edition = "2018"
[dependencies]
async-trait = "0.1.42"
futures = "0.3.12"
rand = "0.8.3"
serde = { version = "1.0.124", default-features = false }
thiserror = "1.0.24"

diem-config = { path = "../../config" }
diem-crypto = { path = "../../crypto/crypto" }
diem-types = { path = "../../types" }
diem-workspace-hack = { path = "../../common/workspace-hack" }
network = { path = "../../network" }
storage-service-client = { path = "../storage-service/client" }
storage-service-types = { path = "../storage-service/types" }
206 changes: 206 additions & 0 deletions state-sync/diem-data-client/src/diemnet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{DataClientResponse, DiemDataClient, Error, ResponseError};
use async_trait::async_trait;
use diem_config::network_id::PeerNetworkId;
use network::{application::interface::NetworkInterface, protocols::rpc::error::RpcError};
use rand::seq::SliceRandom;
use std::{
convert::TryInto,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::Duration,
};
use storage_service_client::StorageServiceClient;
use storage_service_types::{
AccountStatesChunkWithProofRequest, EpochEndingLedgerInfoRequest, StorageServiceRequest,
TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
};

// TODO(philiphayes): does this belong in a different crate? I feel like we're
// accumulating a lot of tiny crates though...

// TODO(philiphayes): configuration / pass as argument?
const DEFAULT_TIMEOUT: Duration = Duration::from_millis(10_000);

/// A [`DiemDataClient`] that fulfills requests from remote peers' Storage Service
/// over DiemNet.
///
/// The `DiemNetDataClient`:
///
/// 1. Sends requests to connected DiemNet peers.
/// 2. Does basic type conversions and error handling on the responses.
/// 3. Routes requests to peers that advertise availability for that data.
/// 4. Maintains peer scores based on each peer's observed quality of service
/// and upper client reports of invalid or malicious data.
/// 5. Selects high quality peers to send each request to.
/// 6. Exposes a condensed data summary of our peers' data advertisements.
///
/// The client currently assumes 1-request => 1-response. Streaming responses
/// are handled at an upper layer.
///
/// The client is expected to be cloneable and usable from many concurrent tasks
/// and/or threads.
#[derive(Clone, Debug)]
pub struct DiemNetDataClient {
network: StorageServiceClient,
next_response_id: Arc<AtomicU64>,
}

impl DiemNetDataClient {
pub fn new(network: StorageServiceClient) -> Self {
Self {
network,
next_response_id: Arc::new(AtomicU64::new(0)),
}
}

fn sample_peer(&self) -> Option<PeerNetworkId> {
// very dumb. just get this working e2e
let peer_infos = self.network.peer_metadata_storage();
let all_connected = peer_infos
.networks()
.flat_map(|network_id| {
peer_infos
.read_filtered(network_id, |(_, peer_info)| peer_info.is_connected())
.into_keys()
})
.collect::<Vec<_>>();
all_connected.choose(&mut rand::thread_rng()).copied()
}

// TODO(philiphayes): this should be generic in DiemDataClient
pub async fn send_request(
&self,
// TODO(philiphayes): should be a separate DataClient type
request: StorageServiceRequest,
) -> Result<DataClientResponse, Error> {
let peer = self
.sample_peer()
.ok_or_else(|| Error::DataIsUnavailable("no connected diemnet peers".to_owned()))?;

let result = self
.network
.send_request(peer, request, DEFAULT_TIMEOUT)
.await;

match result {
Ok(response) => Ok(DataClientResponse {
response_id: self.next_response_id.fetch_add(1, Ordering::Relaxed),
response_payload: response.try_into()?,
}),
Err(storage_service_client::Error::RpcError(err)) => match err {
RpcError::NotConnected(_) => Err(Error::DataIsUnavailable(err.to_string())),
RpcError::TimedOut => Err(Error::TimeoutWaitingForResponse(err.to_string())),
_ => Err(Error::UnexpectedErrorEncountered(err.to_string())),
},
Err(storage_service_client::Error::StorageServiceError(err)) => {
Err(Error::UnexpectedErrorEncountered(err.to_string()))
}
}

// TODO(philiphayes): update peer scores on error
}
}

/// (start..=end).len()
fn range_len(start: u64, end: u64) -> Result<u64, Error> {
// len = end - start + 1
let len = end.checked_sub(start).ok_or_else(|| {
Error::InvalidRequest(format!("end ({}) must be >= start ({})", end, start))
})?;
let len = len
.checked_add(1)
.ok_or_else(|| Error::InvalidRequest(format!("end ({}) must not be u64::MAX", end)))?;
Ok(len)
}

#[async_trait]
impl DiemDataClient for DiemNetDataClient {
async fn get_account_states_with_proof(
&self,
version: u64,
start_index: u64,
end_index: u64,
) -> Result<DataClientResponse, Error> {
self.send_request(StorageServiceRequest::GetAccountStatesChunkWithProof(
AccountStatesChunkWithProofRequest {
version,
start_account_index: start_index,
expected_num_account_states: range_len(start_index, end_index)?,
},
))
.await
}

async fn get_epoch_ending_ledger_infos(
&self,
start_epoch: u64,
expected_end_epoch: u64,
) -> Result<DataClientResponse, Error> {
self.send_request(StorageServiceRequest::GetEpochEndingLedgerInfos(
EpochEndingLedgerInfoRequest {
start_epoch,
expected_end_epoch,
},
))
.await
}

fn get_global_data_summary(&self) -> Result<DataClientResponse, Error> {
todo!()
}

async fn get_number_of_account_states(
&self,
version: u64,
) -> Result<DataClientResponse, Error> {
self.send_request(StorageServiceRequest::GetNumberOfAccountsAtVersion(version))
.await
}

async fn get_transaction_outputs_with_proof(
&self,
proof_version: u64,
start_version: u64,
end_version: u64,
) -> Result<DataClientResponse, Error> {
self.send_request(StorageServiceRequest::GetTransactionOutputsWithProof(
TransactionOutputsWithProofRequest {
proof_version,
start_version,
expected_num_outputs: range_len(start_version, end_version)?,
},
))
.await
}

async fn get_transactions_with_proof(
&self,
proof_version: u64,
start_version: u64,
end_version: u64,
include_events: bool,
) -> Result<DataClientResponse, Error> {
self.send_request(StorageServiceRequest::GetTransactionsWithProof(
TransactionsWithProofRequest {
proof_version,
start_version,
expected_num_transactions: range_len(start_version, end_version)?,
include_events,
},
))
.await
}

async fn notify_bad_response(
&self,
_response_id: u64,
_response_error: ResponseError,
) -> Result<(), Error> {
todo!()
}
}
33 changes: 32 additions & 1 deletion state-sync/diem-data-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use std::convert::TryFrom;

use async_trait::async_trait;
use diem_types::{
account_state_blob::AccountStatesChunkWithProof,
Expand All @@ -12,18 +15,30 @@ use diem_types::{
},
};
use serde::{Deserialize, Serialize};
use storage_service_types::{CompleteDataRange, Epoch};
use storage_service_types::{CompleteDataRange, Epoch, StorageServiceResponse};
use thiserror::Error;

pub mod diemnet;

// TODO(philiphayes): a Error { kind: ErrorKind, inner: BoxError } would be more convenient
/// An error returned by the Diem Data Client for failed API calls.
#[derive(Clone, Debug, Deserialize, Error, PartialEq, Serialize)]
pub enum Error {
#[error("The requested data is unavailable and cannot be found! Error: {0}")]
DataIsUnavailable(String),

#[error("The requested data is too large: {0}")]
DataIsTooLarge(String),

#[error("Invalid request: {0}")]
InvalidRequest(String),

#[error("Invalid response: {0}")]
InvalidResponse(String),

#[error("Timed out waiting for a response: {0}")]
TimeoutWaitingForResponse(String),

#[error("Unexpected error encountered: {0}")]
UnexpectedErrorEncountered(String),
}
Expand Down Expand Up @@ -131,6 +146,22 @@ pub enum DataClientPayload {
TransactionsWithProof(TransactionListWithProof),
}

impl TryFrom<StorageServiceResponse> for DataClientPayload {
type Error = Error;

fn try_from(response: StorageServiceResponse) -> Result<Self, Self::Error> {
match response {
StorageServiceResponse::EpochEndingLedgerInfos(epochs) => Ok(
DataClientPayload::EpochEndingLedgerInfos(epochs.ledger_info_with_sigs),
),
StorageServiceResponse::TransactionsWithProof(txns) => {
Ok(DataClientPayload::TransactionsWithProof(txns))
}
_ => Err(Error::UnexpectedErrorEncountered(format!(""))),
}
}
}

/// A snapshot of the global state of data available in the Diem network.
#[derive(Clone, Debug)]
pub struct GlobalDataSummary {
Expand Down

0 comments on commit 8612ceb

Please sign in to comment.