Skip to content

Commit

Permalink
indexer: query tx returns digests only (MystenLabs#12657)
Browse files Browse the repository at this point in the history
## Description 

IIRC there was a time we returned full contents for query tx, but now
only digests are returned.

## Test Plan 

Test with local reader + remote testing mainnet DB

---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
gegaowp authored Jun 23, 2023
1 parent ea4e198 commit 4782ab5
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 121 deletions.
122 changes: 76 additions & 46 deletions crates/sui-indexer/src/apis/indexer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;

use anyhow::anyhow;
use async_trait::async_trait;
use futures::future::join_all;
use jsonrpsee::core::RpcResult;
use jsonrpsee::http_client::HttpClient;
use jsonrpsee::types::SubscriptionResult;
Expand All @@ -20,8 +19,8 @@ use sui_json_rpc::indexer_api::spawn_subscription;
use sui_json_rpc::SuiRpcModule;
use sui_json_rpc_types::{
DynamicFieldPage, EventFilter, EventPage, ObjectsPage, Page, SuiObjectDataFilter,
SuiObjectResponse, SuiObjectResponseQuery, SuiTransactionBlockResponseQuery,
TransactionBlocksPage, TransactionFilter,
SuiObjectResponse, SuiObjectResponseQuery, SuiTransactionBlockResponse,
SuiTransactionBlockResponseQuery, TransactionBlocksPage, TransactionFilter,
};
use sui_open_rpc::Module;
use sui_types::base_types::{ObjectID, SuiAddress};
Expand Down Expand Up @@ -76,29 +75,53 @@ impl<S: IndexerStore> IndexerApi<S> {
let limit = validate_limit(limit, *QUERY_MAX_RESULT_LIMIT)?;
let is_descending = descending_order.unwrap_or_default();
let cursor_str = cursor.map(|digest| digest.to_string());
let mut tx_vec_from_db = match query.filter {
let mut tx_digests_from_db = match query.filter {
None => {
let indexer_seq_number = self
.state
.get_transaction_sequence_by_digest(cursor_str, is_descending)
.await?;
self.state
let tx_vec = self
.state
.get_all_transaction_page(indexer_seq_number, limit + 1, is_descending)
.await
.await?;
tx_vec
.into_iter()
.map(|tx| {
tx.transaction_digest.parse().map_err(|e| {
IndexerError::InsertableParsingError(format!(
"Failed to parse transaction digest {} : {:?}",
tx.transaction_digest, e
))
})
})
.collect::<Result<Vec<TransactionDigest>, _>>()?
}
Some(TransactionFilter::Checkpoint(checkpoint_id)) => {
let indexer_seq_number = self
.state
.get_transaction_sequence_by_digest(cursor_str, is_descending)
.await?;
self.state
let tx_vec = self
.state
.get_transaction_page_by_checkpoint(
checkpoint_id as i64,
indexer_seq_number,
limit + 1,
is_descending,
)
.await
.await?;
tx_vec
.into_iter()
.map(|tx| {
tx.transaction_digest.parse().map_err(|e| {
IndexerError::InsertableParsingError(format!(
"Failed to parse transaction digest {} : {:?}",
tx.transaction_digest, e
))
})
})
.collect::<Result<Vec<TransactionDigest>, _>>()?
}
Some(TransactionFilter::MoveFunction {
package,
Expand Down Expand Up @@ -134,7 +157,7 @@ impl<S: IndexerStore> IndexerApi<S> {
limit + 1,
is_descending,
)
.await
.await?
}
Some(TransactionFilter::InputObject(input_obj_id)) => {
let input_obj_seq = self
Expand All @@ -149,36 +172,49 @@ impl<S: IndexerStore> IndexerApi<S> {
limit + 1,
is_descending,
)
.await
.await?
}
Some(TransactionFilter::ChangedObject(mutated_obj_id)) => {
let indexer_seq_number = self
.state
.get_transaction_sequence_by_digest(cursor_str, is_descending)
.await?;
self.state
.get_transaction_page_by_mutated_object(
mutated_obj_id.to_string(),
.get_transaction_page_by_changed_object(
mutated_obj_id,
None,
indexer_seq_number,
limit + 1,
is_descending,
)
.await
.await?
}
// NOTE: more efficient to run this query over transactions table
Some(TransactionFilter::FromAddress(sender_address)) => {
let indexer_seq_number = self
.state
.get_transaction_sequence_by_digest(cursor_str, is_descending)
.await?;
self.state
let tx_vec = self
.state
.get_transaction_page_by_sender_address(
sender_address.to_string(),
indexer_seq_number,
limit + 1,
is_descending,
)
.await
.await?;
tx_vec
.into_iter()
.map(|tx| {
tx.transaction_digest.parse().map_err(|e| {
IndexerError::InsertableParsingError(format!(
"Failed to parse transaction digest {} : {:?}",
tx.transaction_digest, e
))
})
})
.collect::<Result<Vec<TransactionDigest>, _>>()?
}
Some(TransactionFilter::ToAddress(recipient_address)) => {
let recipient_seq_number = self
Expand All @@ -193,7 +229,7 @@ impl<S: IndexerStore> IndexerApi<S> {
limit + 1,
is_descending,
)
.await
.await?
}
Some(TransactionFilter::FromAndToAddress { from, to }) => {
let recipient_seq_number = self
Expand All @@ -208,7 +244,7 @@ impl<S: IndexerStore> IndexerApi<S> {
limit + 1,
is_descending,
)
.await
.await?
}
Some(TransactionFilter::FromOrToAddress { addr }) => {
let start_sequence = self
Expand All @@ -217,53 +253,47 @@ impl<S: IndexerStore> IndexerApi<S> {
.await?;
self.state
.get_transaction_page_by_address(addr, start_sequence, limit, is_descending)
.await
.await?
}
Some(TransactionFilter::TransactionKind(tx_kind_name)) => {
let indexer_seq_number = self
.state
.get_transaction_sequence_by_digest(cursor_str, is_descending)
.await?;
self.state
let tx_vec = self
.state
.get_transaction_page_by_transaction_kind(
tx_kind_name,
indexer_seq_number,
limit + 1,
is_descending,
)
.await
.await?;
tx_vec
.into_iter()
.map(|tx| {
tx.transaction_digest.parse().map_err(|e| {
IndexerError::InsertableParsingError(format!(
"Failed to parse transaction digest {} : {:?}",
tx.transaction_digest, e
))
})
})
.collect::<Result<Vec<TransactionDigest>, _>>()?
}
}?;
};

let has_next_page = tx_vec_from_db.len() > limit;
tx_vec_from_db.truncate(limit);
let next_cursor = tx_vec_from_db
.last()
.cloned()
.map(|tx| {
let digest = tx.transaction_digest;
let tx_digest: Result<TransactionDigest, _> = digest.parse();
tx_digest.map_err(|e| {
IndexerError::SerdeError(format!(
"Failed to deserialize transaction digest: {:?} with error {:?}",
digest, e
))
})
})
.transpose()?
.map_or(cursor, Some);
let has_next_page = tx_digests_from_db.len() > limit;
tx_digests_from_db.truncate(limit);
let next_cursor = tx_digests_from_db.last().cloned();

let tx_resp_futures = tx_vec_from_db.into_iter().map(|tx| {
self.state
.compose_sui_transaction_block_response(tx, query.options.as_ref())
});
let sui_tx_resp_vec = join_all(tx_resp_futures)
.await
let sui_tx_resps = tx_digests_from_db
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
.map(SuiTransactionBlockResponse::new)
.collect();

Ok(Page {
data: sui_tx_resp_vec,
data: sui_tx_resps,
next_cursor,
has_next_page,
})
Expand Down
20 changes: 6 additions & 14 deletions crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use sui_json_rpc_types::{
SuiTransactionBlockResponseOptions,
};
use sui_types::base_types::{EpochId, ObjectID, SequenceNumber, SuiAddress, VersionNumber};
use sui_types::digests::CheckpointDigest;
use sui_types::digests::{CheckpointDigest, TransactionDigest};
use sui_types::error::SuiError;
use sui_types::event::EventID;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;
Expand Down Expand Up @@ -124,22 +124,14 @@ pub trait IndexerStore {
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;

async fn get_transaction_page_by_mutated_object(
&self,
object_id: String,
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;

async fn get_transaction_page_by_recipient_address(
&self,
sender_address: Option<SuiAddress>,
recipient_address: SuiAddress,
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;
) -> Result<Vec<TransactionDigest>, IndexerError>;

// `address` can be either sender or recipient address of the transaction
async fn get_transaction_page_by_address(
Expand All @@ -148,7 +140,7 @@ pub trait IndexerStore {
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;
) -> Result<Vec<TransactionDigest>, IndexerError>;

async fn get_transaction_page_by_input_object(
&self,
Expand All @@ -157,7 +149,7 @@ pub trait IndexerStore {
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;
) -> Result<Vec<TransactionDigest>, IndexerError>;

async fn get_transaction_page_by_changed_object(
&self,
Expand All @@ -166,7 +158,7 @@ pub trait IndexerStore {
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;
) -> Result<Vec<TransactionDigest>, IndexerError>;

async fn get_transaction_page_by_move_call(
&self,
Expand All @@ -176,7 +168,7 @@ pub trait IndexerStore {
start_sequence: Option<i64>,
limit: usize,
is_descending: bool,
) -> Result<Vec<Transaction>, IndexerError>;
) -> Result<Vec<TransactionDigest>, IndexerError>;

async fn get_transaction_sequence_by_digest(
&self,
Expand Down
Loading

0 comments on commit 4782ab5

Please sign in to comment.