Skip to content

Commit

Permalink
[fastx client] Adding robust client <-> authorities primitives (Myste…
Browse files Browse the repository at this point in the history
…nLabs#336)

* We include a generic map/reduce pattern for interacting with authorities, in which we provide an async function to query each authority, and then a reduce function to aggregate responses, and adaptively end, continue or timeout the queries.
* We use the abstraction to retrieve objects at all versions that are know by the authorities to be owned by an address.
* We use the abstraction to retrieve all versions of an object known by the authorities and associated certificates.
* We use the abstraction to update authorities to the latest state of objected owned by an address using the latest certs.
* Augmented the authority with an object version in `parent_sync` when the object is deleted, and a link back to the cert that deleted it.
* Changed the semantics of ObjectInfoRequest to return the latest `ObjectRef` for an object id, and the certificate that leads to it, including for deleted objects.

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Feb 6, 2022
1 parent 6971520 commit db9ee35
Show file tree
Hide file tree
Showing 9 changed files with 1,138 additions and 56 deletions.
47 changes: 32 additions & 15 deletions fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,26 +394,35 @@ impl AuthorityState {
request: ObjectInfoRequest,
) -> Result<ObjectInfoResponse, FastPayError> {
// Only add a certificate if it is requested
let parent_certificate = if let Some(seq) = request.request_sequence_number {
let ref_and_digest = if let Some(seq) = request.request_sequence_number {
// Get the Transaction Digest that created the object
let parent_iterator = self
.get_parent_iterator(request.object_id, Some(seq))
.await?;
let (_, transaction_digest) =
parent_iterator
.first()
.ok_or(FastPayError::ParentNotfound {
object_id: request.object_id,
sequence: seq,
})?;
// Get the cert from the transaction digest
Some(
self.read_certificate(transaction_digest)
.await?
.ok_or(FastPayError::CertificateNotfound)?,
)

parent_iterator
.first()
.map(|(object_ref, tx_digest)| (*object_ref, *tx_digest))
} else {
None
// Or get the latest object_reference and transaction entry.
self.get_latest_parent_entry(request.object_id).await?
};

let (requested_object_reference, parent_certificate) = match ref_and_digest {
Some((object_ref, transaction_digest)) => (
Some(object_ref),
if transaction_digest == TransactionDigest::genesis() {
None
} else {
// Get the cert from the transaction digest
Some(self.read_certificate(&transaction_digest).await?.ok_or(
FastPayError::CertificateNotfound {
certificate_digest: transaction_digest,
},
)?)
},
),
None => (None, None),
};

// Always attempt to return the latest version of the object and the
Expand All @@ -436,6 +445,7 @@ impl AuthorityState {

Ok(ObjectInfoResponse {
parent_certificate,
requested_object_reference,
object_and_lock,
})
}
Expand Down Expand Up @@ -588,4 +598,11 @@ impl AuthorityState {
self._database.get_parent_iterator(object_id, seq)
}
}

pub async fn get_latest_parent_entry(
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectRef, TransactionDigest)>, FastPayError> {
self._database.get_latest_parent_entry(object_id)
}
}
55 changes: 55 additions & 0 deletions fastpay_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ pub struct AuthorityStore {

/// The map between the object ref of objects processed at all versions and the transaction
/// digest of the certificate that lead to the creation of this version of the object.
///
/// When an object is deleted we include an entry into this table for its next version and
/// a digest of ObjectDigest::deleted(), along with a link to the transaction that deleted it.
parent_sync: DBMap<ObjectRef, TransactionDigest>,

/// A map between the transaction digest of a certificate that was successfully processed
Expand Down Expand Up @@ -213,6 +216,13 @@ impl AuthorityStore {
self.owner_index
.insert(&(object.owner, object.id()), &object.to_object_reference())?;

// Update the parent
self.parent_sync
.insert(&object.to_object_reference(), &object.previous_transaction)?;

// We only side load objects with a genesis parent transaction.
debug_assert!(object.previous_transaction == TransactionDigest::genesis());

Ok(())
}

Expand Down Expand Up @@ -344,6 +354,21 @@ impl AuthorityStore {
.map(|(_, object)| (object.to_object_reference(), transaction_digest)),
)?;

// Index the certificate by the objects deleted
write_batch = write_batch.insert_batch(
&self.parent_sync,
deleted.iter().map(|object_id| {
(
(
*object_id,
objects[object_id].version().increment(),
ObjectDigest::deleted(),
),
transaction_digest,
)
}),
)?;

// Create locks for new objects, if they are not immutable
write_batch = write_batch.insert_batch(
&self.order_lock,
Expand Down Expand Up @@ -394,4 +419,34 @@ impl AuthorityStore {
signed_effects: Some(signed_effects),
})
}

/// Returns the last entry we have for this object in the parents_sync index used
/// to facilitate client and authority sync. In turn the latest entry provides the
/// latest object_reference, and also the latest tranaction that has interacted with
/// this object.
///
/// This parent_sync index also contains entries for deleted objects (with a digest of
/// ObjectDigest::deleted()), and provides the transaction digest of the certificate
/// that deleted the object. Note that a deleted object may re-appear if the deletion
/// was the result of the object being wrapped in another object.
///
/// If no entry for the object_id is found, return None.
pub fn get_latest_parent_entry(
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectRef, TransactionDigest)>, FastPayError> {
let mut iterator = self
.parent_sync
.iter()
// Make the max possible entry for this object ID.
.skip_prior_to(&(object_id, SEQUENCE_NUMBER_MAX, OBJECT_DIGEST_MAX))?;

Ok(iterator.next().and_then(|(obj_ref, tx_digest)| {
if obj_ref.0 == object_id {
Some((obj_ref, tx_digest))
} else {
None
}
}))
}
}
Loading

0 comments on commit db9ee35

Please sign in to comment.