Skip to content

Commit

Permalink
[verifying-client] Push retries up to Verifying Client
Browse files Browse the repository at this point in the history
+ The main benefit here is: a client that's a bit far behind can retry
the `NeedSync` errors without any extra effort.

Closes: aptos-labs#8885
  • Loading branch information
phlip9 authored and bors-libra committed Sep 1, 2021
1 parent 685f65d commit adaee2f
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
7 changes: 6 additions & 1 deletion sdk/client/src/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use diem_types::{
};
use move_core_types::move_resource::{MoveResource, MoveStructType};
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use std::{mem, time::Duration};

// In order to avoid needing to publish the proxy crate to crates.io, we simply include the small
// library inline by making it a module instead of a dependency. 'src/proxy.rs' is a symlink to
Expand All @@ -52,6 +52,11 @@ impl BlockingClient {
}
}

/// Removes and returns this client's retry policy, leaving `Retry::none()`
/// in its place.
// NOTE(review): `dead_code` is allowed presumably because nothing in the crate
// calls this on the blocking client yet — confirm before removing the attribute.
#[allow(dead_code)]
pub(crate) fn take_retry(&mut self) -> Retry {
    let mut taken = Retry::none();
    mem::swap(&mut self.retry, &mut taken);
    taken
}

/// Returns the last known `State`, if any, as reported by `self.state`.
pub fn last_known_state(&self) -> Option<State> {
self.state.last_known_state()
}
Expand Down
6 changes: 5 additions & 1 deletion sdk/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use diem_types::{
use move_core_types::move_resource::{MoveResource, MoveStructType};
use reqwest::Client as ReqwestClient;
use serde::{de::DeserializeOwned, Serialize};
use std::time::Duration;
use std::{mem, time::Duration};

#[derive(Clone, Debug)]
pub struct Client {
Expand Down Expand Up @@ -56,6 +56,10 @@ impl Client {
}
}

/// Removes and returns this client's retry policy, leaving `Retry::none()`
/// in its place.
pub(crate) fn take_retry(&mut self) -> Retry {
    let mut taken = Retry::none();
    mem::swap(&mut self.retry, &mut taken);
    taken
}

/// Returns the last known `State`, if any, as reported by `self.state`.
pub fn last_known_state(&self) -> Option<State> {
self.state.last_known_state()
}
Expand Down
4 changes: 4 additions & 0 deletions sdk/client/src/retry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ impl Retry {
Self { max_retries, delay }
}

/// Builds a `Retry` with `max_retries` of zero and a zero-length delay.
pub fn none() -> Self {
    let max_retries = 0;
    let delay = Duration::ZERO;
    Self::new(max_retries, delay)
}

/// Returns the `max_retries` value stored in this `Retry`.
pub fn max_retries(&self) -> u32 {
self.max_retries
}
Expand Down
81 changes: 58 additions & 23 deletions sdk/client/src/verifying_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
error::{Error, Result, WaitForTransactionError},
request::MethodRequest,
response::{MethodResponse, Response},
retry::Retry,
verifying_client::{methods::VerifyingBatch, state_store::StateStore},
};
use diem_crypto::hash::{CryptoHash, HashValue};
Expand All @@ -21,7 +22,6 @@ use diem_types::{
};
use std::{fmt::Debug, time::Duration};

// TODO(philiphayes): figure out retry strategy
// TODO(philiphayes): fill out rest of the methods
// TODO(philiphayes): all clients should validate chain id (allow users to trust-on-first-use or pre-configure)
// TODO(philiphayes): we could abstract the async client so VerifyingClient takes a dyn Trait?
Expand Down Expand Up @@ -56,26 +56,34 @@ use std::{fmt::Debug, time::Duration};
pub struct VerifyingClient<S> {
// Underlying client; `submit` forwards to it directly.
inner: Client,
// Store of type `S`; the `impl` block bounds `S` by `StateStore`.
state_store: S,
// Retry policy taken out of `inner` by the constructors and used by
// `sync`, `request`, and `batch`.
retry: Retry,
}

