diff --git a/crates/sui-bridge-indexer/config.yaml b/crates/sui-bridge-indexer/config.yaml index 591b55a501bfc..ec2cce7977d38 100644 --- a/crates/sui-bridge-indexer/config.yaml +++ b/crates/sui-bridge-indexer/config.yaml @@ -12,6 +12,8 @@ # checkpoints_path: # Number of concurrent operations # concurrency: 1 +# Bridge genesis checkpoint in SUI, for testnet = 43917829 +# bridge_genesis_checkpoint: # Ethereum to Sui bridge contract address # eth_sui_bridge_contract_address: # Starting block number diff --git a/crates/sui-bridge-indexer/src/config.rs b/crates/sui-bridge-indexer/src/config.rs index 5810ac5c5fdb1..ed4c52013b512 100644 --- a/crates/sui-bridge-indexer/src/config.rs +++ b/crates/sui-bridge-indexer/src/config.rs @@ -11,9 +11,9 @@ pub struct Config { pub remote_store_url: String, pub eth_rpc_url: String, pub db_url: String, - pub progress_store_file: String, pub checkpoints_path: String, pub concurrency: u64, + pub bridge_genesis_checkpoint: u64, pub eth_sui_bridge_contract_address: String, pub start_block: u64, pub metric_url: String, diff --git a/crates/sui-bridge-indexer/src/main.rs b/crates/sui-bridge-indexer/src/main.rs index 39ee0d4bdba13..19e58d95269f9 100644 --- a/crates/sui-bridge-indexer/src/main.rs +++ b/crates/sui-bridge-indexer/src/main.rs @@ -14,18 +14,17 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer}; -use sui_bridge_indexer::latest_eth_syncer::LatestEthSyncer; -use sui_bridge_indexer::postgres_manager::get_connection_pool; -use sui_bridge_indexer::postgres_manager::get_latest_eth_token_transfer; -use sui_bridge_indexer::sui_worker::SuiBridgeWorker; use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics}; -use sui_data_ingestion_core::{ - DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool, -}; +use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; use tokio::sync::oneshot; use tracing::info; use sui_bridge_indexer::eth_worker::process_eth_events; +use sui_bridge_indexer::latest_eth_syncer::LatestEthSyncer; +use sui_bridge_indexer::postgres_manager::{ + get_connection_pool, get_latest_eth_token_transfer, PgProgressStore, +}; +use sui_bridge_indexer::sui_worker::SuiBridgeWorker; #[derive(Parser, Clone, Debug)] struct Args { @@ -84,7 +83,8 @@ async fn start_processing_sui_checkpoints( let (_exit_sender, exit_receiver) = oneshot::channel(); let metrics = DataIngestionMetrics::new(&Registry::new()); - let progress_store = FileProgressStore::new(config.progress_store_file.clone().into()); + let pg_pool = get_connection_pool(config.db_url.clone()); + let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint); let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics); let indexer_metrics_cloned = indexer_meterics.clone(); diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql index 2550cdf407015..eb62a1bf35cdf 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/down.sql @@ -1,3 +1,4 @@ -- This file should undo anything in `up.sql` DROP TABLE IF EXISTS token_transfer; DROP TABLE IF EXISTS token_transfer_data; +DROP TABLE IF EXISTS progress_store; diff --git a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql index 5f688a55f20b0..93e5762bfd07e 100644 --- a/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql +++ b/crates/sui-bridge-indexer/src/migrations/00000000000000_diesel_initial_setup/up.sql @@ -33,3 +33,9 @@ CREATE TABLE token_transfer ); CREATE INDEX token_transfer_block_height ON token_transfer (block_height); CREATE INDEX token_transfer_timestamp_ms ON token_transfer (timestamp_ms); + +CREATE TABLE progress_store +( + task_name TEXT PRIMARY KEY, + checkpoint BIGINT NOT NULL +); diff --git a/crates/sui-bridge-indexer/src/models.rs b/crates/sui-bridge-indexer/src/models.rs index 04c5c9405b278..e3bd537be945f 100644 --- a/crates/sui-bridge-indexer/src/models.rs +++ b/crates/sui-bridge-indexer/src/models.rs @@ -1,9 +1,16 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 -use crate::schema::{token_transfer, token_transfer_data}; +use crate::schema::{progress_store, token_transfer, token_transfer_data}; use diesel::{Identifiable, Insertable, Queryable, Selectable}; +#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] +#[diesel(table_name = progress_store, primary_key(task_name))] +pub struct ProgressStore { + pub task_name: String, + pub checkpoint: i64, +} + #[derive(Queryable, Selectable, Insertable, Identifiable, Debug)] #[diesel(table_name = token_transfer, primary_key(chain_id, nonce))] pub struct TokenTransfer { diff --git a/crates/sui-bridge-indexer/src/postgres_manager.rs b/crates/sui-bridge-indexer/src/postgres_manager.rs index 7ef3f3184ad0b..73035a048ff06 100644 --- a/crates/sui-bridge-indexer/src/postgres_manager.rs +++ b/crates/sui-bridge-indexer/src/postgres_manager.rs @@ -1,20 +1,23 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::models::ProgressStore as DBProgressStore; use crate::models::TokenTransfer as DBTokenTransfer; use crate::models::TokenTransferData as DBTokenTransferData; +use crate::schema::progress_store::checkpoint; +use crate::schema::progress_store::dsl::progress_store; use crate::schema::token_transfer_data; -use crate::{schema::token_transfer, TokenTransfer}; +use crate::{schema, schema::token_transfer, TokenTransfer}; +use async_trait::async_trait; use diesel::result::Error; use diesel::BoolExpressionMethods; -use diesel::ExpressionMethods; -use diesel::OptionalExtension; -use diesel::QueryDsl; use diesel::{ pg::PgConnection, r2d2::{ConnectionManager, Pool}, - Connection, RunQueryDsl, + Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper, }; +use sui_data_ingestion_core::ProgressStore; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub(crate) type PgPool = Pool>; @@ -71,3 +74,49 @@ pub fn get_latest_eth_token_transfer( .optional() } } + +pub struct PgProgressStore { + pool: PgPool, + bridge_genesis_checkpoint: u64, +} + +impl PgProgressStore { + pub fn new(pool: PgPool, bridge_genesis_checkpoint: u64) -> Self { + PgProgressStore { + pool, + bridge_genesis_checkpoint, + } + } +} +#[async_trait] +impl ProgressStore for PgProgressStore { + async fn load(&mut self, task_name: String) -> anyhow::Result { + let mut conn = self.pool.get()?; + let cp: Option = progress_store + .find(task_name) + .select(DBProgressStore::as_select()) + .first(&mut conn) + .optional()?; + Ok(cp + .map(|d| d.checkpoint as u64) + .unwrap_or(self.bridge_genesis_checkpoint)) + } + + async fn save( + &mut self, + task_name: String, + checkpoint_number: CheckpointSequenceNumber, + ) -> anyhow::Result<()> { + let mut conn = self.pool.get()?; + diesel::insert_into(schema::progress_store::table) + .values(&DBProgressStore { + task_name, + checkpoint: checkpoint_number as i64, + }) + .on_conflict(schema::progress_store::dsl::task_name) + .do_update() + .set(checkpoint.eq(checkpoint_number as i64)) + .execute(&mut conn)?; + Ok(()) + } +} diff --git a/crates/sui-bridge-indexer/src/schema.rs b/crates/sui-bridge-indexer/src/schema.rs index 791ef52b96868..7ba9c6213cda5 100644 --- a/crates/sui-bridge-indexer/src/schema.rs +++ b/crates/sui-bridge-indexer/src/schema.rs @@ -3,6 +3,13 @@ // @generated automatically by Diesel CLI. +diesel::table! { + progress_store (task_name) { + task_name -> Text, + checkpoint -> Int8, + } +} + diesel::table! { token_transfer (chain_id, nonce, status) { chain_id -> Int4, @@ -32,4 +39,4 @@ diesel::table! { } } -diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data); +diesel::allow_tables_to_appear_in_same_query!(progress_store, token_transfer, token_transfer_data,);