Skip to content

Commit

Permalink
[fastx client / auth] Add OrderInfoRequest to authority and use it …
Browse files Browse the repository at this point in the history
…for more robust sync. (MystenLabs#311)

* Added dependencies to order effects
* Added a order info request command to authority and client
* Fix bug in previous_transaction + use order info request command for sync
* Fix test_move_calls_chain_many_delete_authority_synchronization test
* Test sync_certificate_to_authority_with_timeout
* Fix corner case in sync.

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jan 31, 2022
1 parent e544299 commit f06ff78
Showing 8 changed files with 297 additions and 69 deletions.
6 changes: 6 additions & 0 deletions fastpay/src/server_lib.rs
Original file line number Diff line number Diff line change
@@ -108,6 +108,12 @@ impl MessageHandler for RunningServerState {
.handle_object_info_request(*message)
.await
.map(|info| Some(serialize_object_info_response(&info))),
SerializedMessage::OrderInfoReq(message) => self
.server
.state
.handle_order_info_request(*message)
.await
.map(|info| Some(serialize_order_info(&info))),
_ => Err(FastPayError::UnexpectedMessage),
}
}
18 changes: 17 additions & 1 deletion fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
@@ -235,17 +235,26 @@ impl AuthorityState {
.map(|(_, object)| object)
.collect();

let mut transaction_dependencies: BTreeSet<_> = inputs
.iter()
.map(|object| object.previous_transaction)
.collect();

// Insert into the certificates map
let mut tx_ctx = TxContext::new(order.sender(), transaction_digest);

let gas_object_id = *order.gas_payment_object_id();
let (temporary_store, status) = self.execute_order(order, inputs, &mut tx_ctx)?;

// Remove from dependencies the generic hash
transaction_dependencies.remove(&TransactionDigest::genesis());

// Update the database in an atomic manner
let to_signed_effects = temporary_store.to_signed_effects(
&self.name,
&self.secret,
&transaction_digest,
transaction_dependencies.into_iter().collect(),
status,
&gas_object_id,
);
@@ -259,7 +268,7 @@ impl AuthorityState {
mut inputs: Vec<Object>,
tx_ctx: &mut TxContext,
) -> FastPayResult<(AuthorityTemporaryStore, ExecutionStatus)> {
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs);
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs, tx_ctx.digest());
// unwraps here are safe because we built `inputs`
let mut gas_object = inputs.pop().unwrap();

@@ -345,6 +354,13 @@ impl AuthorityState {
Ok(ExecutionStatus::Success)
}

pub async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
self.make_order_info(&request.transaction_digest).await
}