impl<S: StateStore> VerifyingClient<S> {
// TODO(philiphayes): construct the client ourselves? we probably want to
// control the retries out here. For example, during sync, if we get a stale
// state proof the retry logic should include that and not just fail immediately.
// Builds a `VerifyingClient` from an existing `Client` and a state store.
// Returns an error if `self.version()` fails; per the inline comment below,
// that is the case when the state store has not been initialized.
pub fn new(inner: Client, state_store: S) -> Result<Self> {
let this = Self { inner, state_store };
pub fn new(mut inner: Client, state_store: S) -> Result<Self> {
// Move the retry policy out of `inner` (`Client::take_retry` leaves
// `Retry::none()` behind) so it is held by this wrapper instead.
let retry = inner.take_retry();
let this = Self {
inner,
state_store,
retry,
};
this.version()?; // state store must be initialized
Ok(this)
}

// Builds a `VerifyingClient` and then calls `ratchet_state` with
// `initial_state`; any error from `ratchet_state` is returned to the caller.
pub fn new_with_state(
inner: Client,
mut inner: Client,
state_store: S,
initial_state: &TrustedState,
) -> Result<Self> {
let client = Self { inner, state_store };
client.ratchet_state(Some(initial_state))?;
Ok(client)
// Move the retry policy out of `inner` (`Client::take_retry` leaves
// `Retry::none()` behind) so it is held by this wrapper instead.
let retry = inner.take_retry();
let this = Self {
inner,
state_store,
retry,
};
this.ratchet_state(Some(initial_state))?;
Ok(this)
}

/// Get a snapshot of our current trusted ledger [`Version`].
Expand Down Expand Up @@ -168,17 +176,22 @@ impl<S: StateStore> VerifyingClient<S> {
/// node's current version (unless we experience a verification error or other
/// I/O error).
pub async fn sync(&self) -> Result<()> {
// TODO(philiphayes): retries
while self.sync_one_step().await? {}
Ok(())
// The whole step loop is handed to `retry_async` as one closure, so an
// error from any single `sync_one_step` call ends that attempt.
// NOTE(review): presumably `retry_async` then re-invokes the closure per
// the retry policy, restarting the loop — confirm against `Retry`.
self.retry
.retry_async(|| async {
while self.sync_one_step().await? {
// TODO(philiphayes): logging / callback?
}
Ok(())
})
.await
}

/// Issue a single `get_state_proof` request and try to verify it. Returns
/// `Ok(true)` if, after verification, we still need to sync more. Returns
/// `Ok(false)` if we have finished syncing.
pub async fn sync_one_step(&self) -> Result<bool> {
// batch([]) is effectively just a get_state_proof request
match self.batch(vec![]).await {
match self.batch_without_retry(vec![]).await {
Ok(_) => Ok(false),
Err(err) => {
if err.is_need_sync() {
Expand Down Expand Up @@ -218,9 +231,9 @@ impl<S: StateStore> VerifyingClient<S> {
/// connection to a single server, so the broadcasting needs to happen at a
/// higher layer.
pub async fn submit(&self, txn: &SignedTransaction) -> Result<Response<()>> {
self.request(MethodRequest::submit(txn).map_err(Error::request)?)
.await?
.and_then(MethodResponse::try_into_submit)
// TODO(philiphayes): fix retries for txn submit. need to
// avoid the whole verifying client machinery for submits
// Forwards straight to the inner client instead of going through
// `self.request`.
// NOTE(review): the constructors replace `inner`'s retry policy with
// `Retry::none()`, so this call is presumably not retried — confirm.
self.inner.submit(txn).await
}

pub async fn get_metadata_by_version(
Expand Down Expand Up @@ -327,19 +340,41 @@ impl<S: StateStore> VerifyingClient<S> {
.and_then(MethodResponse::try_into_get_network_status)
}

/// Reports the request count that `VerifyingBatch` computes for `requests`
/// against the current trusted state.
///
/// Returns an error if the trusted state cannot be read.
pub fn actual_batch_size(&self, requests: &[MethodRequest]) -> Result<usize> {
    // Build the batch first, then read the trusted state, matching the
    // evaluation order of the single-expression form.
    let verifying_batch = VerifyingBatch::from_batch(requests.to_vec());
    let trusted_state = self.trusted_state()?;
    let num_requests = verifying_batch.num_requests(&trusted_state);
    Ok(num_requests)
}

/// Send a single request via `VerifyingClient::batch`.
pub async fn request(&self, request: MethodRequest) -> Result<Response<MethodResponse>> {
let mut responses = self.batch(vec![request]).await?.into_iter();
// Each invocation of the closure gets its own clone of `request`, since
// `request_without_retry` takes the request by value.
// NOTE(review): presumably `retry_async` may invoke the closure more than
// once — confirm against `Retry`.
self.retry
.retry_async(|| self.request_without_retry(request.clone()))
.await
}

/// Sends `requests` via `batch_without_retry`, driven by this client's
/// retry policy (`self.retry`).
pub async fn batch(
    &self,
    requests: Vec<MethodRequest>,
) -> Result<Vec<Result<Response<MethodResponse>>>> {
    // Every attempt receives a fresh clone because `batch_without_retry`
    // consumes its argument.
    let attempt = || self.batch_without_retry(requests.clone());
    self.retry.retry_async(attempt).await
}

//
// Private Helpers
//

/// Sends `request` as a one-element batch through `batch_without_retry` and
/// returns the first response.
///
/// # Panics
///
/// Panics if the batch call succeeds but yields no responses.
async fn request_without_retry(
    &self,
    request: MethodRequest,
) -> Result<Response<MethodResponse>> {
    let batch_responses = self.batch_without_retry(vec![request]).await?;
    batch_responses
        .into_iter()
        .next()
        .expect("batch guarantees the correct number of responses")
}

/// Reports the request count that `VerifyingBatch` computes for `requests`
/// against the current trusted state.
///
/// Returns an error if the trusted state cannot be read.
pub fn actual_batch_size(&self, requests: &[MethodRequest]) -> Result<usize> {
    // Build the batch first, then read the trusted state, matching the
    // evaluation order of the single-expression form.
    let verifying_batch = VerifyingBatch::from_batch(requests.to_vec());
    let trusted_state = self.trusted_state()?;
    let num_requests = verifying_batch.num_requests(&trusted_state);
    Ok(num_requests)
}

pub async fn batch(
async fn batch_without_retry(
&self,
requests: Vec<MethodRequest>,
) -> Result<Vec<Result<Response<MethodResponse>>>> {
Expand Down

0 comments on commit adaee2f

Please sign in to comment.