From 01df96369f95bb28049bf701cb6c0217295dbb72 Mon Sep 17 00:00:00 2001 From: phoenix <51927076+phoenix-o@users.noreply.github.com> Date: Thu, 19 Dec 2024 10:26:19 -0500 Subject: [PATCH] [kv store] add watermark table to bigtable --- Cargo.lock | 1 + crates/sui-data-ingestion/src/main.rs | 21 ++++++++++-- .../sui-data-ingestion/src/progress_store.rs | 11 +++++++ crates/sui-kvstore/Cargo.toml | 1 + crates/sui-kvstore/src/bigtable/client.rs | 12 ++++++- crates/sui-kvstore/src/bigtable/init.sh | 3 +- crates/sui-kvstore/src/bigtable/mod.rs | 1 + .../src/bigtable/progress_store.rs | 29 ++++++++++++++++ crates/sui-kvstore/src/lib.rs | 2 ++ crates/sui-kvstore/src/main.rs | 33 ++++++++++++------- 10 files changed, 98 insertions(+), 16 deletions(-) create mode 100644 crates/sui-kvstore/src/bigtable/progress_store.rs diff --git a/Cargo.lock b/Cargo.lock index a4984024d8bf6..98dca2e4b7814 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14287,6 +14287,7 @@ dependencies = [ "sui-data-ingestion-core", "sui-types", "telemetry-subscribers", + "tempfile", "tokio", "tonic 0.12.3", "tracing", diff --git a/crates/sui-data-ingestion/src/main.rs b/crates/sui-data-ingestion/src/main.rs index cd2c00694a07d..e82a503588864 100644 --- a/crates/sui-data-ingestion/src/main.rs +++ b/crates/sui-data-ingestion/src/main.rs @@ -13,7 +13,7 @@ use sui_data_ingestion::{ }; use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions}; use sui_data_ingestion_core::{IndexerExecutor, WorkerPool}; -use sui_kvstore::{BigTableClient, KvWorker}; +use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker}; use tokio::signal; use tokio::sync::oneshot; @@ -119,12 +119,30 @@ async fn main() -> Result<()> { mysten_metrics::init_metrics(&registry); let metrics = DataIngestionMetrics::new(&registry); + let mut bigtable_store = None; + for task in &config.tasks { + if let Task::BigTableKV(kv_config) = &task.task { + std::env::set_var( + "GOOGLE_APPLICATION_CREDENTIALS", + kv_config.credentials.clone(), + ); + let 
bigtable_client = BigTableClient::new_remote( + kv_config.instance_id.clone(), + false, + Some(Duration::from_secs(kv_config.timeout_secs as u64)), + ) + .await?; + bigtable_store = Some(BigTableProgressStore::new(bigtable_client)); + } + } + let progress_store = DynamoDBProgressStore::new( &config.progress_store.aws_access_key_id, &config.progress_store.aws_secret_access_key, config.progress_store.aws_region, config.progress_store.table_name, config.is_backfill, + bigtable_store, ) .await; let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics); @@ -160,7 +178,6 @@ async fn main() -> Result<()> { executor.register(worker_pool).await?; } Task::BigTableKV(kv_config) => { - std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", kv_config.credentials); let client = BigTableClient::new_remote( kv_config.instance_id, false, diff --git a/crates/sui-data-ingestion/src/progress_store.rs b/crates/sui-data-ingestion/src/progress_store.rs index 02857becfc626..2bc699d93bc18 100644 --- a/crates/sui-data-ingestion/src/progress_store.rs +++ b/crates/sui-data-ingestion/src/progress_store.rs @@ -11,12 +11,14 @@ use aws_sdk_s3::config::{Credentials, Region}; use std::str::FromStr; use std::time::Duration; use sui_data_ingestion_core::ProgressStore; +use sui_kvstore::BigTableProgressStore; use sui_types::messages_checkpoint::CheckpointSequenceNumber; pub struct DynamoDBProgressStore { client: Client, table_name: String, is_backfill: bool, + bigtable_store: Option<BigTableProgressStore>, } impl DynamoDBProgressStore { @@ -26,6 +28,7 @@ impl DynamoDBProgressStore { aws_region: String, table_name: String, is_backfill: bool, + bigtable_store: Option<BigTableProgressStore>, ) -> Self { let credentials = Credentials::new( aws_access_key_id, @@ -50,6 +53,7 @@ impl DynamoDBProgressStore { client, table_name, is_backfill, + bigtable_store, } } } @@ -79,6 +83,13 @@ impl ProgressStore for DynamoDBProgressStore { if self.is_backfill && checkpoint_number % 1000 != 0 { return Ok(()); } + if task_name == "bigtable" { 
+ if let Some(ref mut bigtable_store) = self.bigtable_store { + bigtable_store + .save(task_name.clone(), checkpoint_number) + .await?; + } + } let backoff = backoff::ExponentialBackoff::default(); backoff::future::retry(backoff, || async { let result = self diff --git a/crates/sui-kvstore/Cargo.toml b/crates/sui-kvstore/Cargo.toml index a06bacd9ace66..dd78b873ce808 100644 --- a/crates/sui-kvstore/Cargo.toml +++ b/crates/sui-kvstore/Cargo.toml @@ -20,6 +20,7 @@ serde.workspace = true sui-data-ingestion-core.workspace = true sui-types.workspace = true telemetry-subscribers.workspace = true +tempfile.workspace = true tokio = { workspace = true, features = ["full"] } tonic = {version = "0.12.2",features = ["tls", "transport"] } tracing.workspace = true diff --git a/crates/sui-kvstore/src/bigtable/client.rs b/crates/sui-kvstore/src/bigtable/client.rs index 5c85447622827..44e5b9d01a698 100644 --- a/crates/sui-kvstore/src/bigtable/client.rs +++ b/crates/sui-kvstore/src/bigtable/client.rs @@ -35,6 +35,7 @@ const OBJECTS_TABLE: &str = "objects"; const TRANSACTIONS_TABLE: &str = "transactions"; const CHECKPOINTS_TABLE: &str = "checkpoints"; const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest"; +const WATERMARK_TABLE: &str = "watermark"; const COLUMN_FAMILY_NAME: &str = "sui"; const DEFAULT_COLUMN_QUALIFIER: &str = ""; @@ -131,6 +132,15 @@ impl KeyValueStoreWriter for BigTableClient { ) .await } + + async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()> { + let key = watermark.to_be_bytes().to_vec(); + self.multi_set( + WATERMARK_TABLE, + [(key, vec![(DEFAULT_COLUMN_QUALIFIER, vec![])])], + ) + .await + } } #[async_trait] @@ -239,7 +249,7 @@ impl KeyValueStoreReader for BigTableClient { async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> { let upper_limit = u64::MAX.to_be_bytes().to_vec(); match self - .reversed_scan(CHECKPOINTS_TABLE, upper_limit) + .reversed_scan(WATERMARK_TABLE, upper_limit) .await? 
.pop() { diff --git a/crates/sui-kvstore/src/bigtable/init.sh b/crates/sui-kvstore/src/bigtable/init.sh index f96ac5c1e9827..5d314b275fe37 100755 --- a/crates/sui-kvstore/src/bigtable/init.sh +++ b/crates/sui-kvstore/src/bigtable/init.sh @@ -10,7 +10,7 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then command+=(-project emulator) fi -for table in objects transactions checkpoints checkpoints_by_digest; do +for table in objects transactions checkpoints checkpoints_by_digest watermark; do ( set -x "${command[@]}" createtable $table @@ -18,3 +18,4 @@ for table in objects transactions checkpoints checkpoints_by_digest; do "${command[@]}" setgcpolicy $table sui maxversions=1 ) done +"${command[@]}" setgcpolicy watermark sui maxage=2d diff --git a/crates/sui-kvstore/src/bigtable/mod.rs b/crates/sui-kvstore/src/bigtable/mod.rs index 9be9541c15ec4..58985d241ca94 100644 --- a/crates/sui-kvstore/src/bigtable/mod.rs +++ b/crates/sui-kvstore/src/bigtable/mod.rs @@ -2,5 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 pub(crate) mod client; +pub(crate) mod progress_store; mod proto; pub(crate) mod worker; diff --git a/crates/sui-kvstore/src/bigtable/progress_store.rs b/crates/sui-kvstore/src/bigtable/progress_store.rs new file mode 100644 index 0000000000000..e03ee86f667da --- /dev/null +++ b/crates/sui-kvstore/src/bigtable/progress_store.rs @@ -0,0 +1,29 @@ +// Copyright (c) Mysten Labs, Inc. 
+// SPDX-License-Identifier: Apache-2.0 + +use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter}; +use anyhow::Result; +use async_trait::async_trait; +use sui_data_ingestion_core::ProgressStore; +use sui_types::messages_checkpoint::CheckpointSequenceNumber; + +pub struct BigTableProgressStore { + client: BigTableClient, +} + +impl BigTableProgressStore { + pub fn new(client: BigTableClient) -> Self { + Self { client } + } +} + +#[async_trait] +impl ProgressStore for BigTableProgressStore { + async fn load(&mut self, _: String) -> Result<CheckpointSequenceNumber> { + self.client.get_latest_checkpoint().await + } + + async fn save(&mut self, _: String, checkpoint_number: CheckpointSequenceNumber) -> Result<()> { + self.client.save_watermark(checkpoint_number).await + } +} diff --git a/crates/sui-kvstore/src/lib.rs b/crates/sui-kvstore/src/lib.rs index 5d3ab55a3f64a..dd0724a48c733 100644 --- a/crates/sui-kvstore/src/lib.rs +++ b/crates/sui-kvstore/src/lib.rs @@ -4,6 +4,7 @@ mod bigtable; use anyhow::Result; use async_trait::async_trait; pub use bigtable::client::BigTableClient; +pub use bigtable::progress_store::BigTableProgressStore; pub use bigtable::worker::KvWorker; use sui_types::base_types::ObjectID; use sui_types::crypto::AuthorityStrongQuorumSignInfo; @@ -41,6 +42,7 @@ pub trait KeyValueStoreWriter { async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>; async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>; async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>; + async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()>; } #[derive(Clone, Debug)] diff --git a/crates/sui-kvstore/src/main.rs b/crates/sui-kvstore/src/main.rs index 82f9ceac61315..b25768216d51f 100644 --- a/crates/sui-kvstore/src/main.rs +++ b/crates/sui-kvstore/src/main.rs @@ -1,10 +1,11 @@ // Copyright (c) Mysten Labs, Inc. 
// SPDX-License-Identifier: Apache-2.0 use anyhow::Result; -use sui_data_ingestion_core::setup_single_workflow; -use sui_kvstore::BigTableClient; -use sui_kvstore::KvWorker; +use prometheus::Registry; +use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool}; +use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker}; use telemetry_subscribers::TelemetryConfig; +use tokio::sync::oneshot; #[tokio::main] async fn main() -> Result<()> { @@ -20,16 +21,24 @@ async fn main() -> Result<()> { network == "mainnet" || network == "testnet", "Invalid network name" ); + let client = BigTableClient::new_local(instance_id).await?; - let client = BigTableClient::new_remote(instance_id, false, None).await?; - let (executor, _term_sender) = setup_single_workflow( - KvWorker { client }, - format!("https://checkpoints.{}.sui.io", network), - 0, + let (_exit_sender, exit_receiver) = oneshot::channel(); + let mut executor = IndexerExecutor::new( + BigTableProgressStore::new(client.clone()), 1, - None, - ) - .await?; - executor.await?; + DataIngestionMetrics::new(&Registry::new()), + ); + let worker_pool = WorkerPool::new(KvWorker { client }, "bigtable".to_string(), 50); + executor.register(worker_pool).await?; + executor + .run( + tempfile::tempdir()?.into_path(), + Some(format!("https://checkpoints.{}.sui.io", network)), + vec![], + ReaderOptions::default(), + exit_receiver, + ) + .await?; Ok(()) }