Skip to content

Commit

Permalink
indexer: add checkpoint handler (MystenLabs#7439)
Browse files Browse the repository at this point in the history
Add checkpoint handler to indexer.
This include metrics needed for wave 2:
- total txns since genesis
- total gas cost including computation, storage and rebate since genesis
of the current epoch
  • Loading branch information
gegaowp authored Jan 18, 2023
1 parent 7d52656 commit 214a4e4
Show file tree
Hide file tree
Showing 12 changed files with 358 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
DROP TABLE checkpoints;
DROP TABLE checkpoint_logs;
20 changes: 20 additions & 0 deletions crates/sui-indexer/migrations/2023-01-16-233119_checkpoint/up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
CREATE TABLE checkpoints (
sequence_number BIGINT PRIMARY KEY,
content_digest VARCHAR(255) NOT NULL,
epoch BIGINT NOT NULL,
-- derived from gas cost summary
total_gas_cost BIGINT NOT NULL,
total_computation_cost BIGINT NOT NULL,
total_storage_cost BIGINT NOT NULL,
total_storage_rebate BIGINT NOT NULL,
total_transactions BIGINT NOT NULL,
previous_digest VARCHAR(255),
next_epoch_committee TEXT,
UNIQUE(sequence_number)
);

CREATE TABLE checkpoint_logs (
next_cursor_sequence_number BIGINT PRIMARY KEY
);

INSERT INTO checkpoint_logs (next_cursor_sequence_number) VALUES (0);
69 changes: 69 additions & 0 deletions crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use prometheus::Registry;
use std::sync::Arc;
use sui_sdk::SuiClient;
use tracing::info;

use sui_indexer::errors::IndexerError;
use sui_indexer::metrics::IndexerCheckpointHandlerMetrics;
use sui_indexer::models::checkpoint_logs::{commit_checkpoint_log, read_checkpoint_log};
use sui_indexer::models::checkpoints::commit_checkpoint;
use sui_indexer::{get_pg_pool_connection, PgConnectionPool};

pub struct CheckpointHandler {
rpc_client: SuiClient,
pg_connection_pool: Arc<PgConnectionPool>,
pub checkpoint_handler_metrics: IndexerCheckpointHandlerMetrics,
}

impl CheckpointHandler {
pub fn new(
rpc_client: SuiClient,
pg_connection_pool: Arc<PgConnectionPool>,
prometheus_registry: &Registry,
) -> Self {
Self {
rpc_client,
pg_connection_pool,
checkpoint_handler_metrics: IndexerCheckpointHandlerMetrics::new(prometheus_registry),
}
}

pub async fn start(&self) -> Result<(), IndexerError> {
info!("Indexer checkpoint handler started...");
let mut pg_pool_conn = get_pg_pool_connection(self.pg_connection_pool.clone())?;

let checkpoint_log = read_checkpoint_log(&mut pg_pool_conn)?;
let mut next_cursor_sequence_number = checkpoint_log.next_cursor_sequence_number;

loop {
self.checkpoint_handler_metrics
.total_checkpoint_requested
.inc();
let checkpoint = self
.rpc_client
.read_api()
.get_checkpoint_summary(next_cursor_sequence_number as u64)
.await
.map_err(|e| {
IndexerError::FullNodeReadingError(format!(
"Failed to get checkpoint with sequence {} error: {:?}",
next_cursor_sequence_number, e
))
})?;
self.checkpoint_handler_metrics
.total_checkpoint_received
.inc();
commit_checkpoint(&mut pg_pool_conn, checkpoint)?;
info!("Checkpoint {} committed", next_cursor_sequence_number);
self.checkpoint_handler_metrics
.total_checkpoint_processed
.inc();

next_cursor_sequence_number += 1;
commit_checkpoint_log(&mut pg_pool_conn, next_cursor_sequence_number)?;
}
}
}
34 changes: 33 additions & 1 deletion crates/sui-indexer/src/handlers/handler_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures::future::try_join_all;
use prometheus::Registry;
use tracing::{error, info, warn};

use crate::handlers::checkpoint_handler::CheckpointHandler;
use crate::handlers::event_handler::EventHandler;
use crate::handlers::transaction_handler::TransactionHandler;

Expand Down Expand Up @@ -38,6 +39,11 @@ impl HandlerOrchestrator {

pub async fn run_forever(&self) {
info!("Handler orchestrator started...");
let checkpoint_handler = CheckpointHandler::new(
self.rpc_client.clone(),
self.pg_connection_pool.clone(),
&self.prometheus_registry,
);
let event_handler = EventHandler::new(
self.rpc_client.clone(),
self.pg_connection_pool.clone(),
Expand All @@ -49,6 +55,32 @@ impl HandlerOrchestrator {
&self.prometheus_registry,
);

let checkpoint_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
.build();
let checkpoint_res = retry(backoff_config, || async {
let checkpoint_handler_exec_res = checkpoint_handler.start().await;
if let Err(e) = checkpoint_handler_exec_res.clone() {
checkpoint_handler
.checkpoint_handler_metrics
.total_checkpoint_handler_error
.inc();
warn!(
"Indexer checkpoint handler failed with error: {:?}, retrying...",
e
);
}
Ok(checkpoint_handler_exec_res?)
})
.await;
if let Err(e) = checkpoint_res {
error!(
"Indexer checkpoint handler failed after retrials with error: {:?}!",
e
);
}
});
let txn_handle = tokio::task::spawn(async move {
let backoff_config = ExponentialBackoffBuilder::new()
.with_max_interval(std::time::Duration::from_secs(BACKOFF_MAX_INTERVAL_IN_SECS))
Expand Down Expand Up @@ -101,7 +133,7 @@ impl HandlerOrchestrator {
);
}
});
try_join_all(vec![txn_handle, event_handle])
try_join_all(vec![checkpoint_handle, txn_handle, event_handle])
.await
.expect("Handler orchestrator shoult not run into errors.");
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-indexer/src/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

pub mod checkpoint_handler;
pub mod event_handler;
pub mod handler_orchestrator;
pub mod transaction_handler;
39 changes: 39 additions & 0 deletions crates/sui-indexer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,45 @@ impl IndexerEventHandlerMetrics {
}
}