pub async fn handle_account_info_request(
&self,
request: AccountInfoRequest,
10 changes: 9 additions & 1 deletion fastpay_core/src/authority/temporary_store.rs
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ pub type InnerTemporaryStore = (

pub struct AuthorityTemporaryStore {
object_store: Arc<AuthorityStore>,
tx_digest: TransactionDigest,
objects: BTreeMap<ObjectID, Object>,
active_inputs: Vec<ObjectRef>, // Inputs that are not read only
// TODO: We need to study whether it's worth to optimize the lookup of
@@ -29,9 +30,11 @@ impl AuthorityTemporaryStore {
pub fn new(
authority_state: &AuthorityState,
_input_objects: &'_ [Object],
tx_digest: TransactionDigest,
) -> AuthorityTemporaryStore {
AuthorityTemporaryStore {
object_store: authority_state._database.clone(),
tx_digest,
objects: _input_objects.iter().map(|v| (v.id(), v.clone())).collect(),
active_inputs: _input_objects
.iter()
@@ -92,6 +95,7 @@ impl AuthorityTemporaryStore {
authority_name: &AuthorityName,
secret: &KeyPair,
transaction_digest: &TransactionDigest,
transaction_dependencies: Vec<TransactionDigest>,
status: ExecutionStatus,
gas_object_id: &ObjectID,
) -> SignedOrderEffects {
@@ -119,6 +123,7 @@ impl AuthorityTemporaryStore {
.collect(),
gas_object: (gas_object.to_object_reference(), gas_object.owner),
events: self.events.clone(),
dependencies: transaction_dependencies,
};
let signature = Signature::new(&effects, secret);

@@ -196,7 +201,7 @@ impl Storage for AuthorityTemporaryStore {
caller.
*/

fn write_object(&mut self, object: Object) {
fn write_object(&mut self, mut object: Object) {
// Check it is not read-only
#[cfg(test)] // Movevm should ensure this
if let Some(existing_object) = self.read_object(&object.id()) {
@@ -207,6 +212,9 @@ impl Storage for AuthorityTemporaryStore {
}
}

// The adapter is not very disciplined at filling in the correct
// previous transaction digest, so we ensure it is correct here.
object.previous_transaction = self.tx_digest;
self.written.insert(object.id(), object);
}

90 changes: 63 additions & 27 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
@@ -59,6 +59,12 @@ pub trait AuthorityClient {
&self,
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError>;

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError>;
}

#[async_trait]
@@ -99,6 +105,18 @@ impl AuthorityClient for network::Client {
)
.await
}

/// Handle Object information requests for this account.
async fn handle_order_info_request(
&self,
request: OrderInfoRequest,
) -> Result<OrderInfoResponse, FastPayError> {
self.send_recv_bytes(
serialize_order_info_request(&request),
order_info_deserializer,
)
.await
}
}

pub struct ClientState<AuthorityClient> {
@@ -402,7 +420,7 @@ where
source_authority: AuthorityName,
destination_authority: AuthorityName,
) -> Result<(), FastPayError> {
let source_client = self.authority_clients[&source_authority].clone();
let mut source_client = self.authority_clients[&source_authority].clone();
let mut destination_client = self.authority_clients[&destination_authority].clone();

// This represents a stack of certificates that we need to register with the
@@ -444,40 +462,58 @@ where
// TODO: Eventually the client will store more information, and we could
// first try to read certificates and parents from a local cache before
// asking an authority.
let input_objects = target_cert.certificate.order.input_objects();
// let input_objects = target_cert.certificate.order.input_objects();

let order_info = if missing_certificates.is_empty() {
// Here we cover a corner case due to the nature of using consistent
// broadcast: it is possible for the client to have a certificate
// signed by some authority, before the authority has processed the
// certificate. This can only happen to a certificate for objects
// not used in another certificicate, hence it can only be the case
// for the very first certificate we try to sync. For this reason for
// this one instead of asking for the effects of a previous execution
// we send the cert for execution. Since execution is idempotent this
// is ok.

source_client
.handle_confirmation_order(target_cert.clone())
.await?
} else {
// Unlike the previous case if a certificate created an object that
// was involved in the processing of another certificate the previous
// cert must have been processed, so here we just ask for the effects
// of such an execution.

source_client
.handle_order_info_request(OrderInfoRequest {
transaction_digest: cert_digest,
})
.await?
};

// Put back the target cert
missing_certificates.push(target_cert);
let signed_effects = &order_info
.signed_effects
.ok_or(FastPayError::AuthorityInformationUnavailable)?;

for object_kind in input_objects {
// Request the parent certificate from the authority.
let object_info_response = source_client
.handle_object_info_request(ObjectInfoRequest {
object_id: object_kind.object_id(),
request_sequence_number: Some(object_kind.version()),
})
.await;

let object_info = match object_info_response {
Ok(object_info) => object_info,
// Here we cover the case the object genuinely has no parent.
Err(FastPayError::ParentNotfound { .. }) => {
continue;
}
Err(e) => return Err(e),
};

let returned_certificate = object_info
.parent_certificate
.ok_or(FastPayError::AuthorityInformationUnavailable)?;
let returned_digest = returned_certificate.order.digest();

for returned_digest in &signed_effects.effects.dependencies {
// We check that we are not processing twice the same certificate, as
// it would be common if two objects used by one order, were also both
// mutated by the same preceeding order.
if !candidate_certificates.contains(&returned_digest) {
if !candidate_certificates.contains(returned_digest) {
// Add this cert to the set we have processed
candidate_certificates.insert(returned_digest);
candidate_certificates.insert(*returned_digest);

let inner_order_info = source_client
.handle_order_info_request(OrderInfoRequest {
transaction_digest: *returned_digest,
})
.await?;

let returned_certificate = inner_order_info
.certified_order
.ok_or(FastPayError::AuthorityInformationUnavailable)?;

// Check & Add it to the list of certificates to sync
returned_certificate.check(&self.committee).map_err(|_| {
10 changes: 10 additions & 0 deletions fastpay_core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
@@ -886,6 +886,16 @@ async fn test_handle_confirmation_order_idempotent() {

// this is valid because we're checking the authority state does not change the certificate
compare_order_info_responses(&info, &info2);

// Now check the order info request is also the same
let info3 = authority_state
.handle_order_info_request(OrderInfoRequest {
transaction_digest: certified_transfer_order.order.digest(),
})
.await
.unwrap();

compare_order_info_responses(&info, &info3);
}

#[tokio::test]
Loading

0 comments on commit f06ff78

Please sign in to comment.