Skip to content

Commit

Permalink
indexer: read txn digests in checkpoint (MystenLabs#8560)
Browse files Browse the repository at this point in the history
## Description 

Before this PR, the indexer used transaction pagination to read transaction digests. This PR
changes it to read the digests from checkpoints instead, which also unblocks the removal of
transaction pagination on the full node (FN).

## Test Plan 

Tested locally with a validator, FN, and indexer (since some of the new APIs are not
in devnet yet); verified that the checkpoints table and transactions table can still be
properly populated:

![Screenshot 2023-02-22 at 7 30 07
PM](https://user-images.githubusercontent.com/106119108/220796411-5314ecc5-2aca-4132-b575-0f8d20f0b314.png)



---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
gegaowp authored Feb 23, 2023
1 parent ac57e4d commit 5719608
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE transactions (
id BIGSERIAL PRIMARY KEY,
transaction_digest VARCHAR(255) NOT NULL,
sender VARCHAR(255) NOT NULL,
checkpoint_sequence_number BIGINT,
transaction_time TIMESTAMP,
transaction_kinds TEXT[] NOT NULL,
-- object related
Expand Down Expand Up @@ -32,3 +33,4 @@ CREATE INDEX transactions_transaction_digest ON transactions (transaction_digest
CREATE INDEX transactions_transaction_time ON transactions (transaction_time);
CREATE INDEX transactions_sender ON transactions (sender);
CREATE INDEX transactions_gas_object_id ON transactions (gas_object_id);
CREATE INDEX transactions_checkpoint_sequence_number ON transactions (checkpoint_sequence_number);
6 changes: 3 additions & 3 deletions crates/sui-indexer/migrations/2022-11-22-235415_logs/up.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE transaction_logs (
id SERIAL PRIMARY KEY,
next_cursor_tx_digest TEXT
next_checkpoint_sequence_number BIGINT NOT NULL
);

INSERT INTO transaction_logs (id, next_cursor_tx_digest) VALUES
(1, NULL);
INSERT INTO transaction_logs (id, next_checkpoint_sequence_number)
VALUES (1, 0);
14 changes: 7 additions & 7 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::IndexerCheckpointHandlerMetrics;
use sui_indexer::models::checkpoint_logs::{commit_checkpoint_log, read_checkpoint_log};
use sui_indexer::models::checkpoints::{
commit_checkpoint, create_checkpoint, get_previous_checkpoint, Checkpoint,
commit_checkpoint, create_checkpoint, get_checkpoint, Checkpoint,
};
use sui_indexer::{get_pg_pool_connection, PgConnectionPool};
use sui_json_rpc_types::CheckpointId;
Expand Down Expand Up @@ -40,13 +40,13 @@ impl CheckpointHandler {

let checkpoint_log = read_checkpoint_log(&mut pg_pool_conn)?;
let mut next_cursor_sequence_number = checkpoint_log.next_cursor_sequence_number;
let mut previous_checkpoint_commit = Checkpoint::default();

if next_cursor_sequence_number != 0 {
let mut previous_checkpoint = Checkpoint::default();
if next_cursor_sequence_number > 0 {
let temp_checkpoint =
get_previous_checkpoint(&mut pg_pool_conn, next_cursor_sequence_number - 1);
get_checkpoint(&mut pg_pool_conn, next_cursor_sequence_number - 1);
match temp_checkpoint {
Ok(checkpoint) => previous_checkpoint_commit = checkpoint,
Ok(checkpoint) => previous_checkpoint = checkpoint,
Err(err) => {
error!("{}", err)
}
Expand Down Expand Up @@ -92,14 +92,14 @@ impl CheckpointHandler {
.db_write_request_latency
.start_timer();
// unwrap here is safe because we checked for error above
let new_checkpoint = create_checkpoint(checkpoint.unwrap(), previous_checkpoint_commit);
let new_checkpoint = create_checkpoint(checkpoint.unwrap(), previous_checkpoint);
commit_checkpoint(&mut pg_pool_conn, new_checkpoint.clone())?;
info!("Checkpoint {} committed", next_cursor_sequence_number);
self.checkpoint_handler_metrics
.total_checkpoint_processed
.inc();
db_guard.stop_and_record();
previous_checkpoint_commit = Checkpoint::from(new_checkpoint.clone());
previous_checkpoint = Checkpoint::from(new_checkpoint.clone());
next_cursor_sequence_number += 1;
commit_checkpoint_log(&mut pg_pool_conn, next_cursor_sequence_number)?;
}
Expand Down
110 changes: 58 additions & 52 deletions crates/sui-indexer/src/handlers/transaction_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
use futures::future::join_all;
use prometheus::Registry;
use std::sync::Arc;
use std::time::Duration;
use sui_json_rpc_types::{SuiTransactionResponse, TransactionsPage};
use sui_sdk::SuiClient;
use sui_types::base_types::TransactionDigest;
use sui_types::query::TransactionQuery;
use tokio::time::sleep;
use tracing::info;

use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::IndexerTransactionHandlerMetrics;
use sui_indexer::models::checkpoints::get_checkpoint;
use sui_indexer::models::transaction_logs::{commit_transaction_log, read_transaction_log};
use sui_indexer::models::transactions::commit_transactions;
use sui_indexer::utils::log_errors_to_pg;
Expand Down Expand Up @@ -43,33 +42,57 @@ impl TransactionHandler {
pub async fn start(&self) -> Result<(), IndexerError> {
info!("Indexer transaction handler started...");
let mut pg_pool_conn = get_pg_pool_connection(self.pg_connection_pool.clone())?;

let mut next_cursor = None;
let txn_log = read_transaction_log(&mut pg_pool_conn)?;
if let Some(tx_dig) = txn_log.next_cursor_tx_digest {
let tx_digest = tx_dig.parse().map_err(|e| {
IndexerError::TransactionDigestParsingError(format!(
"Failed parsing transaction digest {:?} with error: {:?}",
tx_dig, e
))
})?;
next_cursor = Some(tx_digest);
}
let mut checkpoint_seq_number = txn_log.next_checkpoint_sequence_number;

loop {
self.transaction_handler_metrics
.total_transaction_page_fetch_attempt
.inc();
let checkpoint_db_read_guard = self
.transaction_handler_metrics
.checkpoint_db_read_request_latency
.start_timer();
let mut checkpoint_opt = get_checkpoint(&mut pg_pool_conn, checkpoint_seq_number);
// this often happens when the checkpoint is not yet committed to the database
while checkpoint_opt.is_err() {
// this sleep is necessary to avoid blocking the checkpoint commit.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
checkpoint_opt = get_checkpoint(&mut pg_pool_conn, checkpoint_seq_number);
}
// unwrap is safe here b/c of the check above.
let checkpoint = checkpoint_opt.unwrap();
checkpoint_db_read_guard.stop_and_record();

let request_guard = self
.transaction_handler_metrics
.full_node_read_request_latency
.start_timer();

let page = get_transaction_page(self.rpc_client.clone(), next_cursor).await?;
self.transaction_handler_metrics
.total_transaction_page_received
.inc();
let txn_digest_vec = page.data;
let mut errors = vec![];
let txn_str_vec: Vec<String> = checkpoint
.transactions
.iter()
.filter_map(|t| {
t.clone()
.ok_or_else(|| {
IndexerError::PostgresReadError(format!(
"Read null transaction digests from checkpoint: {:?}",
checkpoint
))
})
.map_err(|e| errors.push(e))
.ok()
})
.collect();
let txn_digest_vec: Vec<TransactionDigest> = txn_str_vec
.into_iter()
.map(|txn_str| {
txn_str.parse().map_err(|e| {
IndexerError::PostgresReadError(format!(
"Failed to decode transaction digest: {:?} with err: {:?}",
txn_str, e
))
})
})
.filter_map(|f| f.map_err(|e| errors.push(e)).ok())
.collect();
let txn_count = txn_digest_vec.len();
self.transaction_handler_metrics
.total_transactions_received
Expand All @@ -80,46 +103,29 @@ impl TransactionHandler {
.map(|tx_digest| get_transaction_response(self.rpc_client.clone(), tx_digest)),
)
.await;
let resp_vec: Vec<SuiTransactionResponse> = txn_response_res_vec
.into_iter()
.filter_map(|f| f.map_err(|e| errors.push(e)).ok())
.collect();
request_guard.stop_and_record();
info!(
"Received transaction responses for {} transactions with next cursor: {:?}",
txn_response_res_vec.len(),
page.next_cursor,
"Received transaction responses for {} transaction(s) in checkpoint {}",
resp_vec.len(),
checkpoint_seq_number,
);
request_guard.stop_and_record();
log_errors_to_pg(&mut pg_pool_conn, errors);

let db_guard = self
.transaction_handler_metrics
.db_write_request_latency
.start_timer();
let mut errors = vec![];
let resp_vec: Vec<SuiTransactionResponse> = txn_response_res_vec
.into_iter()
.filter_map(|f| f.map_err(|e| errors.push(e)).ok())
.collect();

log_errors_to_pg(&mut pg_pool_conn, errors);
commit_transactions(&mut pg_pool_conn, resp_vec)?;
// Transaction page's next cursor can be None when latest transaction page is
// reached, if we use the None cursor to read transactions, it will read from genesis,
// thus here we do not commit / use the None cursor.
// This will cause duplicate run of the current batch, but will not cause duplicate rows
// b/c of the uniqueness restriction of the table.
if let Some(next_cursor_val) = page.next_cursor {
// canonical txn digest is Base58 encoded
commit_transaction_log(&mut pg_pool_conn, Some(next_cursor_val.base58_encode()))?;
self.transaction_handler_metrics
.total_transactions_processed
.inc_by(txn_count as u64);
next_cursor = page.next_cursor;
}
checkpoint_seq_number += 1;
commit_transaction_log(&mut pg_pool_conn, checkpoint_seq_number)?;
self.transaction_handler_metrics
.total_transaction_page_committed
.inc();
.total_transactions_processed
.inc_by(txn_count as u64);
db_guard.stop_and_record();

if txn_count < TRANSACTION_PAGE_SIZE || page.next_cursor.is_none() {
sleep(Duration::from_secs_f32(0.1)).await;
}
}
}
}
Expand Down
32 changes: 12 additions & 20 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,12 @@ const LATENCY_SEC_BUCKETS: &[f64] = &[
pub struct IndexerTransactionHandlerMetrics {
pub total_transactions_received: IntCounter,
pub total_transactions_processed: IntCounter,

pub total_transaction_page_fetch_attempt: IntCounter,
pub total_transaction_page_received: IntCounter,
pub total_transaction_page_committed: IntCounter,

pub total_transaction_checkpoint_processed: IntCounter,
pub total_transaction_handler_error: IntCounter,

pub db_write_request_latency: Histogram,
pub full_node_read_request_latency: Histogram,
pub checkpoint_db_read_request_latency: Histogram,
}

impl IndexerTransactionHandlerMetrics {
Expand All @@ -44,21 +41,9 @@ impl IndexerTransactionHandlerMetrics {
registry,
)
.unwrap(),
total_transaction_page_fetch_attempt: register_int_counter_with_registry!(
"total_transaction_page_fetch_attempt",
"Total number of transaction page fetch attempt",
registry,
)
.unwrap(),
total_transaction_page_received: register_int_counter_with_registry!(
"total_transaction_page_received",
"Total number of transaction page received",
registry,
)
.unwrap(),
total_transaction_page_committed: register_int_counter_with_registry!(
"total_transaction_page_committed",
"Total number of transaction page committed",
total_transaction_checkpoint_processed: register_int_counter_with_registry!(
"total_transaction_checkpoint_processed",
"Total number of transactions processed",
registry,
)
.unwrap(),
Expand All @@ -82,6 +67,13 @@ impl IndexerTransactionHandlerMetrics {
registry,
)
.unwrap(),
checkpoint_db_read_request_latency: register_histogram_with_registry!(
"transaction_checkpoint_db_read_request_latency",
"Time spent in reading a transaction from the checkpoint db",
LATENCY_SEC_BUCKETS.to_vec(),
registry,
)
.unwrap(),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer/src/models/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,16 +201,16 @@ fn commit_checkpoint_impl(
})
}

pub fn get_previous_checkpoint(
pub fn get_checkpoint(
pg_pool_conn: &mut PgPoolConnection,
currency_checkpoint_sequence_number: i64,
checkpoint_sequence_number: i64,
) -> Result<Checkpoint, IndexerError> {
let checkpoint_read_result = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| {
checkpoints_table
.filter(sequence_number.eq(currency_checkpoint_sequence_number - 1))
.filter(sequence_number.eq(checkpoint_sequence_number))
.limit(1)
.first::<Checkpoint>(conn)
});
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-indexer/src/models/transaction_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use diesel::result::Error;
#[diesel(primary_key(id))]
pub struct TransactionLog {
pub id: i32,
pub next_cursor_tx_digest: Option<String>,
pub next_checkpoint_sequence_number: i64,
}

pub fn read_transaction_log(
Expand All @@ -35,21 +35,21 @@ pub fn read_transaction_log(

pub fn commit_transaction_log(
pg_pool_conn: &mut PgPoolConnection,
txn_digest: Option<String>,
checkpoint_sequence_number: i64,
) -> Result<usize, IndexerError> {
let txn_log_commit_result: Result<usize, Error> = pg_pool_conn
.build_transaction()
.read_write()
.run::<_, Error, _>(|conn| {
diesel::update(transaction_logs::table)
.set(next_cursor_tx_digest.eq(txn_digest.clone()))
.set(next_checkpoint_sequence_number.eq(checkpoint_sequence_number))
.execute(conn)
});

txn_log_commit_result.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed updating transaction log in PostgresDB with tx digest {:?} and error {:?}",
txn_digest, e
checkpoint_sequence_number, e
))
})
}
4 changes: 4 additions & 0 deletions crates/sui-indexer/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Transaction {
pub id: i64,
pub transaction_digest: String,
pub sender: String,
pub checkpoint_sequence_number: Option<i64>,
pub transaction_time: Option<NaiveDateTime>,
pub transaction_kinds: Vec<Option<String>>,
pub created: Vec<Option<String>>,
Expand All @@ -43,6 +44,7 @@ pub struct Transaction {
pub struct NewTransaction {
pub transaction_digest: String,
pub sender: String,
pub checkpoint_sequence_number: Option<i64>,
pub transaction_time: Option<NaiveDateTime>,
pub transaction_kinds: Vec<Option<String>>,
pub created: Vec<Option<String>>,
Expand Down Expand Up @@ -132,6 +134,7 @@ pub fn transaction_response_to_new_transaction(
let gas_budget = tx_resp.transaction.data.gas_data.budget;
let gas_price = tx_resp.transaction.data.gas_data.price;
let sender = tx_resp.transaction.data.sender.to_string();
let checkpoint_seq_number = tx_resp.checkpoint.map(|c| c as i64);
let txn_kind_iter = tx_resp
.transaction
.data
Expand Down Expand Up @@ -195,6 +198,7 @@ pub fn transaction_response_to_new_transaction(
Ok(NewTransaction {
transaction_digest: tx_digest,
sender,
checkpoint_sequence_number: checkpoint_seq_number,
transaction_kinds: txn_kind_iter.map(Some).collect::<Vec<Option<String>>>(),
transaction_time: timestamp,
created: vec_string_to_vec_opt_string(created),
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ diesel::table! {
diesel::table! {
transaction_logs (id) {
id -> Int4,
next_cursor_tx_digest -> Nullable<Text>,
next_checkpoint_sequence_number -> Int8,
}
}

Expand All @@ -172,6 +172,7 @@ diesel::table! {
id -> Int8,
transaction_digest -> Varchar,
sender -> Varchar,
checkpoint_sequence_number -> Nullable<Int8>,
transaction_time -> Nullable<Timestamp>,
transaction_kinds -> Array<Nullable<Text>>,
created -> Array<Nullable<Text>>,
Expand Down

0 comments on commit 5719608

Please sign in to comment.