Skip to content

Commit

Permalink
[Client Refactoring 2] Add a stateless version of execution_transacti…
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Feb 4, 2022
1 parent 47d2a6a commit 936666d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 30 deletions.
8 changes: 4 additions & 4 deletions fastpay_core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use fastx_types::{error::FastPayError, messages::*, serialize::*};
#[async_trait]
pub trait AuthorityAPI {
/// Initiate a new order to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError>;
async fn handle_order(&self, order: Order) -> Result<OrderInfoResponse, FastPayError>;

/// Confirm an order to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
&self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError>;

Expand Down Expand Up @@ -47,14 +47,14 @@ impl AuthorityClient {
#[async_trait]
impl AuthorityAPI for AuthorityClient {
/// Initiate a new transfer to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
async fn handle_order(&self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
let response = self.0.send_recv_bytes(serialize_order(&order)).await?;
deserialize_order_info(response)
}

/// Confirm a transfer to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
&self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError> {
let response = self
Expand Down
52 changes: 28 additions & 24 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@ where
source_authority: AuthorityName,
destination_authority: AuthorityName,
) -> Result<(), FastPayError> {
let mut source_client = self.authority_clients[&source_authority].clone();
let mut destination_client = self.authority_clients[&destination_authority].clone();
let source_client = self.authority_clients[&source_authority].clone();
let destination_client = self.authority_clients[&destination_authority].clone();

// This represents a stack of certificates that we need to register with the
// destination authority. The stack is a LIFO queue, and therefore later insertions
Expand Down Expand Up @@ -617,17 +617,14 @@ where
}

/// Execute a sequence of actions in parallel for a quorum of authorities.
async fn communicate_with_quorum<'a, V, F>(
&'a mut self,
execute: F,
) -> Result<Vec<V>, FastPayError>
async fn communicate_with_quorum<'a, V, F>(&'a self, execute: F) -> Result<Vec<V>, FastPayError>
where
F: Fn(AuthorityName, &'a mut A) -> AsyncResult<'a, V, FastPayError> + Clone,
F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Clone,
{
let committee = &self.committee;
let authority_clients = &mut self.authority_clients;
let authority_clients = &self.authority_clients;
let mut responses: futures::stream::FuturesUnordered<_> = authority_clients
.iter_mut()
.iter()
.map(|(name, client)| {
let execute = execute.clone();
async move { (*name, execute(*name, client).await) }
Expand Down Expand Up @@ -667,7 +664,7 @@ where

/// Broadcast missing confirmation orders and invoke handle_order on each authority client.
async fn broadcast_and_handle_order(
&mut self,
&self,
order: Order,
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, CertifiedOrder), anyhow::Error> {
for object_kind in &order.input_objects() {
Expand Down Expand Up @@ -723,12 +720,12 @@ where
// potentially many dependencies that need to be satisfied, not just a
// list.
async fn broadcast_and_execute<'a, V, F: 'a>(
&'a mut self,
&'a self,
certificates_to_broadcast: Vec<CertifiedOrder>,
action: F,
) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, Vec<V>), anyhow::Error>
where
F: Fn(AuthorityName, &'a mut A) -> AsyncResult<'a, V, FastPayError> + Send + Sync + Copy,
F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Send + Sync + Copy,
V: Copy,
{
let result = self
Expand Down Expand Up @@ -852,7 +849,7 @@ where
/// Broadcast confirmation orders.
/// The corresponding sequence numbers should be consecutive and increasing.
async fn broadcast_confirmation_orders(
&mut self,
&self,
certificates_to_broadcast: Vec<CertifiedOrder>,
) -> Result<Vec<(CertifiedOrder, OrderInfoResponse)>, anyhow::Error> {
self.broadcast_and_execute(certificates_to_broadcast, |_, _| Box::pin(async { Ok(()) }))
Expand Down Expand Up @@ -979,6 +976,19 @@ where
self.update_authority_certificates(*order.sender(), &inputs, known_certificates)
.await?;

let (new_certificate, response) = self.execute_transaction_stateless(order).await?;

// Update local data using new order response.
self.update_objects_from_order_info(response.clone())
.await?;

Ok((new_certificate, response.signed_effects.unwrap().effects))
}

async fn execute_transaction_stateless(
&self,
order: Order,
) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> {
let new_certificate = self.execute_transaction_without_confirmation(order).await?;

// Confirm last transfer certificate if needed.
Expand All @@ -992,11 +1002,7 @@ where
.find(|(cert, _)| cert.order == new_certificate.order)
.ok_or(FastPayError::ErrorWhileRequestingInformation)?;

// Update local data using new order response.
self.update_objects_from_order_info(response.clone())
.await?;

Ok((new_certificate, response.signed_effects.unwrap().effects))
Ok((new_certificate, response))
}

/// Returns true if this pending order's input objects are locked by another unconfirmed order
Expand Down Expand Up @@ -1045,7 +1051,7 @@ where

/// Execute (or retry) an order without confirmation. Update local object states using newly created certificate.
async fn execute_transaction_without_confirmation(
&mut self,
&self,
order: Order,
) -> Result<CertifiedOrder, anyhow::Error> {
// Return error if conflict
Expand All @@ -1063,13 +1069,11 @@ where
self.unlock_pending_order_objects(&order)?;

// order_info_response contains response from broadcasting old unconfirmed order, if any.
let (order_info_responses, new_sent_certificate) = result?;
let (_order_info_responses, new_sent_certificate) = result?;
assert_eq!(&new_sent_certificate.order, &order);
// TODO: Verify that we don't need to update client objects here based on _order_info_responses,
// but can do it at the caller site.

// Update local data using all order response.
for (_, response) in order_info_responses {
self.update_objects_from_order_info(response).await?;
}
Ok(new_sent_certificate)
}

Expand Down
4 changes: 2 additions & 2 deletions fastpay_core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ struct LocalAuthorityClient(Arc<Mutex<AuthorityState>>);

#[async_trait]
impl AuthorityAPI for LocalAuthorityClient {
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
async fn handle_order(&self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
let state = self.0.clone();
let result = state.lock().await.handle_order(order).await;
result
}

async fn handle_confirmation_order(
&mut self,
&self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError> {
let state = self.0.clone();
Expand Down

0 comments on commit 936666d

Please sign in to comment.