Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[kv store] add watermark table to bigtable #20705

Merged
merged 1 commit into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[kv store] add watermark table to bigtable
  • Loading branch information
phoenix-o committed Dec 23, 2024
commit 01df96369f95bb28049bf701cb6c0217295dbb72
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 19 additions & 2 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use sui_data_ingestion::{
};
use sui_data_ingestion_core::{DataIngestionMetrics, ReaderOptions};
use sui_data_ingestion_core::{IndexerExecutor, WorkerPool};
use sui_kvstore::{BigTableClient, KvWorker};
use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker};
use tokio::signal;
use tokio::sync::oneshot;

Expand Down Expand Up @@ -119,12 +119,30 @@ async fn main() -> Result<()> {
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);

let mut bigtable_store = None;
for task in &config.tasks {
if let Task::BigTableKV(kv_config) = &task.task {
std::env::set_var(
"GOOGLE_APPLICATION_CREDENTIALS",
kv_config.credentials.clone(),
);
let bigtable_client = BigTableClient::new_remote(
kv_config.instance_id.clone(),
false,
Some(Duration::from_secs(kv_config.timeout_secs as u64)),
)
.await?;
bigtable_store = Some(BigTableProgressStore::new(bigtable_client));
}
}

let progress_store = DynamoDBProgressStore::new(
&config.progress_store.aws_access_key_id,
&config.progress_store.aws_secret_access_key,
config.progress_store.aws_region,
config.progress_store.table_name,
config.is_backfill,
bigtable_store,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
Expand Down Expand Up @@ -160,7 +178,6 @@ async fn main() -> Result<()> {
executor.register(worker_pool).await?;
}
Task::BigTableKV(kv_config) => {
std::env::set_var("GOOGLE_APPLICATION_CREDENTIALS", kv_config.credentials);
let client = BigTableClient::new_remote(
kv_config.instance_id,
false,
Expand Down
11 changes: 11 additions & 0 deletions crates/sui-data-ingestion/src/progress_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ use aws_sdk_s3::config::{Credentials, Region};
use std::str::FromStr;
use std::time::Duration;
use sui_data_ingestion_core::ProgressStore;
use sui_kvstore::BigTableProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct DynamoDBProgressStore {
client: Client,
table_name: String,
is_backfill: bool,
bigtable_store: Option<BigTableProgressStore>,
}

impl DynamoDBProgressStore {
Expand All @@ -26,6 +28,7 @@ impl DynamoDBProgressStore {
aws_region: String,
table_name: String,
is_backfill: bool,
bigtable_store: Option<BigTableProgressStore>,
) -> Self {
let credentials = Credentials::new(
aws_access_key_id,
Expand All @@ -50,6 +53,7 @@ impl DynamoDBProgressStore {
client,
table_name,
is_backfill,
bigtable_store,
}
}
}
Expand Down Expand Up @@ -79,6 +83,13 @@ impl ProgressStore for DynamoDBProgressStore {
if self.is_backfill && checkpoint_number % 1000 != 0 {
return Ok(());
}
if task_name == "bigtable" {
if let Some(ref mut bigtable_store) = self.bigtable_store {
bigtable_store
.save(task_name.clone(), checkpoint_number)
.await?;
}
}
let backoff = backoff::ExponentialBackoff::default();
backoff::future::retry(backoff, || async {
let result = self
Expand Down
1 change: 1 addition & 0 deletions crates/sui-kvstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ serde.workspace = true
sui-data-ingestion-core.workspace = true
sui-types.workspace = true
telemetry-subscribers.workspace = true
tempfile.workspace = true
tokio = { workspace = true, features = ["full"] }
tonic = {version = "0.12.2",features = ["tls", "transport"] }
tracing.workspace = true
12 changes: 11 additions & 1 deletion crates/sui-kvstore/src/bigtable/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const OBJECTS_TABLE: &str = "objects";
const TRANSACTIONS_TABLE: &str = "transactions";
const CHECKPOINTS_TABLE: &str = "checkpoints";
const CHECKPOINTS_BY_DIGEST_TABLE: &str = "checkpoints_by_digest";
const WATERMARK_TABLE: &str = "watermark";

const COLUMN_FAMILY_NAME: &str = "sui";
const DEFAULT_COLUMN_QUALIFIER: &str = "";
Expand Down Expand Up @@ -131,6 +132,15 @@ impl KeyValueStoreWriter for BigTableClient {
)
.await
}

async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()> {
let key = watermark.to_be_bytes().to_vec();
self.multi_set(
WATERMARK_TABLE,
[(key, vec![(DEFAULT_COLUMN_QUALIFIER, vec![])])],
)
.await
}
}

#[async_trait]
Expand Down Expand Up @@ -239,7 +249,7 @@ impl KeyValueStoreReader for BigTableClient {
async fn get_latest_checkpoint(&mut self) -> Result<CheckpointSequenceNumber> {
let upper_limit = u64::MAX.to_be_bytes().to_vec();
match self
.reversed_scan(CHECKPOINTS_TABLE, upper_limit)
.reversed_scan(WATERMARK_TABLE, upper_limit)
.await?
.pop()
{
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-kvstore/src/bigtable/init.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ if [[ -n $BIGTABLE_EMULATOR_HOST ]]; then
command+=(-project emulator)
fi

for table in objects transactions checkpoints checkpoints_by_digest; do
for table in objects transactions checkpoints checkpoints_by_digest watermark; do
(
set -x
"${command[@]}" createtable $table
"${command[@]}" createfamily $table sui
"${command[@]}" setgcpolicy $table sui maxversions=1
)
done
"${command[@]}" setgcpolicy watermark sui maxage=2d
1 change: 1 addition & 0 deletions crates/sui-kvstore/src/bigtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod client;
pub(crate) mod progress_store;
mod proto;
pub(crate) mod worker;
29 changes: 29 additions & 0 deletions crates/sui-kvstore/src/bigtable/progress_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{BigTableClient, KeyValueStoreReader, KeyValueStoreWriter};
use anyhow::Result;
use async_trait::async_trait;
use sui_data_ingestion_core::ProgressStore;
use sui_types::messages_checkpoint::CheckpointSequenceNumber;

pub struct BigTableProgressStore {
client: BigTableClient,
}

impl BigTableProgressStore {
pub fn new(client: BigTableClient) -> Self {
Self { client }
}
}

#[async_trait]
impl ProgressStore for BigTableProgressStore {
async fn load(&mut self, _: String) -> Result<CheckpointSequenceNumber> {
self.client.get_latest_checkpoint().await
}

async fn save(&mut self, _: String, checkpoint_number: CheckpointSequenceNumber) -> Result<()> {
self.client.save_watermark(checkpoint_number).await
}
}
2 changes: 2 additions & 0 deletions crates/sui-kvstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod bigtable;
use anyhow::Result;
use async_trait::async_trait;
pub use bigtable::client::BigTableClient;
pub use bigtable::progress_store::BigTableProgressStore;
pub use bigtable::worker::KvWorker;
use sui_types::base_types::ObjectID;
use sui_types::crypto::AuthorityStrongQuorumSignInfo;
Expand Down Expand Up @@ -41,6 +42,7 @@ pub trait KeyValueStoreWriter {
async fn save_objects(&mut self, objects: &[&Object]) -> Result<()>;
async fn save_transactions(&mut self, transactions: &[TransactionData]) -> Result<()>;
async fn save_checkpoint(&mut self, checkpoint: &CheckpointData) -> Result<()>;
async fn save_watermark(&mut self, watermark: CheckpointSequenceNumber) -> Result<()>;
}

#[derive(Clone, Debug)]
Expand Down
33 changes: 21 additions & 12 deletions crates/sui-kvstore/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use anyhow::Result;
use sui_data_ingestion_core::setup_single_workflow;
use sui_kvstore::BigTableClient;
use sui_kvstore::KvWorker;
use prometheus::Registry;
use sui_data_ingestion_core::{DataIngestionMetrics, IndexerExecutor, ReaderOptions, WorkerPool};
use sui_kvstore::{BigTableClient, BigTableProgressStore, KvWorker};
use telemetry_subscribers::TelemetryConfig;
use tokio::sync::oneshot;

#[tokio::main]
async fn main() -> Result<()> {
Expand All @@ -20,16 +21,24 @@ async fn main() -> Result<()> {
network == "mainnet" || network == "testnet",
"Invalid network name"
);
let client = BigTableClient::new_local(instance_id).await?;

let client = BigTableClient::new_remote(instance_id, false, None).await?;
let (executor, _term_sender) = setup_single_workflow(
KvWorker { client },
format!("https://checkpoints.{}.sui.io", network),
0,
let (_exit_sender, exit_receiver) = oneshot::channel();
let mut executor = IndexerExecutor::new(
BigTableProgressStore::new(client.clone()),
1,
None,
)
.await?;
executor.await?;
DataIngestionMetrics::new(&Registry::new()),
);
let worker_pool = WorkerPool::new(KvWorker { client }, "bigtable".to_string(), 50);
executor.register(worker_pool).await?;
executor
.run(
tempfile::tempdir()?.into_path(),
Some(format!("https://checkpoints.{}.sui.io", network)),
vec![],
ReaderOptions::default(),
exit_receiver,
)
.await?;
Ok(())
}
Loading