Skip to content

Commit

Permalink
Address post-merge comments from 2106 (MystenLabs#2375)
Browse files Browse the repository at this point in the history
* Call update_state before updating the indexes

* Rename get_object_versions

* Remove code duplication

* Remove unnecessary pub

* fix clippy

* Add comments
  • Loading branch information
mystenmark authored Jun 7, 2022
1 parent 6c82996 commit 7d44e53
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 105 deletions.
92 changes: 36 additions & 56 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ use crate::{
event_handler::EventHandler,
execution_engine,
gateway_types::TransactionEffectsResponse,
query_helpers::QueryHelpers,
transaction_input_checker,
};
use anyhow::anyhow;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use itertools::Itertools;
Expand Down Expand Up @@ -97,7 +97,6 @@ pub mod authority_notifier;

pub const MAX_ITEMS_LIMIT: u64 = 100_000;
const BROADCAST_CAPACITY: usize = 10_000;
const MAX_TX_RANGE_SIZE: u64 = 4096;

/// Prometheus metrics which can be displayed in Grafana, queried and alerted on
pub struct AuthorityMetrics {
Expand Down Expand Up @@ -967,7 +966,7 @@ impl AuthorityState {
None => Ok(ObjectRead::NotExists(*object_id)),
Some((obj_ref, _)) => {
if obj_ref.2.is_alive() {
match self.database.get_object_version(object_id, obj_ref.1)? {
match self.database.get_object_by_key(object_id, obj_ref.1)? {
None => {
error!("Object with in parent_entry is missing from object store, datastore is inconsistent");
Err(SuiError::ObjectNotFound {
Expand All @@ -992,71 +991,29 @@ impl AuthorityState {
}

pub fn get_total_transaction_number(&self) -> Result<u64, anyhow::Error> {
Ok(self.database.next_sequence_number()?)
QueryHelpers::get_total_transaction_number(&self.database)
}

pub fn get_transactions_in_range(
&self,
start: TxSequenceNumber,
end: TxSequenceNumber,
) -> Result<Vec<(TxSequenceNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
start <= end,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"start must not exceed end, (start={}, end={}) given",
start, end
),
}
.into()
);
fp_ensure!(
end - start <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE,
end - start
),
}
.into()
);
let res = self.database.transactions_in_seq_range(start, end)?;
debug!(?start, ?end, ?res, "Fetched transactions");
Ok(res)
QueryHelpers::get_transactions_in_range(&self.database, start, end)
}

pub fn get_recent_transactions(
&self,
count: u64,
) -> Result<Vec<(TxSequenceNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
count <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE, count
),
}
.into()
);
let end = self.get_total_transaction_number()?;
let start = if end >= count { end - count } else { 0 };
self.get_transactions_in_range(start, end)
QueryHelpers::get_recent_transactions(&self.database, count)
}

pub async fn get_transaction(
&self,
digest: TransactionDigest,
) -> Result<TransactionEffectsResponse, anyhow::Error> {
let opt = self.database.get_certified_transaction(&digest)?;
match opt {
Some(certificate) => Ok(TransactionEffectsResponse {
certificate: certificate.try_into()?,
effects: self.database.get_effects(&digest)?.into(),
}),
None => Err(anyhow!(SuiError::TransactionNotFound { digest })),
}
QueryHelpers::get_transaction(&self.database, digest)
}

fn get_indexes(&self) -> SuiResult<Arc<IndexStore>> {
Expand Down Expand Up @@ -1115,7 +1072,7 @@ impl AuthorityState {
}

/// Make an information response for a transaction
pub(crate) async fn make_transaction_info(
async fn make_transaction_info(
&self,
transaction_digest: &TransactionDigest,
) -> Result<TransactionInfoResponse, SuiError> {
Expand Down Expand Up @@ -1164,13 +1121,38 @@ impl AuthorityState {
let notifier_ticket = self.batch_notifier.ticket()?;
let seq = notifier_ticket.seq();

if let Some(indexes) = &self.indexes {
let inputs: Vec<_> = temporary_store.objects().iter().map(|(_, o)| o).collect();
// We want to call update_state before updating the indexes, however this is extremely
// awkward because update_state takes temporary_store by value.
// TODO: Move indexing either into update_state, or make it a batch consumer to clean this
// up.
let (inputs, outputs) = if self.indexes.is_some() {
let inputs: Vec<_> = temporary_store
.objects()
.iter()
.map(|(_, o)| o.clone())
.collect();
let outputs: Vec<_> = temporary_store
.written()
.iter()
.map(|(_, (_, o))| o)
.map(|(_, (_, o))| o.clone())
.collect();
(Some(inputs), Some(outputs))
} else {
(None, None)
};

let res = self
.database
.update_state(temporary_store, certificate, signed_effects, Some(seq))
.await;

if let Some(indexes) = &self.indexes {
// unwrap ok because of previous if stmt.
let inputs = inputs.unwrap();
let outputs = outputs.unwrap();
// turn into vectors of references...
let inputs: Vec<_> = inputs.iter().collect();
let outputs: Vec<_> = outputs.iter().collect();
if let Err(e) = indexes.index_tx(
certificate.sender_address(),
&inputs,
Expand All @@ -1182,9 +1164,7 @@ impl AuthorityState {
}
}

self.database
.update_state(temporary_store, certificate, signed_effects, Some(seq))
.await
res

// implicitly we drop the ticket here and that notifies the batch manager
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ impl<
.collect())
}

pub fn get_object_version(
pub fn get_object_by_key(
&self,
object_id: &ObjectID,
version: VersionNumber,
Expand Down
53 changes: 5 additions & 48 deletions crates/sui-core/src/gateway_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sui_types::{
use crate::transaction_input_checker;
use crate::{
authority::GatewayStore, authority_aggregator::AuthorityAggregator,
authority_client::AuthorityAPI,
authority_client::AuthorityAPI, query_helpers::QueryHelpers,
};
use sui_json::{resolve_move_function_args, SuiJsonCallArg, SuiJsonValue};

Expand All @@ -53,7 +53,6 @@ pub type GatewayClient = Arc<dyn GatewayAPI + Sync + Send>;

pub type GatewayTxSeqNumber = u64;

const MAX_TX_RANGE_SIZE: u64 = 4096;
/// Number of times to retry failed TX
const MAX_NUM_TX_RETRIES: usize = 5;

Expand Down Expand Up @@ -1158,70 +1157,28 @@ where
}

fn get_total_transaction_number(&self) -> Result<u64, anyhow::Error> {
Ok(self.store.next_sequence_number()?)
QueryHelpers::get_total_transaction_number(&self.store)
}

fn get_transactions_in_range(
&self,
start: GatewayTxSeqNumber,
end: GatewayTxSeqNumber,
) -> Result<Vec<(GatewayTxSeqNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
start <= end,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"start must not exceed end, (start={}, end={}) given",
start, end
),
}
.into()
);
fp_ensure!(
end - start <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE,
end - start
),
}
.into()
);
let res = self.store.transactions_in_seq_range(start, end)?;
debug!(?start, ?end, ?res, "Fetched transactions");
Ok(res)
QueryHelpers::get_transactions_in_range(&self.store, start, end)
}

fn get_recent_transactions(
&self,
count: u64,
) -> Result<Vec<(GatewayTxSeqNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
count <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE, count
),
}
.into()
);
let end = self.get_total_transaction_number()?;
let start = if end >= count { end - count } else { 0 };
self.get_transactions_in_range(start, end)
QueryHelpers::get_recent_transactions(&self.store, count)
}