#[derive(Clone, Debug)]
pub struct IndexerCheckpointHandlerMetrics {
pub total_checkpoint_requested: IntCounter,
pub total_checkpoint_received: IntCounter,
pub total_checkpoint_processed: IntCounter,
pub total_checkpoint_handler_error: IntCounter,
}

impl IndexerCheckpointHandlerMetrics {
pub fn new(registry: &Registry) -> Self {
Self {
total_checkpoint_received: register_int_counter_with_registry!(
"total_checkpoint_received",
"Total number of checkpoint received",
registry,
)
.unwrap(),
total_checkpoint_processed: register_int_counter_with_registry!(
"total_checkpoint_processed",
"Total number of checkpoint processed",
registry,
)
.unwrap(),
total_checkpoint_handler_error: register_int_counter_with_registry!(
"total_checkpoint_handler_error",
"Total number of checkpoint handler error",
registry,
)
.unwrap(),
total_checkpoint_requested: register_int_counter_with_registry!(
"total_checkpoint_requested",
"Total number of checkpoint requested",
registry,
)
.unwrap(),
}
}
}

/// derivative data processor related metrics
#[derive(Clone, Debug)]
pub struct IndexerAddressProcessorMetrics {
Expand Down
53 changes: 53 additions & 0 deletions crates/sui-indexer/src/models/checkpoint_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use crate::schema::checkpoint_logs;
use crate::schema::checkpoint_logs::dsl::*;
use crate::PgPoolConnection;

use diesel::prelude::*;
use diesel::result::Error;

#[derive(Queryable, Debug, Identifiable)]
#[diesel(primary_key(next_cursor_sequence_number))]
pub struct CheckpointLog {
pub next_cursor_sequence_number: i64,
}

pub fn read_checkpoint_log(
pg_pool_conn: &mut PgPoolConnection,
) -> Result<CheckpointLog, IndexerError> {
let checkpoint_log_read_result: Result<CheckpointLog, Error> = pg_pool_conn
.build_transaction()
.read_only()
.run::<_, Error, _>(|conn| checkpoint_logs.limit(1).first::<CheckpointLog>(conn));

checkpoint_log_read_result.map_err(|e| {
IndexerError::PostgresReadError(format!(
"Failed reading checkpoint log in PostgresDB with error {:?}",
e
))
})
}

pub fn commit_checkpoint_log(
pg_pool_conn: &mut PgPoolConnection,
sequence_number: i64,
) -> Result<usize, IndexerError> {
let checkpoint_log_commit_result: Result<usize, Error> = pg_pool_conn
.build_transaction()
.read_write()
.run::<_, Error, _>(|conn| {
diesel::update(checkpoint_logs::table)
.set(next_cursor_sequence_number.eq(sequence_number))
.execute(conn)
});

checkpoint_log_commit_result.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed updating checkpoint log in PostgresDB with sequence number {:?} and error {:?}",
sequence_number, e
))
})
}
96 changes: 96 additions & 0 deletions crates/sui-indexer/src/models/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::errors::IndexerError;
use crate::schema::checkpoints;
use crate::schema::checkpoints::dsl::{checkpoints as checkpoints_table, sequence_number};
// use crate::utils::log_errors_to_pg;
use crate::PgPoolConnection;

