Skip to content

Commit

Permalink
[storage] Remove type tempalte on AuthorityStores (MystenLabs#6688)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Dec 9, 2022
1 parent ce810b8 commit 45efc6a
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 125 deletions.
6 changes: 3 additions & 3 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use tracing::{debug, error, instrument, warn};
use typed_store::Map;

pub use authority_notify_read::EffectsNotifyRead;
pub use authority_store::{AuthorityStore, ResolverWrapper, SuiDataStore, UpdateType};
pub use authority_store::{AuthorityStore, ResolverWrapper, UpdateType};
use narwhal_config::{
Committee as ConsensusCommittee, WorkerCache as ConsensusWorkerCache,
WorkerId as ConsensusWorkerId,
Expand All @@ -55,7 +55,7 @@ use sui_storage::{
IndexStore,
};
use sui_types::committee::EpochId;
use sui_types::crypto::{AuthorityKeyPair, AuthoritySignInfo, NetworkKeyPair};
use sui_types::crypto::{AuthorityKeyPair, NetworkKeyPair};
use sui_types::event::{Event, EventID};
use sui_types::messages_checkpoint::{CheckpointRequest, CheckpointResponse};
use sui_types::object::{Owner, PastObjectRead};
Expand Down Expand Up @@ -1616,7 +1616,7 @@ impl AuthorityState {
self.database.clone()
}

pub fn epoch_store(&self) -> Guard<Arc<AuthorityPerEpochStore<AuthoritySignInfo>>> {
pub fn epoch_store(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
self.database.epoch_store()
}

Expand Down
38 changes: 13 additions & 25 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ use sui_storage::mutex_table::LockGuard;
use sui_storage::write_ahead_log::{DBWriteAheadLog, TxGuard, WriteAheadLog};
use sui_types::base_types::{AuthorityName, EpochId, ObjectID, SequenceNumber, TransactionDigest};
use sui_types::committee::Committee;
use sui_types::crypto::AuthoritySignInfo;
use sui_types::error::{SuiError, SuiResult};
use sui_types::messages::{
ConsensusTransaction, ConsensusTransactionKey, SenderSignedData, SignedTransactionEffects,
TrustedCertificate, VerifiedCertificate,
TrustedCertificate, VerifiedCertificate, VerifiedSignedTransaction,
};
use tracing::debug;
use typed_store::rocks::{DBBatch, DBMap, DBOptions, TypedStoreError};
Expand All @@ -28,7 +29,7 @@ use crate::authority::{CertTxGuard, MAX_TX_RECOVERY_RETRY};
use crate::epoch::reconfiguration::ReconfigState;
use crate::notify_once::NotifyOnce;
use crate::stake_aggregator::StakeAggregator;
use sui_types::message_envelope::{TrustedEnvelope, VerifiedEnvelope};
use sui_types::message_envelope::TrustedEnvelope;
use sui_types::temporary_store::InnerTemporaryStore;
use typed_store::Map;
use typed_store_derive::DBMapUtils;
Expand All @@ -47,9 +48,9 @@ pub struct ExecutionIndicesWithHash {
pub hash: u64,
}

pub struct AuthorityPerEpochStore<S> {
pub struct AuthorityPerEpochStore {
committee: Committee,
tables: AuthorityEpochTables<S>,
tables: AuthorityEpochTables,
/// In-memory cache of the content from the reconfig_state db table.
reconfig_state_mem: RwLock<ReconfigState>,
consensus_notify_read: NotifyRead<ConsensusTransactionKey, ()>,
Expand All @@ -62,10 +63,10 @@ pub struct AuthorityPerEpochStore<S> {

/// AuthorityEpochTables contains tables that contain data that is only valid within an epoch.
#[derive(DBMapUtils)]
pub struct AuthorityEpochTables<S> {
pub struct AuthorityEpochTables {
/// This is map between the transaction digest and transactions found in the `transaction_lock`.
#[default_options_override_fn = "transactions_table_default_config"]
transactions: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, S>>,
transactions: DBMap<TransactionDigest, TrustedEnvelope<SenderSignedData, AuthoritySignInfo>>,

/// The two tables below manage shared object locks / versions. There are two ways they can be
/// updated:
Expand Down Expand Up @@ -135,19 +136,12 @@ pub struct AuthorityEpochTables<S> {
final_epoch_checkpoint: DBMap<u64, u64>,
}

impl<S> AuthorityEpochTables<S>
where
S: std::fmt::Debug + Serialize + for<'de> Deserialize<'de>,
{
impl AuthorityEpochTables {
pub fn open(epoch: EpochId, parent_path: &Path, db_options: Option<Options>) -> Self {
Self::open_tables_read_write(
AuthorityEpochTables::<S>::path(epoch, parent_path),
db_options,
None,
)
Self::open_tables_read_write(Self::path(epoch, parent_path), db_options, None)
}

pub fn open_readonly(epoch: EpochId, parent_path: &Path) -> AuthorityEpochTablesReadOnly<S> {
pub fn open_readonly(epoch: EpochId, parent_path: &Path) -> AuthorityEpochTablesReadOnly {
Self::get_read_only_handle(Self::path(epoch, parent_path), None, None)
}

Expand All @@ -164,10 +158,7 @@ where
}
}

impl<S> AuthorityPerEpochStore<S>
where
S: std::fmt::Debug + Serialize + for<'de> Deserialize<'de>,
{
impl AuthorityPerEpochStore {
pub fn new(committee: Committee, parent_path: &Path, db_options: Option<Options>) -> Self {
let epoch_id = committee.epoch;
let tables = AuthorityEpochTables::open(epoch_id, parent_path, db_options);
Expand Down Expand Up @@ -243,10 +234,7 @@ where
)?)
}

pub fn insert_transaction(
&self,
transaction: VerifiedEnvelope<SenderSignedData, S>,
) -> SuiResult {
pub fn insert_transaction(&self, transaction: VerifiedSignedTransaction) -> SuiResult {
Ok(self
.tables
.transactions
Expand All @@ -256,7 +244,7 @@ where
pub fn get_transaction(
&self,
tx_digest: &TransactionDigest,
) -> SuiResult<Option<VerifiedEnvelope<SenderSignedData, S>>> {
) -> SuiResult<Option<VerifiedSignedTransaction>> {
Ok(self.tables.transactions.get(tx_digest)?.map(|t| t.into()))
}

Expand Down
45 changes: 16 additions & 29 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use sui_storage::{
mutex_table::{LockGuard, MutexTable},
LockService,
};
use sui_types::crypto::AuthoritySignInfo;
use sui_types::message_envelope::VerifiedEnvelope;
use sui_types::object::Owner;
use sui_types::storage::ChildObjectResolver;
use sui_types::{base_types::SequenceNumber, storage::ParentSync};
Expand All @@ -31,8 +29,6 @@ use tracing::{debug, info, trace};
use typed_store::rocks::DBBatch;
use typed_store::traits::Map;

pub type AuthorityStore = SuiDataStore<AuthoritySignInfo>;

const NUM_SHARDS: usize = 4096;
const SHARD_SIZE: usize = 128;

Expand All @@ -42,15 +38,15 @@ const SHARD_SIZE: usize = 128;
/// S is a template on Authority signature state. This allows SuiDataStore to be used on either
/// authorities or non-authorities. Specifically, when storing transactions and effects,
/// S allows SuiDataStore to either store the authority signed version or unsigned version.
pub struct SuiDataStore<S> {
pub struct AuthorityStore {
/// The LockService this store depends on for locking functionality
lock_service: LockService,

/// Internal vector of locks to manage concurrent writes to the database
mutex_table: MutexTable<ObjectDigest>,

pub(crate) perpetual_tables: AuthorityPerpetualTables<S>,
epoch_store: ArcSwap<AuthorityPerEpochStore<S>>,
pub(crate) perpetual_tables: AuthorityPerpetualTables,
epoch_store: ArcSwap<AuthorityPerEpochStore>,

// needed for re-opening epoch db.
path: PathBuf,
Expand All @@ -59,7 +55,7 @@ pub struct SuiDataStore<S> {
pub(crate) effects_notify_read: NotifyRead<TransactionDigest, TransactionEffects>,
}

impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
impl AuthorityStore {
/// Open an authority store by directory path.
/// If the store is empty, initialize it using genesis objects.
pub async fn open(
Expand Down Expand Up @@ -97,7 +93,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
path: &Path,
db_options: Option<Options>,
genesis: &Genesis,
perpetual_tables: AuthorityPerpetualTables<S>,
perpetual_tables: AuthorityPerpetualTables,
committee: Committee,
) -> SuiResult<Self> {
let epoch_tables = Arc::new(AuthorityPerEpochStore::new(
Expand Down Expand Up @@ -146,7 +142,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
previous_store.epoch_terminated();
}

pub fn epoch_store(&self) -> Guard<Arc<AuthorityPerEpochStore<S>>> {
pub fn epoch_store(&self) -> Guard<Arc<AuthorityPerEpochStore>> {
self.epoch_store.load()
}

Expand Down Expand Up @@ -429,7 +425,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
&self,
object_ref: &ObjectRef,
epoch_id: EpochId,
) -> SuiResult<Option<VerifiedEnvelope<SenderSignedData, S>>> {
) -> SuiResult<Option<VerifiedSignedTransaction>> {
let lock_info = self.lock_service.get_lock(*object_ref, epoch_id).await?;
let lock_info = match lock_info {
ObjectLockStatus::LockedAtDifferentVersion { locked_ref } => {
Expand Down Expand Up @@ -621,7 +617,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
&self,
epoch: EpochId,
owned_input_objects: &[ObjectRef],
transaction: VerifiedEnvelope<SenderSignedData, S>,
transaction: VerifiedSignedTransaction,
) -> Result<(), SuiError> {
let tx_digest = *transaction.digest();

Expand All @@ -648,7 +644,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
inner_temporary_store: InnerTemporaryStore,
certificate: &VerifiedCertificate,
proposed_seq: TxSequenceNumber,
effects: &TransactionEffectsEnvelope<S>,
effects: &SignedTransactionEffects,
effects_digest: &TransactionEffectsDigest,
) -> SuiResult<TxSequenceNumber> {
// Extract the new state from the execution
Expand Down Expand Up @@ -721,7 +717,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
inner_temporary_store: InnerTemporaryStore,
transaction_digest: &TransactionDigest,
proposed_seq: TxSequenceNumber,
effects: &TransactionEffectsEnvelope<S>,
effects: &SignedTransactionEffects,
effects_digest: &TransactionEffectsDigest,
) -> SuiResult<TxSequenceNumber> {
// Safe to unwrap since UpdateType::Transaction ensures we get a sequence number back.
Expand Down Expand Up @@ -1281,7 +1277,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
pub fn get_transaction(
&self,
transaction_digest: &TransactionDigest,
) -> SuiResult<Option<VerifiedEnvelope<SenderSignedData, S>>> {
) -> SuiResult<Option<VerifiedSignedTransaction>> {
self.epoch_store().get_transaction(transaction_digest)
}

Expand All @@ -1306,15 +1302,10 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.collect())
}

pub fn get_sui_system_state_object(&self) -> SuiResult<SuiSystemState>
where
S: Eq + Serialize + for<'de> Deserialize<'de>,
{
pub fn get_sui_system_state_object(&self) -> SuiResult<SuiSystemState> {
self.perpetual_tables.get_sui_system_state_object()
}
}

impl SuiDataStore<AuthoritySignInfo> {
/// Returns true if we have a transaction structure for this transaction digest
pub fn transaction_exists(
&self,
Expand Down Expand Up @@ -1349,9 +1340,7 @@ impl SuiDataStore<AuthoritySignInfo> {
}
}

impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> BackingPackageStore
for SuiDataStore<S>
{
impl BackingPackageStore for AuthorityStore {
fn get_package(&self, package_id: &ObjectID) -> SuiResult<Option<Object>> {
let package = self.get_object(package_id)?;
if let Some(obj) = &package {
Expand All @@ -1366,9 +1355,7 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> BackingPackageStore
}
}

impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> ChildObjectResolver
for SuiDataStore<S>
{
impl ChildObjectResolver for AuthorityStore {
fn read_child_object(&self, parent: &ObjectID, child: &ObjectID) -> SuiResult<Option<Object>> {
let child_object = match self.get_object(child)? {
None => return Ok(None),
Expand All @@ -1386,15 +1373,15 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> ChildObjectResolver
}
}

impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> ParentSync for SuiDataStore<S> {
impl ParentSync for AuthorityStore {
fn get_latest_parent_entry_ref(&self, object_id: ObjectID) -> SuiResult<Option<ObjectRef>> {
Ok(self
.get_latest_parent_entry(object_id)?
.map(|(obj_ref, _)| obj_ref))
}
}

impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> ModuleResolver for SuiDataStore<S> {
impl ModuleResolver for AuthorityStore {
type Error = SuiError;

// TODO: duplicated code with ModuleResolver for InMemoryStorage in memory_storage.rs.
Expand Down
12 changes: 4 additions & 8 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use super::{authority_store::ObjectKey, *};
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::path::Path;
use sui_storage::default_db_options;
use sui_types::base_types::{ExecutionDigests, SequenceNumber};
Expand All @@ -16,7 +15,7 @@ use typed_store_derive::DBMapUtils;

/// AuthorityPerpetualTables contains data that must be preserved from one epoch to the next.
#[derive(DBMapUtils)]
pub struct AuthorityPerpetualTables<S> {
pub struct AuthorityPerpetualTables {
/// This is a map between the object (ID, version) and the latest state of the object, namely the
/// state that is needed to process new transactions.
///
Expand Down Expand Up @@ -56,7 +55,7 @@ pub struct AuthorityPerpetualTables<S> {
/// structure is used to ensure we do not double process a certificate, and that we can return
/// the same response for any call after the first (ie. make certificate processing idempotent).
#[default_options_override_fn = "effects_table_default_config"]
pub(crate) executed_effects: DBMap<TransactionDigest, TransactionEffectsEnvelope<S>>,
pub(crate) executed_effects: DBMap<TransactionDigest, SignedTransactionEffects>,

pub(crate) effects: DBMap<TransactionEffectsDigest, TransactionEffects>,

Expand All @@ -81,10 +80,7 @@ pub struct AuthorityPerpetualTables<S> {
pub batches: DBMap<TxSequenceNumber, SignedBatch>,
}

impl<S> AuthorityPerpetualTables<S>
where
S: std::fmt::Debug + Serialize + for<'de> Deserialize<'de>,
{
impl AuthorityPerpetualTables {
pub fn path(parent_path: &Path) -> PathBuf {
parent_path.join("perpetual")
}
Expand All @@ -93,7 +89,7 @@ where
Self::open_tables_read_write(Self::path(parent_path), db_options, None)
}

pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly<S> {
pub fn open_readonly(parent_path: &Path) -> AuthorityPerpetualTablesReadOnly {
Self::get_read_only_handle(Self::path(parent_path), None, None)
}

Expand Down
7 changes: 3 additions & 4 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use sui_types::base_types::TransactionDigest;
use sui_types::committee::Committee;
use sui_types::crypto::AuthoritySignInfo;
use sui_types::{
error::{SuiError, SuiResult},
messages::ConsensusTransaction,
Expand Down Expand Up @@ -319,7 +318,7 @@ impl ConsensusAdapter {
fn submit_unchecked(
self: &Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
epoch_store: Arc<AuthorityPerEpochStore>,
) -> JoinHandle<()> {
// Reconfiguration lock is dropped when pending_consensus_transactions is persisted, before it is handled by consensus
let async_stage = self.clone().submit_and_wait(transaction, epoch_store);
Expand All @@ -331,7 +330,7 @@ impl ConsensusAdapter {
async fn submit_and_wait(
self: Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
epoch_store: Arc<AuthorityPerEpochStore>,
) {
let epoch_terminated = epoch_store.wait_epoch_terminated().boxed();
let submit_and_wait = self
Expand All @@ -348,7 +347,7 @@ impl ConsensusAdapter {
async fn submit_and_wait_inner(
self: Arc<Self>,
transaction: ConsensusTransaction,
epoch_store: &Arc<AuthorityPerEpochStore<AuthoritySignInfo>>,
epoch_store: &Arc<AuthorityPerEpochStore>,
) {
let _guard = InflightDropGuard::acquire(&self);
let processed_waiter = epoch_store
Expand Down
Loading

0 comments on commit 45efc6a

Please sign in to comment.