Skip to content

Commit

Permalink
[fastx client] Create a clean object reading interface (Part 4) (Myst…
Browse files Browse the repository at this point in the history
…enLabs#392)

* Adapt tests to use get_oject less
* Added ObjectRead structure
* get_object_info_execute now modernized
* Remove older read functions
* Comments and use Self
* Address review & add checks

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Feb 9, 2022
1 parent e669843 commit 338fb8b
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 296 deletions.
43 changes: 14 additions & 29 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use fastx_types::{base_types::*, committee::Committee, messages::*, serialize::*
use move_core_types::transaction_argument::convert_txn_args;

use bytes::Bytes;
use fastx_types::object::Object;
use fastx_types::object::{Object, ObjectRead};
use futures::stream::StreamExt;
use std::{
collections::{BTreeMap, HashSet},
Expand Down Expand Up @@ -544,15 +544,8 @@ fn main() {
.await;

// Fetch the object info for the object
let obj_info_req = ObjectInfoRequest {
object_id: obj_id,
request_sequence_number: None,
};
if let Some(object) = client_state
.get_object_info(obj_info_req)
.await
.unwrap()
.object()
if let Ok(ObjectRead::Exists(_, object)) =
client_state.get_object_info(obj_id).await
{
println!("Owner: {:#?}", object.owner);
println!("Version: {:#?}", object.version().value());
Expand Down Expand Up @@ -598,15 +591,12 @@ fn main() {
.await;

// Fetch the object info for the package
let package_obj_info_req = ObjectInfoRequest {
object_id: config.package_obj_id,
request_sequence_number: None,
};
let package_obj_info = client_state
.get_object_info(package_obj_info_req)
let package_obj_ref = client_state
.get_object_info(config.package_obj_id)
.await
.unwrap()
.reference()
.unwrap();
let package_obj_ref = package_obj_info.object().unwrap().to_object_reference();

// Fetch the object info for the gas obj
let gas_obj_ref = *client_state
Expand All @@ -618,18 +608,13 @@ fn main() {
let mut object_args_refs = Vec::new();
for obj_id in config.object_args_ids {
// Fetch the obj ref
let obj_info_req = ObjectInfoRequest {
object_id: obj_id,
request_sequence_number: None,
};

let obj_info = client_state.get_object_info(obj_info_req).await.unwrap();
object_args_refs.push(
obj_info
.object()
.unwrap_or_else(|| panic!("Could not find object {:?}", obj_id))
.to_object_reference(),
);
let obj_info_ref = client_state
.get_object_info(obj_id)
.await
.unwrap()
.reference()
.unwrap_or_else(|_| panic!("Could not find object {:?}", obj_id));
object_args_refs.push(obj_info_ref);
}

let pure_args = convert_txn_args(&config.pure_args);
Expand Down
133 changes: 75 additions & 58 deletions fastpay_core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::authority_client::AuthorityAPI;

use fastx_types::object::Object;
use fastx_types::object::{Object, ObjectRead};
use fastx_types::{
base_types::*,
committee::Committee,
Expand Down Expand Up @@ -951,52 +951,6 @@ where
(obj.0.as_ref().unwrap().owner, top_ref.0 .1)
}

/// Execute a sequence of actions in parallel for a quorum of authorities.
async fn communicate_with_quorum<'a, V, F>(&'a self, execute: F) -> Result<Vec<V>, FastPayError>
where
F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Clone,
{
let committee = &self.committee;
let authority_clients = &self.authority_clients;
let mut responses: futures::stream::FuturesUnordered<_> = authority_clients
.iter()
.map(|(name, client)| {
let execute = execute.clone();
async move { (*name, execute(*name, client).await) }
})
.collect();

let mut values = Vec::new();
let mut value_score = 0;
let mut error_scores = HashMap::new();
while let Some((name, result)) = responses.next().await {
match result {
Ok(value) => {
values.push(value);
value_score += committee.weight(&name);
if value_score >= committee.quorum_threshold() {
// Success!
return Ok(values);
}
}
Err(err) => {
let entry = error_scores.entry(err.clone()).or_insert(0);
*entry += committee.weight(&name);
if *entry >= committee.validity_threshold() {
// At least one honest node returned this error.
// No quorum can be reached, so return early.
return Err(FastPayError::QuorumNotReached {
errors: error_scores.into_keys().collect(),
});
}
}
}
}
Err(FastPayError::QuorumNotReached {
errors: error_scores.into_keys().collect(),
})
}

pub async fn execute_transaction(
&self,
order: &Order,
Expand All @@ -1013,19 +967,82 @@ where

pub async fn get_object_info_execute(
&mut self,
object_info_req: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, anyhow::Error> {
let votes = self
.communicate_with_quorum(|_, client| {
let req = object_info_req.clone();
Box::pin(async move { client.handle_object_info_request(req).await })
})
object_id: ObjectID,
) -> Result<ObjectRead, anyhow::Error> {
let (object_map, cert_map) = self
.get_object_by_id(object_id, AUTHORITY_REQUEST_TIMEOUT)
.await?;
let mut object_ref_stack: Vec<_> = object_map.into_iter().collect();

while let Some(((obj_ref, tx_digest), (obj_option, authorities))) = object_ref_stack.pop() {
let stake: usize = authorities
.iter()
.map(|(name, _)| self.committee.weight(name))
.sum();

// If we have f+1 stake telling us of the latest version of the object, we just accept it.
if stake >= self.committee.validity_threshold() {
if obj_option.is_none() {
return Ok(ObjectRead::Deleted(obj_ref));
} else {
// safe due to check
return Ok(ObjectRead::Exists(obj_ref, obj_option.unwrap()));
}
} else if cert_map.contains_key(&tx_digest) {
// If we have less stake telling us about the latest state of an object
// we re-run the certificate on all authorities to ensure it is correct.
if let Ok(_effects) = self
.process_certificate(cert_map[&tx_digest].clone(), AUTHORITY_REQUEST_TIMEOUT)
.await
{
let mut is_ok = false;

// The mutated or created case
if _effects
.mutated
.iter()
.filter(|(oref, _)| *oref == obj_ref)
.count()
!= 0
|| _effects
.created
.iter()
.filter(|(oref, _)| *oref == obj_ref)
.count()
!= 0
{
is_ok = true;
}

// The deleted case
if obj_ref.2 == OBJECT_DIGEST_DELETED
&& _effects
.deleted
.iter()
.filter(|(id, seq, _)| *id == obj_ref.0 && seq.increment() == obj_ref.1)
.count()
!= 0
{
is_ok = true;
}

if !is_ok {
// Report a byzantine fault here
continue;
}

// NOTE: here we should validate the object is correct from the effects
if obj_option.is_none() {
return Ok(ObjectRead::Deleted(obj_ref));
} else {
// safe due to check
return Ok(ObjectRead::Exists(obj_ref, obj_option.unwrap()));
}
}
}
}

votes
.get(0)
.cloned()
.ok_or_else(|| anyhow::anyhow!("No valid confirmation order votes"))
Ok(ObjectRead::NotExists(object_id))
}

/// Given a list of object refs, download the objects.
Expand Down
31 changes: 6 additions & 25 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_trait::async_trait;
use fastx_framework::build_move_package_to_bytes;
use fastx_types::{
base_types::*, committee::Committee, error::FastPayError, fp_ensure, messages::*,
object::ObjectRead,
};
use futures::future;
use itertools::Itertools;
Expand Down Expand Up @@ -87,10 +88,7 @@ pub trait Client {
) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error>;

/// Get the object information
async fn get_object_info(
&mut self,
object_info_req: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, anyhow::Error>;
async fn get_object_info(&mut self, object_id: ObjectID) -> Result<ObjectRead, anyhow::Error>;

/// Get all object we own.
async fn get_owned_objects(&self) -> Vec<ObjectID>;
Expand Down Expand Up @@ -247,20 +245,8 @@ where

#[cfg(test)]
pub async fn get_framework_object_ref(&mut self) -> Result<ObjectRef, anyhow::Error> {
let info = self
.get_object_info(ObjectInfoRequest {
object_id: FASTX_FRAMEWORK_ADDRESS,
request_sequence_number: None,
})
.await?;
let reference = info
.object_and_lock
.ok_or(FastPayError::ObjectNotFound {
object_id: FASTX_FRAMEWORK_ADDRESS,
})?
.object
.to_object_reference();
Ok(reference)
let info = self.get_object_info(FASTX_FRAMEWORK_ADDRESS).await?;
Ok(info.reference()?)
}

async fn execute_transaction_inner(
Expand Down Expand Up @@ -597,13 +583,8 @@ where
self.execute_transaction(move_publish_order).await
}

async fn get_object_info(
&mut self,
object_info_req: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, anyhow::Error> {
self.authorities
.get_object_info_execute(object_info_req)
.await
async fn get_object_info(&mut self, object_id: ObjectID) -> Result<ObjectRead, anyhow::Error> {
self.authorities.get_object_info_execute(object_id).await
}

async fn get_owned_objects(&self) -> Vec<ObjectID> {
Expand Down
Loading

0 comments on commit 338fb8b

Please sign in to comment.