Skip to content

Commit

Permalink
Move AuthorityClient to a separate file
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind committed Feb 1, 2022
1 parent 9fc5bc3 commit 3c49991
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 96 deletions.
12 changes: 6 additions & 6 deletions fastpay/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#![deny(warnings)]

use fastpay::config::*;
use fastpay_core::client::*;
use fastpay_core::{authority_client::AuthorityClient, client::*};
use fastx_network::{network::NetworkClient, transport};
use fastx_types::{base_types::*, committee::Committee, messages::*, serialize::*};
use move_core_types::transaction_argument::convert_txn_args;
Expand All @@ -26,17 +26,17 @@ fn make_authority_clients(
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
) -> HashMap<AuthorityName, NetworkClient> {
) -> HashMap<AuthorityName, AuthorityClient> {
let mut authority_clients = HashMap::new();
for config in &committee_config.authorities {
let config = config.clone();
let client = NetworkClient::new(
let client = AuthorityClient::new(NetworkClient::new(
config.host,
config.base_port,
buffer_size,
send_timeout,
recv_timeout,
);
));
authority_clients.insert(config.address, client);
}
authority_clients
Expand Down Expand Up @@ -69,7 +69,7 @@ async fn make_client_state_and_try_sync(
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
) -> ClientState<NetworkClient> {
) -> ClientState<AuthorityClient> {
let mut c = make_client_state(
accounts,
committee_config,
Expand All @@ -91,7 +91,7 @@ fn make_client_state(
buffer_size: usize,
send_timeout: std::time::Duration,
recv_timeout: std::time::Duration,
) -> ClientState<NetworkClient> {
) -> ClientState<AuthorityClient> {
let account = accounts.get(&address).expect("Unknown account");
let committee = Committee::new(committee_config.voting_rights());
let authority_clients =
Expand Down
100 changes: 100 additions & 0 deletions fastpay_core/src/authority_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

use async_trait::async_trait;
use fastx_network::network::NetworkClient;
use fastx_types::{error::FastPayError, messages::*, serialize::*};

#[async_trait]
pub trait AuthorityAPI {
/// Initiate a new order to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError>;

/// Confirm an order to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError>;

/// Handle Account information requests for this account.
async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError>;
}

#[derive(Clone)]
pub struct AuthorityClient(NetworkClient);

impl AuthorityClient {
pub fn new(network_client: NetworkClient) -> Self {
Self(network_client)
}
}

#[async_trait]
impl AuthorityAPI for AuthorityClient {
/// Initiate a new transfer to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
let response = self.0.send_recv_bytes(serialize_order(&order)).await?;
deserialize_order_info(response)
}

/// Confirm a transfer to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError> {
let response = self
.0
.send_recv_bytes(serialize_cert(&order.certificate))
.await?;
deserialize_order_info(response)
}

async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, FastPayError> {
let response = self
.0
.send_recv_bytes(serialize_account_info_request(&request))
.await?;
deserialize_account_info(response)
}

async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError> {
let response = self
.0
.send_recv_bytes(serialize_object_info_request(&request))
.await?;
deserialize_object_info(response)
}

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
let response = self
.0
.send_recv_bytes(serialize_order_info_request(&request))
.await?;
deserialize_order_info(response)
}
}
96 changes: 7 additions & 89 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
// Copyright (c) Facebook, Inc. and its affiliates.
// SPDX-License-Identifier: Apache-2.0

use crate::downloader::*;
use crate::{authority_client::AuthorityAPI, downloader::*};
use async_trait::async_trait;
use fastx_framework::build_move_package_to_bytes;
use fastx_network::network::NetworkClient;
use fastx_types::{
base_types::*, committee::Committee, error::FastPayError, fp_ensure, messages::*, serialize::*,
base_types::*, committee::Committee, error::FastPayError, fp_ensure, messages::*,
};
use futures::{future, StreamExt, TryFutureExt};
use move_core_types::identifier::Identifier;
Expand Down Expand Up @@ -37,96 +36,15 @@ const AUTHORITY_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);

pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result<T, E>>;

#[async_trait]
pub trait AuthorityClient {
/// Initiate a new order to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError>;

/// Confirm an order to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError>;

/// Handle Account information requests for this account.
async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError>;
}

#[async_trait]
impl AuthorityClient for NetworkClient {
/// Initiate a new transfer to a FastPay or Primary account.
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
let response = self.send_recv_bytes(serialize_order(&order)).await?;
deserialize_order_info(response)
}

/// Confirm a transfer to a FastPay or Primary account.
async fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> Result<OrderInfoResponse, FastPayError> {
let response = self
.send_recv_bytes(serialize_cert(&order.certificate))
.await?;
deserialize_order_info(response)
}

async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
) -> Result<AccountInfoResponse, FastPayError> {
let response = self
.send_recv_bytes(serialize_account_info_request(&request))
.await?;
deserialize_account_info(response)
}

async fn handle_object_info_request(
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError> {
let response = self
.send_recv_bytes(serialize_object_info_request(&request))
.await?;
deserialize_object_info(response)
}

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
let response = self
.send_recv_bytes(serialize_order_info_request(&request))
.await?;
deserialize_order_info(response)
}
}

pub struct ClientState<AuthorityClient> {
pub struct ClientState<AuthorityAPI> {
/// Our FastPay address.
address: FastPayAddress,
/// Our signature key.
secret: KeyPair,
/// Our FastPay committee.
committee: Committee,
/// How to talk to this committee.
authority_clients: HashMap<AuthorityName, AuthorityClient>,
authority_clients: HashMap<AuthorityName, AuthorityAPI>,
/// Pending transfer.
pending_transfer: Option<Order>,

Expand Down Expand Up @@ -350,7 +268,7 @@ impl<A> CertificateRequester<A> {
#[async_trait]
impl<A> Requester for CertificateRequester<A>
where
A: AuthorityClient + Send + Sync + 'static + Clone,
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
type Key = (ObjectID, SequenceNumber);
type Value = Result<CertifiedOrder, FastPayError>;
Expand Down Expand Up @@ -405,7 +323,7 @@ where

impl<A> ClientState<A>
where
A: AuthorityClient + Send + Sync + 'static + Clone,
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
/// Sync a certificate and all its dependencies to a destination authority, using a
/// source authority to get information about parent certificates.
Expand Down Expand Up @@ -1168,7 +1086,7 @@ where
#[async_trait]
impl<A> Client for ClientState<A>
where
A: AuthorityClient + Send + Sync + Clone + 'static,
A: AuthorityAPI + Send + Sync + Clone + 'static,
{
async fn transfer_object(
&mut self,
Expand Down
1 change: 1 addition & 0 deletions fastpay_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub mod authority;
pub mod authority_client;
pub mod client;
pub mod downloader;
2 changes: 1 addition & 1 deletion fastpay_core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn max_files_client_tests() -> i32 {
struct LocalAuthorityClient(Arc<Mutex<AuthorityState>>);

#[async_trait]
impl AuthorityClient for LocalAuthorityClient {
impl AuthorityAPI for LocalAuthorityClient {
async fn handle_order(&mut self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
let state = self.0.clone();
let result = state.lock().await.handle_order(order).await;
Expand Down

0 comments on commit 3c49991

Please sign in to comment.