Skip to content

Commit

Permalink
[sui native bridge] - store sui ingression progress in db (#18289)
Browse files Browse the repository at this point in the history
## Description 

Store sui checkpoint ingression progress in db instead of files

## Test plan 

deployed and tested on testnet

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
patrickkuo authored Jun 20, 2024
1 parent 4b934f8 commit 4c16fc1
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 16 deletions.
2 changes: 2 additions & 0 deletions crates/sui-bridge-indexer/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# checkpoints_path: <path>
# Number of concurrent operations
# concurrency: 1
# Bridge genesis checkpoint in SUI, for testnet = 43917829
# bridge_genesis_checkpoint: <sui bridge genesis checkpoint>
# Ethereum to Sui bridge contract address
# eth_sui_bridge_contract_address: <contract_address>
# Starting block number
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub struct Config {
pub remote_store_url: String,
pub eth_rpc_url: String,
pub db_url: String,
pub progress_store_file: String,
pub checkpoints_path: String,
pub concurrency: u64,
pub bridge_genesis_checkpoint: u64,
pub eth_sui_bridge_contract_address: String,
pub start_block: u64,
pub metric_url: String,
Expand Down
16 changes: 8 additions & 8 deletions crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use sui_bridge::{eth_client::EthClient, eth_syncer::EthSyncer};
use sui_bridge_indexer::latest_eth_syncer::LatestEthSyncer;
use sui_bridge_indexer::postgres_manager::get_connection_pool;
use sui_bridge_indexer::postgres_manager::get_latest_eth_token_transfer;
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;
use sui_bridge_indexer::{config::load_config, metrics::BridgeIndexerMetrics};
use sui_data_ingestion_core::{
DataIngestionMetrics, FileProgressStore, IndexerExecutor, ReaderOptions, WorkerPool,
};
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use tokio::sync::oneshot;
use tracing::info;

use sui_bridge_indexer::eth_worker::process_eth_events;
use sui_bridge_indexer::latest_eth_syncer::LatestEthSyncer;
use sui_bridge_indexer::postgres_manager::{
get_connection_pool, get_latest_eth_token_transfer, PgProgressStore,
};
use sui_bridge_indexer::sui_worker::SuiBridgeWorker;

#[derive(Parser, Clone, Debug)]
struct Args {
Expand Down Expand Up @@ -84,7 +83,8 @@ async fn start_processing_sui_checkpoints(
let (_exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());

let progress_store = FileProgressStore::new(config.progress_store_file.clone().into());
let pg_pool = get_connection_pool(config.db_url.clone());
let progress_store = PgProgressStore::new(pg_pool, config.bridge_genesis_checkpoint);
let mut executor = IndexerExecutor::new(progress_store, 1 /* workflow types */, metrics);

let indexer_metrics_cloned = indexer_meterics.clone();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS token_transfer;
DROP TABLE IF EXISTS token_transfer_data;
DROP TABLE IF EXISTS progress_store;
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,9 @@ CREATE TABLE token_transfer
);
CREATE INDEX token_transfer_block_height ON token_transfer (block_height);
CREATE INDEX token_transfer_timestamp_ms ON token_transfer (timestamp_ms);

CREATE TABLE progress_store
(
task_name TEXT PRIMARY KEY,
checkpoint BIGINT NOT NULL
);
9 changes: 8 additions & 1 deletion crates/sui-bridge-indexer/src/models.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::schema::{token_transfer, token_transfer_data};
use crate::schema::{progress_store, token_transfer, token_transfer_data};
use diesel::{Identifiable, Insertable, Queryable, Selectable};

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = progress_store, primary_key(task_name))]
pub struct ProgressStore {
pub task_name: String,
pub checkpoint: i64,
}

#[derive(Queryable, Selectable, Insertable, Identifiable, Debug)]
#[diesel(table_name = token_transfer, primary_key(chain_id, nonce))]
pub struct TokenTransfer {
Expand Down
59 changes: 54 additions & 5 deletions crates/sui-bridge-indexer/src/postgres_manager.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::models::ProgressStore as DBProgressStore;
use crate::models::TokenTransfer as DBTokenTransfer;
use crate::models::TokenTransferData as DBTokenTransferData;
use crate::schema::progress_store::checkpoint;
use crate::schema::progress_store::dsl::progress_store;
use crate::schema::token_transfer_data;
use crate::{schema::token_transfer, TokenTransfer};
use crate::{schema, schema::token_transfer, TokenTransfer};
use async_trait::async_trait;
use diesel::result::Error;
use diesel::BoolExpressionMethods;
use diesel::ExpressionMethods;
use diesel::OptionalExtension;
use diesel::QueryDsl;
use diesel::{
pg::PgConnection,
r2d2::{ConnectionManager, Pool},
Connection, RunQueryDsl,
Connection, ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper,
};
use sui_data_ingestion_core::ProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub(crate) type PgPool = Pool<ConnectionManager<PgConnection>>;

Expand Down Expand Up @@ -71,3 +74,49 @@ pub fn get_latest_eth_token_transfer(
.optional()
}
}

pub struct PgProgressStore {
pool: PgPool,
bridge_genesis_checkpoint: u64,
}

impl PgProgressStore {
pub fn new(pool: PgPool, bridge_genesis_checkpoint: u64) -> Self {
PgProgressStore {
pool,
bridge_genesis_checkpoint,
}
}
}
#[async_trait]
impl ProgressStore for PgProgressStore {
async fn load(&mut self, task_name: String) -> anyhow::Result<CheckpointSequenceNumber> {
let mut conn = self.pool.get()?;
let cp: Option<DBProgressStore> = progress_store
.find(task_name)
.select(DBProgressStore::as_select())
.first(&mut conn)
.optional()?;
Ok(cp
.map(|d| d.checkpoint as u64)
.unwrap_or(self.bridge_genesis_checkpoint))
}

async fn save(
&mut self,
task_name: String,
checkpoint_number: CheckpointSequenceNumber,
) -> anyhow::Result<()> {
let mut conn = self.pool.get()?;
diesel::insert_into(schema::progress_store::table)
.values(&DBProgressStore {
task_name,
checkpoint: checkpoint_number as i64,
})
.on_conflict(schema::progress_store::dsl::task_name)
.do_update()
.set(checkpoint.eq(checkpoint_number as i64))
.execute(&mut conn)?;
Ok(())
}
}
9 changes: 8 additions & 1 deletion crates/sui-bridge-indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

// @generated automatically by Diesel CLI.

diesel::table! {
progress_store (task_name) {
task_name -> Text,
checkpoint -> Int8,
}
}

diesel::table! {
token_transfer (chain_id, nonce, status) {
chain_id -> Int4,
Expand Down Expand Up @@ -32,4 +39,4 @@ diesel::table! {
}
}

diesel::allow_tables_to_appear_in_same_query!(token_transfer, token_transfer_data);
diesel::allow_tables_to_appear_in_same_query!(progress_store, token_transfer, token_transfer_data,);

0 comments on commit 4c16fc1

Please sign in to comment.