Skip to content

Commit

Permalink
refactor: rename _database -> database in authority
Browse files Browse the repository at this point in the history
  • Loading branch information
huitseeker committed May 18, 2022
1 parent c2d0dbb commit 915d616
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 37 deletions.
70 changes: 35 additions & 35 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub struct AuthorityState {
move_vm: Arc<MoveVM>,

/// The database
pub(crate) _database: Arc<AuthorityStore>, // TODO: remove pub
pub(crate) database: Arc<AuthorityStore>, // TODO: remove pub

/// The checkpoint store
pub(crate) _checkpoints: Option<Arc<Mutex<CheckpointStore>>>,
Expand Down Expand Up @@ -223,14 +223,14 @@ impl AuthorityState {
) -> Result<TransactionInfoResponse, SuiError> {
let transaction_digest = *transaction.digest();
// Ensure an idempotent answer.
if self._database.transaction_exists(&transaction_digest)? {
if self.database.transaction_exists(&transaction_digest)? {
self.metrics.tx_already_processed.inc();
let transaction_info = self.make_transaction_info(&transaction_digest).await?;
return Ok(transaction_info);
}

let (_gas_status, all_objects) = transaction_input_checker::check_transaction_input(
&self._database,
&self.database,
&transaction,
&self.metrics.shared_obj_tx,
)
Expand Down Expand Up @@ -271,7 +271,7 @@ impl AuthorityState {
// If we see an error, it is possible that a certificate has already been processed.
// In that case, we could still return Ok to avoid showing confusing errors.
Err(err) => {
if self._database.effects_exists(&transaction_digest)? {
if self.database.effects_exists(&transaction_digest)? {
self.metrics.tx_already_processed.inc();
Ok(self.make_transaction_info(&transaction_digest).await?)
} else {
Expand All @@ -290,7 +290,7 @@ impl AuthorityState {
let transaction_digest = *confirmation_transaction.certificate.digest();

// Ensure an idempotent answer.
if self._database.effects_exists(&transaction_digest)? {
if self.database.effects_exists(&transaction_digest)? {
let info = self.make_transaction_info(&transaction_digest).await?;
debug!("Transaction {transaction_digest:?} already executed");
return Ok(info);
Expand Down Expand Up @@ -323,7 +323,7 @@ impl AuthorityState {
);

let shared_locks: HashMap<_, _> = self
._database
.database
.all_shared_locks(transaction_digest)?
.into_iter()
.collect();
Expand Down Expand Up @@ -373,7 +373,7 @@ impl AuthorityState {
let transaction_digest = *certificate.digest();

let (gas_status, objects_by_kind) = transaction_input_checker::check_transaction_input(
&self._database,
&self.database,
&certificate,
&self.metrics.shared_obj_tx,
)
Expand Down Expand Up @@ -413,7 +413,7 @@ impl AuthorityState {
.map(|(_, obj)| obj.previous_transaction)
.collect();
let mut temporary_store = AuthorityTemporaryStore::new(
self._database.clone(),
self.database.clone(),
objects_by_kind,
transaction_digest,
);
Expand Down Expand Up @@ -442,7 +442,7 @@ impl AuthorityState {
.await?;

Ok(TransactionInfoResponse {
signed_transaction: self._database.get_transaction(&transaction_digest)?,
signed_transaction: self.database.get_transaction(&transaction_digest)?,
certified_transaction: Some(certificate),
signed_effects: Some(signed_effects),
})
Expand All @@ -467,7 +467,7 @@ impl AuthorityState {
// Ensure it is the first time we see this certificate.
let transaction_digest = *certificate.digest();
if self
._database
.database
.sequenced(&transaction_digest, certificate.shared_input_objects())?[0]
.is_some()
{
Expand All @@ -484,7 +484,7 @@ impl AuthorityState {
// this function and that the last consensus index is also kept in memory. It is
// thus ok to only persist now (despite this function may have returned earlier).
// In the worst case, the synchronizer of the consensus client will catch up.
self._database
self.database
.persist_certificate_and_lock_shared_objects(certificate, last_consensus_index)
}

Expand All @@ -503,7 +503,7 @@ impl AuthorityState {

// If we already executed this transaction, return the sign effects.
let digest = certificate.digest();
if self._database.effects_exists(digest)? {
if self.database.effects_exists(digest)? {
debug!("Shared-object transaction {digest:?} already executed");
return self.make_transaction_info(digest).await.map(Some);
}
Expand All @@ -512,7 +512,7 @@ impl AuthorityState {
// This can happen to transaction previously submitted to consensus that failed execution
// due to missing dependencies.
match self
._database
.database
.sequenced(digest, certificate.shared_input_objects())?[0]
{
Some(_) => {
Expand Down Expand Up @@ -653,7 +653,7 @@ impl AuthorityState {
};
let end = start + request.length;

let (batches, transactions) = self._database.batches_and_transactions(start, end)?;
let (batches, transactions) = self.database.batches_and_transactions(start, end)?;

let mut dq_batches = std::collections::VecDeque::from(batches);
let mut dq_transactions = std::collections::VecDeque::from(transactions);
Expand Down Expand Up @@ -731,14 +731,14 @@ impl AuthorityState {

// Get all unprocessed checkpoints
for (_seq, batch) in state
._database
.database
.batches
.iter()
.skip_to(&next_expected_tx)
.expect("Seeking batches should never fail at this point")
{
let transactions: Vec<(TxSequenceNumber, TransactionDigest)> = state
._database
.database
.executed_sequence
.iter()
.skip_to(&batch.batch.initial_sequence_number)
Expand Down Expand Up @@ -819,7 +819,7 @@ impl AuthorityState {
adapter::new_move_vm(native_functions)
.expect("We defined natives to not fail here"),
),
_database: store.clone(),
database: store.clone(),
_checkpoints: checkpoints,
batch_channels: tx,
batch_notifier: Arc::new(
Expand All @@ -842,22 +842,22 @@ impl AuthorityState {
}

pub(crate) fn db(&self) -> Arc<AuthorityStore> {
self._database.clone()
self.database.clone()
}

async fn get_object(&self, object_id: &ObjectID) -> Result<Option<Object>, SuiError> {
self._database.get_object(object_id)
self.database.get_object(object_id)
}

pub async fn insert_genesis_object(&self, object: Object) {
self._database
self.database
.insert_genesis_object(object)
.await
.expect("Cannot insert genesis object")
}

pub async fn insert_genesis_objects_bulk_unsafe(&self, objects: &[&Object]) {
self._database
self.database
.bulk_object_insert(objects)
.await
.expect("Cannot bulk insert genesis objects")
Expand Down Expand Up @@ -899,7 +899,7 @@ impl AuthorityState {

debug_assert!(ctx.digest() == TransactionDigest::genesis());
let mut temporary_store =
AuthorityTemporaryStore::new(self._database.clone(), filtered, ctx.digest());
AuthorityTemporaryStore::new(self.database.clone(), filtered, ctx.digest());
let package_id = ObjectID::from(*modules[0].self_id().address());
let natives = self._native_functions.clone();
let mut gas_status = SuiGasStatus::new_unmetered();
Expand Down Expand Up @@ -927,12 +927,12 @@ impl AuthorityState {
&self,
transaction_digest: &TransactionDigest,
) -> Result<TransactionInfoResponse, SuiError> {
self._database
self.database
.get_signed_transaction_info(transaction_digest)
}

fn make_account_info(&self, account: SuiAddress) -> Result<AccountInfoResponse, SuiError> {
self._database
self.database
.get_account_objects(account)
.map(|object_ids| AccountInfoResponse {
object_ids,
Expand All @@ -949,7 +949,7 @@ impl AuthorityState {
mutable_input_objects: &[ObjectRef],
signed_transaction: SignedTransaction,
) -> Result<(), SuiError> {
self._database
self.database
.lock_and_write_transaction(mutable_input_objects, signed_transaction)
.await
}
Expand All @@ -964,7 +964,7 @@ impl AuthorityState {
signed_effects: &SignedTransactionEffects,
) -> SuiResult {
let notifier_ticket = self.batch_notifier.ticket()?;
self._database
self.database
.update_state(
temporary_store,
certificate,
Expand All @@ -980,7 +980,7 @@ impl AuthorityState {
&self,
object_ref: &ObjectRef,
) -> Result<Option<SignedTransaction>, SuiError> {
self._database.get_transaction_envelope(object_ref).await
self.database.get_transaction_envelope(object_ref).await
}

// Helper functions to manage certificates
Expand All @@ -990,11 +990,11 @@ impl AuthorityState {
&self,
digest: &TransactionDigest,
) -> Result<Option<CertifiedTransaction>, SuiError> {
self._database.read_certificate(digest)
self.database.read_certificate(digest)
}

pub async fn parent(&self, object_ref: &ObjectRef) -> Option<TransactionDigest> {
self._database
self.database
.parent(object_ref)
.expect("TODO: propagate the error")
}
Expand All @@ -1003,7 +1003,7 @@ impl AuthorityState {
&self,
_objects: &[ObjectID],
) -> Result<Vec<Option<Object>>, SuiError> {
self._database.get_objects(_objects)
self.database.get_objects(_objects)
}

/// Returns all parents (object_ref and transaction digests) that match an object_id, at
Expand All @@ -1014,23 +1014,23 @@ impl AuthorityState {
seq: Option<SequenceNumber>,
) -> Result<impl Iterator<Item = (ObjectRef, TransactionDigest)> + '_, SuiError> {
{
self._database.get_parent_iterator(object_id, seq)
self.database.get_parent_iterator(object_id, seq)
}
}

pub async fn get_latest_parent_entry(
&self,
object_id: ObjectID,
) -> Result<Option<(ObjectRef, TransactionDigest)>, SuiError> {
self._database.get_latest_parent_entry(object_id)
self.database.get_latest_parent_entry(object_id)
}
}

impl ModuleResolver for AuthorityState {
type Error = SuiError;

fn get_module(&self, module_id: &ModuleId) -> Result<Option<Vec<u8>>, Self::Error> {
self._database.get_module(module_id)
self.database.get_module(module_id)
}
}

Expand All @@ -1048,7 +1048,7 @@ impl ExecutionState for AuthorityState {

// Ensure an idempotent answer.
let digest = certificate.digest();
if self._database.effects_exists(digest)? {
if self.database.effects_exists(digest)? {
let info = self.make_transaction_info(digest).await?;
debug!(tx_digest =? digest, "Shared-object transaction already executed");
return Ok(bincode::serialize(&info).unwrap());
Expand Down Expand Up @@ -1082,6 +1082,6 @@ impl ExecutionState for AuthorityState {
}

async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
self._database.last_consensus_index()
self.database.last_consensus_index()
}
}
4 changes: 2 additions & 2 deletions sui_core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ where
},
// Upon receiving a transaction digest we store it, if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest))))) => {
if !self.state._database.effects_exists(&_digest)? {
if !self.state.database.effects_exists(&_digest)? {
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
_digest
Expand Down Expand Up @@ -223,7 +223,7 @@ where

digest = &mut queue.next() , if !queue.is_empty() => {
let digest = digest.unwrap();
if !self.state._database.effects_exists(&digest)? {
if !self.state.database.effects_exists(&digest)? {
// We still do not have a transaction others have after some time

// Download the certificate
Expand Down

0 comments on commit 915d616

Please sign in to comment.