async fn get_transaction(
&self,
digest: TransactionDigest,
) -> Result<TransactionEffectsResponse, anyhow::Error> {
let opt = self.store.get_certified_transaction(&digest)?;
match opt {
Some(certificate) => Ok(TransactionEffectsResponse {
certificate: certificate.try_into()?,
effects: self.store.get_effects(&digest)?.into(),
}),
None => Err(anyhow!(SuiError::TransactionNotFound { digest })),
}
QueryHelpers::get_transaction(&self.store, digest)
}
}
2 changes: 2 additions & 0 deletions crates/sui-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ pub mod gateway_types;
pub mod safe_client;
pub mod streamer;
pub mod transaction_input_checker;

mod query_helpers;
95 changes: 95 additions & 0 deletions crates/sui-core/src/query_helpers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{authority::SuiDataStore, gateway_types::TransactionEffectsResponse};
use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use sui_types::{base_types::*, batch::TxSequenceNumber, error::SuiError, fp_ensure};
use tracing::debug;

const MAX_TX_RANGE_SIZE: u64 = 4096;

pub struct QueryHelpers<const ALL_OBJ_VER: bool, const USE_LOCKS: bool, S> {
_s: std::marker::PhantomData<S>,
}

// TODO: QueryHelpers contains query implementations for the Gateway read API that would otherwise
// be duplicated between AuthorityState and GatewayState. The gateway read API will be removed
// soon, since nodes will be handling that. At that point we should delete this struct and move the
// code back to AuthorityState.
impl<
const ALL_OBJ_VER: bool,
const USE_LOCKS: bool,
S: Eq + Serialize + for<'de> Deserialize<'de>,
> QueryHelpers<ALL_OBJ_VER, USE_LOCKS, S>
{
pub fn get_total_transaction_number(
database: &SuiDataStore<ALL_OBJ_VER, USE_LOCKS, S>,
) -> Result<u64, anyhow::Error> {
Ok(database.next_sequence_number()?)
}

pub fn get_transactions_in_range(
database: &SuiDataStore<ALL_OBJ_VER, USE_LOCKS, S>,
start: TxSequenceNumber,
end: TxSequenceNumber,
) -> Result<Vec<(TxSequenceNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
start <= end,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"start must not exceed end, (start={}, end={}) given",
start, end
),
}
.into()
);
fp_ensure!(
end - start <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE,
end - start
),
}
.into()
);
let res = database.transactions_in_seq_range(start, end)?;
debug!(?start, ?end, ?res, "Fetched transactions");
Ok(res)
}

pub fn get_recent_transactions(
database: &SuiDataStore<ALL_OBJ_VER, USE_LOCKS, S>,
count: u64,
) -> Result<Vec<(TxSequenceNumber, TransactionDigest)>, anyhow::Error> {
fp_ensure!(
count <= MAX_TX_RANGE_SIZE,
SuiError::GatewayInvalidTxRangeQuery {
error: format!(
"Number of transactions queried must not exceed {}, {} queried",
MAX_TX_RANGE_SIZE, count
),
}
.into()
);
let end = Self::get_total_transaction_number(database)?;
let start = if end >= count { end - count } else { 0 };
Self::get_transactions_in_range(database, start, end)
}

pub fn get_transaction(
database: &SuiDataStore<ALL_OBJ_VER, USE_LOCKS, S>,
digest: TransactionDigest,
) -> Result<TransactionEffectsResponse, anyhow::Error> {
let opt = database.get_certified_transaction(&digest)?;
match opt {
Some(certificate) => Ok(TransactionEffectsResponse {
certificate: certificate.try_into()?,
effects: database.get_effects(&digest)?.into(),
}),
None => Err(anyhow!(SuiError::TransactionNotFound { digest })),
}
}
}

0 comments on commit 7d44e53

Please sign in to comment.