Skip to content

Commit

Permalink
[authority] Small code fixes on the back of profiling (MystenLabs#839)
Browse files Browse the repository at this point in the history
* Integrate OneCell digest
* Removed &mut and copy less
* Changed variable name, and simplified logging

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Mar 18, 2022
1 parent 289c0de commit ce93bd3
Show file tree
Hide file tree
Showing 14 changed files with 108 additions and 104 deletions.
5 changes: 1 addition & 4 deletions sui/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,7 @@ impl ClientServerBenchmark {

if self.benchmark_type != BenchmarkType::TransactionsOnly {
// Make certificate
let mut certificate = CertifiedTransaction {
transaction,
signatures: Vec::with_capacity(committee.quorum_threshold()),
};
let mut certificate = CertifiedTransaction::new(transaction);
for i in 0..committee.quorum_threshold() {
let (pubx, secx) = keys.get(i).unwrap();
let sig = AuthoritySignature::new(&certificate.transaction.data, secx);
Expand Down
41 changes: 18 additions & 23 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl AuthorityState {
self.batch_channels
.as_ref()
.map(|(_, tx)| tx.subscribe())
.ok_or(SuiError::GenericAuthorityError {
.ok_or_else(|| SuiError::GenericAuthorityError {
error: "No broadcast subscriptions allowed for this authority.".to_string(),
})
}
Expand Down Expand Up @@ -341,9 +341,8 @@ impl AuthorityState {
&self,
confirmation_transaction: ConfirmationTransaction,
) -> SuiResult<TransactionInfoResponse> {
let transaction_digest = *confirmation_transaction.certificate.digest();
let certificate = &confirmation_transaction.certificate;
let transaction = &certificate.transaction;
let transaction_digest = transaction.digest();

// Ensure an idempotent answer.
if self._database.signed_effects_exists(&transaction_digest)? {
Expand All @@ -359,7 +358,7 @@ impl AuthorityState {

async fn check_shared_locks(
&self,
transaction_digest: TransactionDigest,
transaction_digest: &TransactionDigest,
transaction: &Transaction,
inputs: &[(InputObjectKind, Object)],
) -> Result<(), SuiError> {
Expand Down Expand Up @@ -428,15 +427,15 @@ impl AuthorityState {
confirmation_transaction: ConfirmationTransaction,
) -> Result<TransactionInfoResponse, SuiError> {
let certificate = confirmation_transaction.certificate;
let transaction = certificate.transaction.clone();
let transaction_digest = transaction.digest();
let transaction_digest = *certificate.digest();
let transaction = &certificate.transaction;

let objects_by_kind: Vec<_> = self.check_locks(&transaction).await?;
let objects_by_kind: Vec<_> = self.check_locks(transaction).await?;

// At this point we need to check if any shared objects need locks,
// and whether they have them.
let _shared_objects = self
.check_shared_locks(transaction_digest, &transaction, &objects_by_kind)
.check_shared_locks(&transaction_digest, transaction, &objects_by_kind)
.await?;
// inputs.extend(shared_objects);

Expand All @@ -456,14 +455,14 @@ impl AuthorityState {
.collect();

// Insert into the certificates map
let mut tx_ctx = TxContext::new(&transaction.sender_address(), transaction_digest);
let mut tx_ctx = TxContext::new(&transaction.sender_address(), &transaction_digest);

let gas_object_id = transaction.gas_payment_object_ref().0;
let mut temporary_store =
AuthorityTemporaryStore::new(self._database.clone(), &inputs, tx_ctx.digest());
let status = execution_engine::execute_transaction(
&mut temporary_store,
transaction,
transaction.clone(),
inputs,
&mut tx_ctx,
&self.move_vm,
Expand Down Expand Up @@ -503,22 +502,21 @@ impl AuthorityState {
/// called by a single task (ie. the task handling consensus outputs).
pub async fn handle_consensus_certificate(
&self,
certificate: &CertifiedTransaction,
certificate: CertifiedTransaction,
) -> SuiResult<()> {
let transaction = &certificate.transaction;

// Ensure it is a shared object certificate
if !transaction.contains_shared_object() {
if !certificate.transaction.contains_shared_object() {
// TODO: Maybe add a warning here, no respectable authority should
// have sequenced this.
return Ok(());
}

// Ensure it is the first time we see this certificate.
let transaction_digest = transaction.digest();
if self
._database
.sequenced(transaction_digest, transaction.shared_input_objects())?[0]
let transaction_digest = *certificate.digest();
if self._database.sequenced(
&transaction_digest,
certificate.transaction.shared_input_objects(),
)?[0]
.is_some()
{
return Ok(());
Expand All @@ -530,11 +528,8 @@ impl AuthorityState {
// Persist the certificate. We are about to lock one or more shared object.
// We thus need to make sure someone (if not the client) can continue the protocol.
// Also atomically lock the shared objects for this particular transaction.
self._database.persist_certificate_and_lock_shared_objects(
transaction_digest,
transaction,
certificate.clone(),
)
self._database
.persist_certificate_and_lock_shared_objects(&transaction_digest, certificate)
}

pub async fn handle_transaction_info_request(
Expand Down
29 changes: 14 additions & 15 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,12 +381,12 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
/// Read a lock for a specific (transaction, shared object) pair.
pub fn sequenced(
&self,
transaction_digest: TransactionDigest,
transaction_digest: &TransactionDigest,
object_ids: &[ObjectID],
) -> Result<Vec<Option<SequenceNumber>>, SuiError> {
let keys: Vec<_> = object_ids
.iter()
.map(|objid| (transaction_digest, *objid))
.map(|objid| (*transaction_digest, *objid))
.collect();

self.sequenced.multi_get(&keys[..]).map_err(SuiError::from)
Expand All @@ -395,13 +395,13 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
/// Read a lock for a specific (transaction, shared object) pair.
pub fn all_shared_locks(
&self,
transaction_digest: TransactionDigest,
transaction_digest: &TransactionDigest,
) -> Result<Vec<(ObjectID, SequenceNumber)>, SuiError> {
Ok(self
.sequenced
.iter()
.skip_to(&(transaction_digest, ObjectID::ZERO))?
.take_while(|((tx, _objid), _ver)| *tx == transaction_digest)
.skip_to(&(*transaction_digest, ObjectID::ZERO))?
.take_while(|((tx, _objid), _ver)| tx == transaction_digest)
.map(|((_tx, objid), ver)| (objid, ver))
.collect())
}
Expand Down Expand Up @@ -551,7 +551,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
let mut write_batch = self.transaction_lock.batch();

// Store the certificate indexed by transaction digest
let transaction_digest: TransactionDigest = certificate.transaction.digest();
let transaction_digest: &TransactionDigest = certificate.digest();
write_batch = write_batch.insert_batch(
&self.certificates,
std::iter::once((transaction_digest, &certificate)),
Expand All @@ -572,13 +572,13 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {

// Safe to unwrap since the "true" flag ensures we get a sequence value back.
let seq: TxSequenceNumber = self
.batch_update_objects(write_batch, temporary_store, transaction_digest, true)?
.batch_update_objects(write_batch, temporary_store, *transaction_digest, true)?
.unwrap();

Ok((
seq,
TransactionInfoResponse {
signed_transaction: self.signed_transactions.get(&transaction_digest)?,
signed_transaction: self.signed_transactions.get(transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
},
Expand Down Expand Up @@ -785,13 +785,13 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
pub fn remove_shared_objects_locks(
&self,
mut write_batch: DBBatch,
transaction_digest: TransactionDigest,
transaction_digest: &TransactionDigest,
transaction: &Transaction,
) -> SuiResult<DBBatch> {
let mut sequenced_to_delete = Vec::new();
let mut schedule_to_delete = Vec::new();
for object_id in transaction.shared_input_objects() {
sequenced_to_delete.push((transaction_digest, *object_id));
sequenced_to_delete.push((*transaction_digest, *object_id));
if self.get_object(object_id)?.is_none() {
schedule_to_delete.push(object_id);
}
Expand All @@ -804,17 +804,16 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
/// Lock a sequence number for the shared objects of the input transaction.
pub fn persist_certificate_and_lock_shared_objects(
&self,
transaction_digest: TransactionDigest,
transaction: &Transaction,
transaction_digest: &TransactionDigest,
certificate: CertifiedTransaction,
) -> Result<(), SuiError> {
let certificate_to_write = std::iter::once((transaction_digest, certificate));
let certificate_to_write = std::iter::once((transaction_digest, &certificate));

let mut sequenced_to_write = Vec::new();
let mut schedule_to_write = Vec::new();
for id in transaction.shared_input_objects() {
for id in certificate.transaction.shared_input_objects() {
let version = self.schedule.get(id)?.unwrap_or_default();
sequenced_to_write.push(((transaction_digest, *id), version));
sequenced_to_write.push(((*transaction_digest, *id), version));
let next_version = version.increment();
schedule_to_write.push((id, next_version));
}
Expand Down
9 changes: 5 additions & 4 deletions sui_core/src/authority_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,10 +805,11 @@ where
.push((name, inner_signed_transaction.signature));
state.good_stake += weight;
if state.good_stake >= threshold {
state.certificate = Some(CertifiedTransaction {
transaction: transaction_ref.clone(),
signatures: state.signatures.clone(),
});
state.certificate =
Some(CertifiedTransaction::new_with_signatures(
transaction_ref.clone(),
state.signatures.clone(),
));
}
}
// If we get back an error, then we aggregate and check
Expand Down
12 changes: 7 additions & 5 deletions sui_core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,18 @@ impl AuthorityServer {
.map(|info| Some(serialize_transaction_info(&info)))
}
SerializedMessage::Cert(message) => {
// Cache the transaction digest
let tx_digest = *message.digest();
// Get the kind
let tx_kind = message.transaction.data.kind_as_str();

let confirmation_transaction = ConfirmationTransaction {
certificate: message.as_ref().clone(),
certificate: *message,
};
let tx_kind = message.transaction.data.kind_as_str();
match self
.state
.handle_confirmation_transaction(confirmation_transaction)
.instrument(tracing::debug_span!("process_cert",
tx_digest =? message.transaction.digest(),
tx_kind))
.instrument(tracing::debug_span!("process_cert", ?tx_digest, tx_kind))
.await
{
Ok(info) => {
Expand Down
3 changes: 1 addition & 2 deletions sui_core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,10 @@ impl ConsensusHandler {
};

// Process the certificate to set the locks on the shared objects.
let certificate = &confirmation.certificate;
let result = self
.server
.state
.handle_consensus_certificate(certificate)
.handle_consensus_certificate(confirmation.certificate)
.await;
match &result {
// Log the errors that are our faults (not the client's).
Expand Down
10 changes: 5 additions & 5 deletions sui_core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ async fn test_publish_dependent_module_ok() {
let signature = Signature::new(&data, &sender_key);
let transaction = Transaction::new(data, signature);

let dependent_module_id = TxContext::new(&sender, transaction.digest()).fresh_id();
let dependent_module_id = TxContext::new(&sender, &transaction.digest()).fresh_id();

// Object does not exist
assert!(authority
Expand Down Expand Up @@ -456,7 +456,7 @@ async fn test_publish_module_no_dependencies_ok() {
let data = TransactionData::new_module(sender, gas_payment_object_ref, module_bytes, MAX_GAS);
let signature = Signature::new(&data, &sender_key);
let transaction = Transaction::new(data, signature);
let _module_object_id = TxContext::new(&sender, transaction.digest()).fresh_id();
let _module_object_id = TxContext::new(&sender, &transaction.digest()).fresh_id();
let response = send_and_confirm_transaction(&authority, transaction)
.await
.unwrap();
Expand Down Expand Up @@ -1653,13 +1653,13 @@ async fn shared_object() {

// Sequence the certificate to assign a sequence number to the shared object.
authority
.handle_consensus_certificate(&certificate)
.handle_consensus_certificate(certificate)
.await
.unwrap();

let shared_object_version = authority
.db()
.sequenced(transaction_digest, &[shared_object_id])
.sequenced(&transaction_digest, &[shared_object_id])
.unwrap()[0]
.unwrap();
assert_eq!(shared_object_version, SequenceNumber::new());
Expand All @@ -1673,7 +1673,7 @@ async fn shared_object() {

let shared_object_lock = authority
.db()
.sequenced(transaction_digest, &[shared_object_id])
.sequenced(&transaction_digest, &[shared_object_id])
.unwrap()[0];
assert!(shared_object_lock.is_none());

Expand Down
5 changes: 1 addition & 4 deletions sui_core/src/unit_tests/client_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,7 @@ async fn extract_cert(
let stake: usize = votes.iter().map(|(name, _)| committee.weight(name)).sum();
assert!(stake >= committee.quorum_threshold());

CertifiedTransaction {
transaction: transaction.unwrap(),
signatures: votes,
}
CertifiedTransaction::new_with_signatures(transaction.unwrap(), votes)
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion sui_programmability/adapter/src/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub fn get_genesis_context() -> TxContext {
}

pub fn get_genesis_context_with_custom_address(address: &SuiAddress) -> TxContext {
TxContext::new(address, TransactionDigest::genesis())
TxContext::new(address, &TransactionDigest::genesis())
}

/// Create and return objects wrapping the genesis modules for sui
Expand Down
17 changes: 6 additions & 11 deletions sui_types/src/base_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub struct TxContext {
}

impl TxContext {
pub fn new(sender: &SuiAddress, digest: TransactionDigest) -> Self {
pub fn new(sender: &SuiAddress, digest: &TransactionDigest) -> Self {
Self {
sender: AccountAddress::new(sender.0),
digest: digest.0.to_vec(),
Expand Down Expand Up @@ -197,7 +197,7 @@ impl TxContext {
pub fn random_for_testing_only() -> Self {
Self::new(
&SuiAddress::random_for_testing_only(),
TransactionDigest::random(),
&TransactionDigest::random(),
)
}

Expand Down Expand Up @@ -617,21 +617,16 @@ impl TryFrom<String> for ObjectID {
type Error = ObjectIDParseError;

fn try_from(s: String) -> Result<ObjectID, ObjectIDParseError> {
match Self::from_hex(s.clone()) {
Ok(q) => Ok(q),
Err(_) => Self::from_hex_literal(&s),
}
Self::from_hex(s.clone()).or_else(|_| Self::from_hex_literal(&s))
}
}

impl std::str::FromStr for ObjectID {
type Err = ObjectIDParseError;
// Try to match both the literal (0xABC..) and the normal (ABC)

fn from_str(s: &str) -> Result<Self, ObjectIDParseError> {
match Self::from_hex(s) {
Ok(q) => Ok(q),
Err(_) => Self::from_hex_literal(s),
}
// Try to match both the literal (0xABC..) and the normal (ABC)
Self::from_hex(s).or_else(|_| Self::from_hex_literal(s))
}
}

Expand Down
Loading

0 comments on commit ce93bd3

Please sign in to comment.