Skip to content

Commit

Permalink
[fastx client] Add logic for a client to sync a cert to an authority,…
Browse files Browse the repository at this point in the history
… and two authorities. (MystenLabs#285)

* Fix authority idempotency bug, and start work on sync_authority_source_to_destination
* Added safer sync function
* Refined the error returned when signatures fail.
* Add tests for sync, and report BUGs.
* Created a delete test that fails due to a known bug (BUG(MystenLabs#282))

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jan 29, 2022
1 parent c26969d commit 4a61ac0
Show file tree
Hide file tree
Showing 4 changed files with 618 additions and 15 deletions.
39 changes: 27 additions & 12 deletions fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ impl AuthorityState {
);

let object = object.ok_or(FastPayError::ObjectNotFound { object_id })?;
fp_ensure!(
object.digest() == object_digest,
FastPayError::InvalidObjectDigest {
object_id,
expected_digest: object_digest
}
);

// Check that the seq number is the same
fp_ensure!(
Expand All @@ -111,6 +104,15 @@ impl AuthorityState {
}
);

// Check the digest matches
fp_ensure!(
object.digest() == object_digest,
FastPayError::InvalidObjectDigest {
object_id,
expected_digest: object_digest
}
);

if object.is_read_only() {
// For a tranfer order, the object to be transferred
// must not be read only.
Expand Down Expand Up @@ -171,6 +173,12 @@ impl AuthorityState {
// Check the certificate and retrieve the transfer data.
certificate.check(&self.committee)?;

// Ensure an idempotent answer
let order_info = self.make_order_info(&transaction_digest).await?;
if order_info.certified_order.is_some() {
return Ok(order_info);
}

let input_objects = order.input_objects();
let ids: Vec<_> = input_objects.iter().map(|(id, _, _)| *id).collect();
// Get a copy of the object.
Expand All @@ -197,6 +205,10 @@ impl AuthorityState {
current_sequence_number: input_sequence_number
});
}

// Note: this should never be true in prod, but some tests
// (test_handle_confirmation_order_bad_sequence_number) do
// a poor job of setting up the DB.
if input_sequence_number > input_seq {
// Transfer was already confirmed.
return self.make_order_info(&transaction_digest).await;
Expand All @@ -210,7 +222,6 @@ impl AuthorityState {
}

// Insert into the certificates map
let transaction_digest = certificate.order.digest();
let mut tx_ctx = TxContext::new(order.sender(), transaction_digest);

let gas_object_id = *order.gas_payment_object_id();
Expand Down Expand Up @@ -321,11 +332,15 @@ impl AuthorityState {
let requested_certificate = if let Some(seq) = request.request_sequence_number {
// Get the Transaction Digest that created the object
let parent_iterator = self
.get_parent_iterator(request.object_id, Some(seq.increment()))
.get_parent_iterator(request.object_id, Some(seq))
.await?;
let (_, transaction_digest) = parent_iterator
.first()
.ok_or(FastPayError::CertificateNotfound)?;
let (_, transaction_digest) =
parent_iterator
.first()
.ok_or(FastPayError::ParentNotfound {
object_id: request.object_id,
sequence: seq,
})?;
// Get the cert from the transaction digest
Some(
self.read_certificate(transaction_digest)
Expand Down
212 changes: 209 additions & 3 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use move_core_types::language_storage::TypeTag;
use rand::seq::SliceRandom;
use typed_store::rocks::open_cf;

use std::collections::HashSet;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::env;
use std::path::Path;
Expand Down Expand Up @@ -266,6 +267,7 @@ impl<A> ClientState<A> {
}
}

#[allow(dead_code)]
#[derive(Clone)]
struct CertificateRequester<A> {
committee: Committee,
Expand Down Expand Up @@ -300,9 +302,18 @@ where
&mut self,
(object_id, sequence_number): (ObjectID, SequenceNumber),
) -> Result<CertifiedOrder, FastPayError> {
// BUG(https://github.com/MystenLabs/fastnft/issues/290): This function assumes that requesting the parent cert of object seq+1 will give the cert of
// that creates the object. This is not true, as objects may be deleted and may not have a seq+1
// to look up.
//
// The authority `handle_object_info_request` is now fixed to return the parent at seq, and not
// seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts
// query to the old (incorrect) behavious to not break tests everywhere.
let inner_sequence_number = sequence_number.increment();

let request = ObjectInfoRequest {
object_id,
request_sequence_number: Some(sequence_number),
request_sequence_number: Some(inner_sequence_number),
request_received_transfers_excluding_first_nth: None,
};
// Sequentially try each authority in random order.
Expand All @@ -315,14 +326,19 @@ where
.requested_certificate
.expect("Unable to get certificate");
if certificate.check(&self.committee).is_ok() {
// BUG (https://github.com/MystenLabs/fastnft/issues/290): Orders do not have a sequence number any more, objects do.
/*
let order = &certificate.order;
if let Some(sender) = self.sender {
if order.sender() == &sender && order.sequence_number() == sequence_number {
if order.sender() == &sender && order.sequence_number() == inner_sequence_number {
return Ok(certificate.clone());
}
} else {
return Ok(certificate.clone());
}
*/
return Ok(certificate);
}
}
}
Expand All @@ -334,6 +350,183 @@ impl<A> ClientState<A>
where
A: AuthorityClient + Send + Sync + 'static + Clone,
{
/// Sync a certificate and all its dependencies to a destination authority, using a
/// source authority to get information about parent certificates.
///
/// Note: Both source and destination may be byzantine, therefore one should always
/// time limit the call to this function to avoid byzantine authorities consuming
/// an unbounded amount of resources.
async fn sync_authority_source_to_destination(
&self,
cert: ConfirmationOrder,
source_authority: AuthorityName,
destination_authority: AuthorityName,
) -> Result<(), FastPayError> {
let source_client = self.authority_clients[&source_authority].clone();
let mut destination_client = self.authority_clients[&destination_authority].clone();

// This represents a stack of certificates that we need to register with the
// destination authority. The stack is a LIFO queue, and therefore later insertions
// represent certificates that earlier insertions depend on. Thus updating an
// authority in the order we pop() the certificates from this stack should ensure
// certificates are uploaded in causal order.
let digest = cert.certificate.order.digest();
let mut missing_certificates: Vec<_> = vec![cert.clone()];

// We keep a list of certificates already processed to avoid duplicates
let mut candidate_certificates: HashSet<TransactionDigest> =
vec![digest].into_iter().collect();
let mut attempted_certificates: HashSet<TransactionDigest> = HashSet::new();

while let Some(target_cert) = missing_certificates.pop() {
match destination_client
.handle_confirmation_order(target_cert.clone())
.await
{
Ok(_) => continue,
Err(FastPayError::ObjectNotFound { .. })
| Err(FastPayError::MissingEalierConfirmations { .. }) => {}
Err(e) => return Err(e),
}

// If we are here it means that the destination authority is missing
// the previous certificates, so we need to read them from the source
// authority.

// The first time we cannot find the cert from the destination authority
// we try to get its dependencies. But the second time we have already tried
// to update its dependencies, so we should just admit failure.
let cert_digest = target_cert.certificate.order.digest();
if attempted_certificates.contains(&cert_digest) {
return Err(FastPayError::AuthorityInformationUnavailable);
}
attempted_certificates.insert(cert_digest);

// TODO: Eventually the client will store more information, and we could
// first try to read certificates and parents from a local cache before
// asking an authority.
let input_objects = target_cert.certificate.order.input_objects();

// Put back the target cert
missing_certificates.push(target_cert);

for object_ref in input_objects {
// Request the parent certificate from the authority.
let object_info_response = source_client
.handle_object_info_request(ObjectInfoRequest {
object_id: object_ref.0,
request_sequence_number: Some(object_ref.1),
request_received_transfers_excluding_first_nth: None,
})
.await;

let object_info = match object_info_response {
Ok(object_info) => object_info,
// Here we cover the case the object genuinely has no parent.
Err(FastPayError::ParentNotfound { .. }) => {
continue;
}
Err(e) => return Err(e),
};

let returned_certificate = object_info
.requested_certificate
.ok_or(FastPayError::AuthorityInformationUnavailable)?;
let returned_digest = returned_certificate.order.digest();

// We check that we are not processing twice the same certificate, as
// it would be common if two objects used by one order, were also both
// mutated by the same preceeding order.
if !candidate_certificates.contains(&returned_digest) {
// Add this cert to the set we have processed
candidate_certificates.insert(returned_digest);

// Check & Add it to the list of certificates to sync
returned_certificate.check(&self.committee).map_err(|_| {
FastPayError::ByzantineAuthoritySuspicion {
authority: source_authority,
}
})?;
missing_certificates.push(ConfirmationOrder::new(returned_certificate));
}
}
}

Ok(())
}

/// Sync a certificate to an authority.
///
/// This function infers which authorities have the history related to
/// a certificate and attempts `retries` number of them, sampled accoding to
/// stake, in order to bring the destination authority up to date to accept
/// the certificate. The time devoted to each attempt is bounded by
/// `timeout_milliseconds`.
pub async fn sync_certificate_to_authority_with_timeout(
&self,
cert: ConfirmationOrder,
destination_authority: AuthorityName,
timeout_milliseconds: u64,
retries: usize,
) -> Result<(), FastPayError> {
// Extract the set of authorities that should have this certificate
// and its full history. We should be able to use these are source authorities.
let mut candidate_source_authorties: HashSet<AuthorityName> = cert
.certificate
.signatures
.iter()
.map(|(name, _)| *name)
.collect();

// Sample a `retries` number of distinct authorities by stake.
let mut source_authorities: Vec<AuthorityName> = Vec::new();
while source_authorities.len() < retries && !candidate_source_authorties.is_empty() {
// Here we do rejection sampling.
//
// TODO: add a filter parameter to sample, so that we can directly
// sample from a subset which is more efficient.
let sample_authority = self.committee.sample();
if candidate_source_authorties.contains(sample_authority) {
candidate_source_authorties.remove(sample_authority);
source_authorities.push(*sample_authority);
}
}

// Now try to update the destination authority sequentially using
// the source authorities we have sampled.
for source_authority in source_authorities {
// Note: here we could improve this function by passing into the
// `sync_authority_source_to_destination` call a cache of
// certificates and parents to avoid re-downloading them.
if timeout(
Duration::from_millis(timeout_milliseconds),
self.sync_authority_source_to_destination(
cert.clone(),
source_authority,
destination_authority,
),
)
.await
.is_ok()
{
// If the updates suceeds we return, since there is no need
// to try other sources.
return Ok(());
}

// If we are here it means that the update failed, either due to the
// source being faulty or the destination being faulty.
//
// TODO: We should probably be keeping a record of suspected faults
// upon failure to de-prioritize authorities that we have observed being
// less reliable.
}

// Eventually we should add more information to this error about the destination
// and maybe event the certificiate.
Err(FastPayError::AuthorityUpdateFailure)
}

#[cfg(test)]
async fn request_certificate(
&mut self,
Expand Down Expand Up @@ -530,6 +723,10 @@ where
}

/// Broadcast missing confirmation orders and execute provided authority action on each authority.
// BUG(https://github.com/MystenLabs/fastnft/issues/290): This logic for
// updating an authority that is behind is not correct, since we now have
// potentially many dependencies that need to be satisfied, not just a
// list.
async fn broadcast_and_execute<'a, V, F: 'a>(
&'a mut self,
sender: FastPayAddress,
Expand Down Expand Up @@ -591,6 +788,7 @@ where
number = seq.decrement();
}
}

// Send all missing confirmation orders.
missing_certificates.reverse();
missing_certificates.extend(certificates_to_broadcast.clone());
Expand Down Expand Up @@ -938,7 +1136,15 @@ where
let response = self
.get_object_info(ObjectInfoRequest {
object_id: *certificate.order.object_id(),
request_sequence_number: Some(transfer.object_ref.1),
// BUG(https://github.com/MystenLabs/fastnft/issues/290):
// This function assumes that requesting the parent cert of object seq+1 will give the cert of
// that creates the object. This is not true, as objects may be deleted and may not have a seq+1
// to look up.
//
// The authority `handle_object_info_request` is now fixed to return the parent at seq, and not
// seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts
// query to the old (incorrect) behavious to not break tests everywhere.
request_sequence_number: Some(transfer.object_ref.1.increment()),
request_received_transfers_excluding_first_nth: None,
})
.await?;
Expand Down
Loading

0 comments on commit 4a61ac0

Please sign in to comment.