Skip to content

Commit

Permalink
indexer: replace txn_log with latest processed checkpoint in txn table (
Browse files Browse the repository at this point in the history
MystenLabs#8611)

## Description 

Per discussion with the team, we want to get rid of _log table as much
as we can. As today txns are processed following its related checkpoint
instead of txn pagination, it is doable to use latest processed
checkpoint sequence number to do so.

## Test Plan 

Test with local validator, FN, indexer and PG
![Screenshot 2023-02-23 at 4 55 41
PM](https://user-images.githubusercontent.com/106119108/221086627-41cf8449-c878-400c-a2bc-29ac2113621b.png)
  • Loading branch information
gegaowp authored Feb 24, 2023
1 parent 448f16a commit 79f5767
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ CREATE TABLE transactions (
id BIGSERIAL PRIMARY KEY,
transaction_digest VARCHAR(255) NOT NULL,
sender VARCHAR(255) NOT NULL,
checkpoint_sequence_number BIGINT,
checkpoint_sequence_number BIGINT NOT NULL,
transaction_time TIMESTAMP,
transaction_kinds TEXT[] NOT NULL,
-- object related
Expand Down

This file was deleted.

7 changes: 0 additions & 7 deletions crates/sui-indexer/migrations/2022-11-22-235415_logs/up.sql

This file was deleted.

16 changes: 7 additions & 9 deletions crates/sui-indexer/src/handlers/transaction_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ use tracing::info;
use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::IndexerTransactionHandlerMetrics;
use sui_indexer::models::checkpoints::get_checkpoint;
use sui_indexer::models::transaction_logs::{commit_transaction_log, read_transaction_log};
use sui_indexer::models::transactions::commit_transactions;
use sui_indexer::models::transactions::{commit_transactions, read_latest_processed_checkpoint};
use sui_indexer::utils::log_errors_to_pg;
use sui_indexer::{get_pg_pool_connection, PgConnectionPool};

Expand Down Expand Up @@ -42,20 +41,20 @@ impl TransactionHandler {
pub async fn start(&self) -> Result<(), IndexerError> {
info!("Indexer transaction handler started...");
let mut pg_pool_conn = get_pg_pool_connection(self.pg_connection_pool.clone())?;
let txn_log = read_transaction_log(&mut pg_pool_conn)?;
let mut checkpoint_seq_number = txn_log.next_checkpoint_sequence_number;
let mut next_checkpoint_to_process =
read_latest_processed_checkpoint(&mut pg_pool_conn)? + 1;

loop {
let checkpoint_db_read_guard = self
.transaction_handler_metrics
.checkpoint_db_read_request_latency
.start_timer();
let mut checkpoint_opt = get_checkpoint(&mut pg_pool_conn, checkpoint_seq_number);
let mut checkpoint_opt = get_checkpoint(&mut pg_pool_conn, next_checkpoint_to_process);
// this often happens when the checkpoint is not yet committed to the database
while checkpoint_opt.is_err() {
// this sleep is necessary to avoid blocking the checkpoint commit.
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
checkpoint_opt = get_checkpoint(&mut pg_pool_conn, checkpoint_seq_number);
checkpoint_opt = get_checkpoint(&mut pg_pool_conn, next_checkpoint_to_process);
}
// unwrap is safe here b/c of the check above.
let checkpoint = checkpoint_opt.unwrap();
Expand Down Expand Up @@ -111,7 +110,7 @@ impl TransactionHandler {
info!(
"Received transaction responses for {} transaction(s) in checkpoint {}",
resp_vec.len(),
checkpoint_seq_number,
next_checkpoint_to_process,
);
log_errors_to_pg(&mut pg_pool_conn, errors);

Expand All @@ -120,8 +119,7 @@ impl TransactionHandler {
.db_write_request_latency
.start_timer();
commit_transactions(&mut pg_pool_conn, resp_vec)?;
checkpoint_seq_number += 1;
commit_transaction_log(&mut pg_pool_conn, checkpoint_seq_number)?;
next_checkpoint_to_process += 1;
self.transaction_handler_metrics
.total_transactions_processed
.inc_by(txn_count as u64);
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ pub mod object_logs;
pub mod objects;
pub mod package_logs;
pub mod packages;
pub mod transaction_logs;
pub mod transactions;
// TODO: remove below after wave 2
pub mod move_event_logs;
Expand Down
55 changes: 0 additions & 55 deletions crates/sui-indexer/src/models/transaction_logs.rs

This file was deleted.

82 changes: 55 additions & 27 deletions crates/sui-indexer/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
// SPDX-License-Identifier: Apache-2.0

use crate::schema::transactions;
use crate::schema::transactions::dsl::{id, transactions as transactions_table};
use crate::schema::transactions::dsl::{
checkpoint_sequence_number, id, transactions as transactions_table,
};
use crate::utils::log_errors_to_pg;

use chrono::NaiveDateTime;
use diesel::dsl::max;
use diesel::prelude::*;
use diesel::result::Error;
use sui_json_rpc_types::{OwnedObjectRef, SuiObjectRef, SuiTransactionResponse};
Expand All @@ -19,7 +22,7 @@ pub struct Transaction {
pub id: i64,
pub transaction_digest: String,
pub sender: String,
pub checkpoint_sequence_number: Option<i64>,
pub checkpoint_sequence_number: i64,
pub transaction_time: Option<NaiveDateTime>,
pub transaction_kinds: Vec<Option<String>>,
pub created: Vec<Option<String>>,
Expand All @@ -44,7 +47,7 @@ pub struct Transaction {
pub struct NewTransaction {
pub transaction_digest: String,
pub sender: String,
pub checkpoint_sequence_number: Option<i64>,
pub checkpoint_sequence_number: i64,
pub transaction_time: Option<NaiveDateTime>,
pub transaction_kinds: Vec<Option<String>>,
pub created: Vec<Option<String>>,
Expand All @@ -64,29 +67,6 @@ pub struct NewTransaction {
pub transaction_content: String,
}

pub fn read_transactions(
pg_pool_conn: &mut PgPoolConnection,
last_processed_id: i64,
limit: usize,
) -> Result<Vec<Transaction>, IndexerError> {
let txn_read_result: Result<Vec<Transaction>, Error> = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| {
transactions_table
.filter(id.gt(last_processed_id))
.limit(limit as i64)
.load::<Transaction>(conn)
});

txn_read_result.map_err(|e| {
IndexerError::PostgresReadError(format!(
"Failed reading transactions with last_processed_id {} and err: {:?}",
last_processed_id, e
))
})
}

pub fn commit_transactions(
pg_pool_conn: &mut PgPoolConnection,
tx_resps: Vec<SuiTransactionResponse>,
Expand Down Expand Up @@ -134,7 +114,10 @@ pub fn transaction_response_to_new_transaction(
let gas_budget = tx_resp.transaction.data.gas_data.budget;
let gas_price = tx_resp.transaction.data.gas_data.price;
let sender = tx_resp.transaction.data.sender.to_string();
let checkpoint_seq_number = tx_resp.checkpoint.map(|c| c as i64);
// NOTE: unwrap is safe here because indexer fetches checkpoint first and then transactions
// based on the transaction digests in the checkpoint, thus the checkpoint sequence number
// is always Some. This is also confirmed by the sui-core team.
let checkpoint_seq_number = tx_resp.checkpoint.unwrap() as i64;
let txn_kind_iter = tx_resp
.transaction
.data
Expand Down Expand Up @@ -232,3 +215,48 @@ fn obj_ref_to_obj_id_string(obj_ref: SuiObjectRef) -> String {
fn vec_string_to_vec_opt_string(v: Vec<String>) -> Vec<Option<String>> {
v.into_iter().map(Some).collect::<Vec<Option<String>>>()
}

pub fn read_transactions(
pg_pool_conn: &mut PgPoolConnection,
last_processed_id: i64,
limit: usize,
) -> Result<Vec<Transaction>, IndexerError> {
let txn_read_result: Result<Vec<Transaction>, Error> = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| {
transactions_table
.filter(id.gt(last_processed_id))
.limit(limit as i64)
.load::<Transaction>(conn)
});

txn_read_result.map_err(|e| {
IndexerError::PostgresReadError(format!(
"Failed reading transactions with last_processed_id {} and err: {:?}",
last_processed_id, e
))
})
}

pub fn read_latest_processed_checkpoint(
pg_pool_conn: &mut PgPoolConnection,
) -> Result<i64, IndexerError> {
let latest_processed_checkpoint: Result<i64, Error> = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| {
transactions_table
.select(max(checkpoint_sequence_number))
.first::<Option<i64>>(conn)
// -1 means no checkpoints in the DB
.map(|o| o.unwrap_or(-1))
});

latest_processed_checkpoint.map_err(|e| {
IndexerError::PostgresReadError(format!(
"Failed reading latest processed checkpoint from transaction table with err: {:?}",
e
))
})
}
10 changes: 1 addition & 9 deletions crates/sui-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,19 +154,12 @@ diesel::table! {
}
}

diesel::table! {
transaction_logs (id) {
id -> Int4,
next_checkpoint_sequence_number -> Int8,
}
}

diesel::table! {
transactions (id) {
id -> Int8,
transaction_digest -> Varchar,
sender -> Varchar,
checkpoint_sequence_number -> Nullable<Int8>,
checkpoint_sequence_number -> Int8,
transaction_time -> Nullable<Timestamp>,
transaction_kinds -> Array<Nullable<Text>>,
created -> Array<Nullable<Text>>,
Expand Down Expand Up @@ -203,6 +196,5 @@ diesel::allow_tables_to_appear_in_same_query!(
packages,
publish_event_logs,
publish_events,
transaction_logs,
transactions,
);

0 comments on commit 79f5767

Please sign in to comment.