Skip to content

Commit

Permalink
Changed the handler returns to OrderInfoResponse & Added Signed Eff…
Browse files Browse the repository at this point in the history
…ects (MystenLabs#130)

* Changed the handler returns to OrderInfo & Added Signed Effects
* Added idempotency test for cert handling
* Added the status to the signed effects

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Jan 9, 2022
1 parent 6286592 commit 9e6a9bc
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 91 deletions.
12 changes: 6 additions & 6 deletions fastpay/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl MessageHandler for RunningServerState {
.state
.handle_order(*message)
.await
.map(|info| Some(serialize_object_info_response(&info))),
.map(|info| Some(serialize_order_info(&info))),
SerializedMessage::Cert(message) => {
let confirmation_order = ConfirmationOrder {
certificate: message.as_ref().clone(),
Expand All @@ -98,7 +98,7 @@ impl MessageHandler for RunningServerState {
{
Ok(info) => {
// Response
Ok(Some(serialize_object_info_response(&info)))
Ok(Some(serialize_order_info(&info)))
}
Err(error) => Err(error),
}
Expand Down Expand Up @@ -210,9 +210,9 @@ impl Client {

impl AuthorityClient for Client {
/// Initiate a new transfer to a FastPay or Primary account.
fn handle_order(&mut self, order: Order) -> AsyncResult<'_, ObjectInfoResponse, FastPayError> {
fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(serialize_order(&order), object_info_deserializer)
self.send_recv_bytes(serialize_order(&order), order_info_deserializer)
.await
})
}
Expand All @@ -221,9 +221,9 @@ impl AuthorityClient for Client {
fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> AsyncResult<'_, ObjectInfoResponse, FastPayError> {
) -> AsyncResult<'_, OrderInfoResponse, FastPayError> {
Box::pin(async move {
self.send_recv_bytes(serialize_cert(&order.certificate), object_info_deserializer)
self.send_recv_bytes(serialize_cert(&order.certificate), order_info_deserializer)
.await
})
}
Expand Down
74 changes: 35 additions & 39 deletions fastpay_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,10 @@ pub struct AuthorityState {
/// Repeating commands should produce no changes and return no error.
impl AuthorityState {
/// Initiate a new order.
pub async fn handle_order(&self, order: Order) -> Result<ObjectInfoResponse, FastPayError> {
pub async fn handle_order(&self, order: Order) -> Result<OrderInfoResponse, FastPayError> {
// Check the sender's signature.
order.check_signature()?;
let transaction_digest = order.digest();

let input_objects = order.input_objects();
let mut mutable_objects = Vec::with_capacity(input_objects.len());
Expand Down Expand Up @@ -149,25 +150,24 @@ impl AuthorityState {
// We have checked that there is a mutable gas object.
debug_assert!(!mutable_objects.is_empty());

let object_id = *order.object_id();
let signed_order = SignedOrder::new(order, self.name, &self.secret);

// Check and write locks, to signed order, into the database
self.set_order_lock(&mutable_objects, signed_order).await?;

// TODO: what should we return here?
let info = self.make_object_info(object_id, None).await?;
Ok(info)
// Return the signed Order or maybe a cert.
self.make_order_info(&transaction_digest).await
}

/// Confirm a transfer.
pub async fn handle_confirmation_order(
&self,
confirmation_order: ConfirmationOrder,
) -> Result<ObjectInfoResponse, FastPayError> {
) -> Result<OrderInfoResponse, FastPayError> {
let certificate = confirmation_order.certificate;
let order = certificate.order.clone();
let mut object_id = *order.object_id();
let transaction_digest = order.digest();

// Check the certificate and retrieve the transfer data.
certificate.check(&self.committee)?;

Expand All @@ -190,7 +190,7 @@ impl AuthorityState {
}
if input_sequence_number > input_seq {
// Transfer was already confirmed.
return self.make_object_info(object_id, None).await;
return self.make_order_info(&transaction_digest).await;
}

inputs.push(input_object.clone());
Expand All @@ -201,10 +201,8 @@ impl AuthorityState {
let mut tx_ctx = TxContext::new(transaction_digest);

// Order-specific logic
//
// TODO: think very carefully what to do in case we throw an Err here.
let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs);
match order.kind {
let _status = match order.kind {
OrderKind::Transfer(t) => {
// unwraps here are safe because we built `inputs`
let mut gas_object = inputs.pop().unwrap();
Expand All @@ -223,12 +221,13 @@ impl AuthorityState {
Address::FastPay(addr) => addr,
});
temporary_store.write_object(output_object);
Ok(())
}
OrderKind::Call(c) => {
// unwraps here are safe because we built `inputs`
let gas_object = inputs.pop().unwrap();
let module = inputs.pop().unwrap();
match adapter::execute(
adapter::execute(
&mut temporary_store,
self.native_functions.clone(),
module,
Expand All @@ -239,44 +238,32 @@ impl AuthorityState {
c.gas_budget,
gas_object,
tx_ctx,
) {
Ok(()) => {
// TODO(https://github.com/MystenLabs/fastnft/issues/63): AccountInfoResponse should return all object ID outputs.
// but for now it only returns one, so use this hack
object_id = temporary_store.written[0].0
}
Err(e) => {
return Err(e);
}
}
)
}

OrderKind::Publish(m) => {
let gas_object = inputs.pop().unwrap();
match adapter::publish(
adapter::publish(
&mut temporary_store,
m.modules,
m.sender,
&mut tx_ctx,
gas_object,
) {
Ok(outputs) => {
// TODO(https://github.com/MystenLabs/fastnft/issues/63): AccountInfoResponse should return all object ID outputs.
// but for now it only returns one, so use this hack
object_id = outputs[0].0;
}
Err(_e) => {
// TODO(https://github.com/MystenLabs/fastnft/issues/63): return this error to the client
object_id = m.gas_payment.0;
}
}
)
}
};

// Update the database in an atomic manner
self.update_state(temporary_store, certificate).await?;
let to_signed_effects = temporary_store.to_signed_effects(
&self.name,
&self.secret,
&transaction_digest,
_status,
);
self.update_state(temporary_store, certificate, to_signed_effects)
.await?;

let info = self.make_object_info(object_id, None).await?;
Ok(info)
self.make_order_info(&transaction_digest).await
}

pub async fn handle_account_info_request(
Expand Down Expand Up @@ -344,8 +331,15 @@ impl AuthorityState {
.expect("TODO: propagate the error")
}

/// Make an information summary of an object to help clients
/// Make an information response for an order
async fn make_order_info(
&self,
transaction_digest: &TransactionDigest,
) -> Result<OrderInfoResponse, FastPayError> {
self._database.get_order_info(transaction_digest)
}

/// Make an information summary of an object to help clients
async fn make_object_info(
&self,
object_id: ObjectID,
Expand Down Expand Up @@ -402,8 +396,10 @@ impl AuthorityState {
&self,
temporary_store: AuthorityTemporaryStore,
certificate: CertifiedOrder,
signed_effects: SignedOrderEffects,
) -> Result<(), FastPayError> {
self._database.update_state(temporary_store, certificate)
self._database
.update_state(temporary_store, certificate, signed_effects)
}

/// Get a read reference to an object/seq lock
Expand Down
32 changes: 32 additions & 0 deletions fastpay_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct AuthorityStore {
signed_orders: DBMap<TransactionDigest, SignedOrder>,
certificates: DBMap<TransactionDigest, CertifiedOrder>,
parent_sync: DBMap<ObjectRef, TransactionDigest>,
signed_effects: DBMap<TransactionDigest, SignedOrderEffects>,
check_and_write_lock: Mutex<()>,
}

Expand All @@ -27,6 +28,7 @@ impl AuthorityStore {
"signed_orders",
"certificates",
"parent_sync",
"signed_effects",
],
)
.expect("Cannot open DB.");
Expand All @@ -36,6 +38,7 @@ impl AuthorityStore {
signed_orders: DBMap::reopen(&db, Some("signed_orders")).expect("Cannot open CF."),
certificates: DBMap::reopen(&db, Some("certificates")).expect("Cannot open CF."),
parent_sync: DBMap::reopen(&db, Some("parent_sync")).expect("Cannot open CF."),
signed_effects: DBMap::reopen(&db, Some("signed_effects")).expect("Cannot open CF."),
check_and_write_lock: Mutex::new(()),
}
}
Expand All @@ -55,6 +58,26 @@ impl AuthorityStore {
.collect())
}

pub fn get_order_info(
&self,
transaction_digest: &TransactionDigest,
) -> Result<OrderInfoResponse, FastPayError> {
Ok(OrderInfoResponse {
signed_order: self
.signed_orders
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
certified_order: self
.certificates
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
signed_effects: self
.signed_effects
.get(transaction_digest)
.map_err(|_| FastPayError::StorageError)?,
})
}

/// Read an object and return it, or Err(ObjectNotFound) if the object was not found.
pub fn object_state(&self, object_id: &ObjectID) -> Result<Object, FastPayError> {
self.objects
Expand Down Expand Up @@ -199,6 +222,7 @@ impl AuthorityStore {
&self,
temporary_store: AuthorityTemporaryStore,
certificate: CertifiedOrder,
signed_effects: SignedOrderEffects,
) -> Result<(), FastPayError> {
// TODO: There is a lot of cloning used -- eliminate it.

Expand All @@ -220,6 +244,14 @@ impl AuthorityStore {
)
.map_err(|_| FastPayError::StorageError)?;

// Store the signed effects of the order
write_batch = write_batch
.insert_batch(
&self.signed_effects,
std::iter::once((transaction_digest, signed_effects)),
)
.map_err(|_| FastPayError::StorageError)?;

// Delete objects
write_batch = write_batch
.delete_batch(
Expand Down
22 changes: 22 additions & 0 deletions fastpay_core/src/authority/temporary_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,28 @@ impl AuthorityTemporaryStore {
(self.objects, self.active_inputs, self.written, self.deleted)
}

pub fn to_signed_effects(
&self,
authority_name: &AuthorityName,
secret: &KeyPair,
transaction_digest: &TransactionDigest,
status: Result<(), FastPayError>,
) -> SignedOrderEffects {
let effects = OrderEffects {
status,
transaction_digest: *transaction_digest,
mutated: self.written.clone(),
deleted: self.deleted.clone(),
};
let signature = Signature::new(&effects, secret);

SignedOrderEffects {
effects,
authority: *authority_name,
signature,
}
}

/// An internal check of the invariants (will only fire in debug)
#[cfg(debug_assertions)]
fn check_invariants(&self) {
Expand Down
23 changes: 12 additions & 11 deletions fastpay_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ pub type AsyncResult<'a, T, E> = future::BoxFuture<'a, Result<T, E>>;

pub trait AuthorityClient {
/// Initiate a new order to a FastPay or Primary account.
fn handle_order(&mut self, order: Order) -> AsyncResult<'_, ObjectInfoResponse, FastPayError>;
fn handle_order(&mut self, order: Order) -> AsyncResult<'_, OrderInfoResponse, FastPayError>;

/// Confirm an order to a FastPay or Primary account.
fn handle_confirmation_order(
&mut self,
order: ConfirmationOrder,
) -> AsyncResult<'_, ObjectInfoResponse, FastPayError>;
) -> AsyncResult<'_, OrderInfoResponse, FastPayError>;

/// Handle Account information requests for this account.
fn handle_account_info_request(
Expand Down Expand Up @@ -427,21 +427,22 @@ where
}
// Send the transfer order (if any) and return a vote.
if let CommunicateAction::SendOrder(order) = action {
let result = client.handle_order(order).await;
return match result {
Ok(ObjectInfoResponse {
pending_confirmation: Some(signed_order),
let result: Result<OrderInfoResponse, FastPayError> =
client.handle_order(order).await;
match result {
Ok(OrderInfoResponse {
signed_order: Some(inner_signed_order),
..
}) => {
fp_ensure!(
signed_order.authority == name,
inner_signed_order.authority == name,
FastPayError::ErrorWhileProcessingTransferOrder
);
signed_order.check(committee)?;
Ok(Some(signed_order))
inner_signed_order.check(committee)?;
return Ok(Some(inner_signed_order));
}
Err(err) => Err(err),
_ => Err(FastPayError::ErrorWhileProcessingTransferOrder),
Err(err) => return Err(err),
_ => return Err(FastPayError::ErrorWhileProcessingTransferOrder),
};
}
Ok(None)
Expand Down
Loading

0 comments on commit 9e6a9bc

Please sign in to comment.