use diesel::prelude::*;

use sui_types::messages_checkpoint::CheckpointSummary;

#[derive(Queryable, Debug, Clone)]
pub struct Checkpoint {
pub sequence_number: i64,
pub content_digest: String,
pub epoch: i64,
pub total_gas_cost: i64,
pub total_computation_cost: i64,
pub total_storage_cost: i64,
pub total_storage_rebate: i64,
pub total_transactions: i64,
pub previous_digest: Option<String>,
pub next_epoch_committee: Option<String>,
}

#[derive(Debug, Insertable, Clone)]
#[diesel(table_name = checkpoints)]
pub struct NewCheckpoint {
pub sequence_number: i64,
pub content_digest: String,
pub epoch: i64,
pub total_gas_cost: i64,
pub total_computation_cost: i64,
pub total_storage_cost: i64,
pub total_storage_rebate: i64,
pub total_transactions: i64,
pub previous_digest: Option<String>,
pub next_epoch_committee: Option<String>,
}

pub fn commit_checkpoint(
pg_pool_conn: &mut PgPoolConnection,
checkpoint_summary: CheckpointSummary,
) -> Result<usize, IndexerError> {
let total_gas_cost = checkpoint_summary
.epoch_rolling_gas_cost_summary
.computation_cost
+ checkpoint_summary
.epoch_rolling_gas_cost_summary
.storage_cost
- checkpoint_summary
.epoch_rolling_gas_cost_summary
.storage_rebate;
let next_committee_json = checkpoint_summary.next_epoch_committee.map(|c| {
serde_json::to_string(&c).expect("Failed to serialize next_epoch_committee to JSON")
});

let checkpoint = NewCheckpoint {
sequence_number: checkpoint_summary.sequence_number as i64,
content_digest: checkpoint_summary.content_digest.encode(),
epoch: checkpoint_summary.epoch as i64,
total_gas_cost: total_gas_cost as i64,
total_computation_cost: checkpoint_summary
.epoch_rolling_gas_cost_summary
.computation_cost as i64,
total_storage_cost: checkpoint_summary
.epoch_rolling_gas_cost_summary
.storage_cost as i64,
total_storage_rebate: checkpoint_summary
.epoch_rolling_gas_cost_summary
.storage_rebate as i64,
total_transactions: checkpoint_summary.network_total_transactions as i64,
previous_digest: checkpoint_summary.previous_digest.map(|d| d.encode()),
next_epoch_committee: next_committee_json,
};
commit_checkpoint_impl(pg_pool_conn, checkpoint)
}

fn commit_checkpoint_impl(
pg_pool_conn: &mut PgPoolConnection,
checkpoint: NewCheckpoint,
) -> Result<usize, IndexerError> {
let checkpoint_commit_result = diesel::insert_into(checkpoints_table)
.values(checkpoint.clone())
.on_conflict(sequence_number)
.do_nothing()
.execute(pg_pool_conn);
checkpoint_commit_result.map_err(|e| {
IndexerError::PostgresWriteError(format!(
"Failed writing checkpoint to PostgresDB with events {:?} and error: {:?}",
checkpoint, e
))
})
}
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

pub mod address_logs;
pub mod addresses;
pub mod checkpoint_logs;
pub mod checkpoints;
pub mod error_logs;
pub mod event_logs;
pub mod events;
Expand Down
Loading

0 comments on commit 214a4e4

Please sign in to comment.