Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: file-based configs #20461

Merged
merged 12 commits into from
Nov 29, 2024
Prev Previous commit
Next Next commit
indexer-alt: file-based configs -- add back --pipeline flag
## Description

Add back the `--pipeline` command-line argument, layered on top of the
file-based configuration.

## Test plan

```
sui$ cargo run -p sui-indexer-alt -- generate-config > /tmp/indexer.toml
# Edit /tmp/indexer.toml to remove some pipelines

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml                            \
  --pipeline i_dont_exist

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml                            \
  --pipeline kv_objects --pipeline kv_transactions

sui$ cargo run -p sui-indexer-alt -- indexer            \
  --last-checkpoint 10000                               \
  --remote-store-url https://checkpoints.mainnet.sui.io \
  --config /tmp/indexer.toml
```
  • Loading branch information
amnn committed Nov 29, 2024
commit 4e5ea38cf8267a51d749953f6ba32c688c3577a8
11 changes: 10 additions & 1 deletion crates/sui-indexer-alt/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@ pub struct BenchmarkArgs {
/// Path to the local ingestion directory to read checkpoints data from.
#[arg(long)]
ingestion_path: PathBuf,

/// Only run the following pipelines. If not provided, all pipelines found in the
/// configuration file will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,
}

pub async fn run_benchmark(
db_args: DbArgs,
benchmark_args: BenchmarkArgs,
indexer_config: IndexerConfig,
) -> anyhow::Result<()> {
let BenchmarkArgs { ingestion_path } = benchmark_args;
let BenchmarkArgs {
ingestion_path,
pipeline,
} = benchmark_args;

let ingestion_data = read_ingestion_data(&ingestion_path).await?;
let first_checkpoint = *ingestion_data.keys().next().unwrap();
Expand All @@ -34,6 +42,7 @@ pub async fn run_benchmark(
let indexer_args = IndexerArgs {
first_checkpoint: Some(first_checkpoint),
last_checkpoint: Some(last_checkpoint),
pipeline,
..Default::default()
};

Expand Down
32 changes: 32 additions & 0 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ pub struct IndexerArgs {
#[arg(long)]
pub last_checkpoint: Option<u64>,

/// Only run the following pipelines. If not provided, all pipelines found in the
/// configuration file will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,

/// Don't write to the watermark tables for concurrent pipelines.
#[arg(long)]
pub skip_watermark: bool,
Expand Down Expand Up @@ -90,6 +95,11 @@ pub struct Indexer {
/// Don't write to the watermark tables for concurrent pipelines.
skip_watermark: bool,

/// Optional filter for pipelines to run. If `None`, all pipelines added to the indexer will
/// run. Any pipelines that are present in this filter but not added to the indexer will cause
/// an error when the indexer is run.
enabled_pipelines: Option<BTreeSet<String>>,

/// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
/// with the same name isn't added twice.
added_pipelines: BTreeSet<&'static str>,
Expand Down Expand Up @@ -117,6 +127,7 @@ impl Indexer {
let IndexerArgs {
first_checkpoint,
last_checkpoint,
pipeline,
skip_watermark,
metrics_address,
} = indexer_args;
Expand Down Expand Up @@ -148,6 +159,11 @@ impl Indexer {
first_checkpoint,
last_checkpoint,
skip_watermark,
enabled_pipelines: if pipeline.is_empty() {
None
} else {
Some(pipeline.into_iter().collect())
},
added_pipelines: BTreeSet::new(),
cancel,
first_checkpoint_from_watermark: u64::MAX,
Expand Down Expand Up @@ -274,6 +290,14 @@ impl Indexer {
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
/// or will continue until it tracks the tip of the network.
pub async fn run(mut self) -> Result<JoinHandle<()>> {
if let Some(enabled_pipelines) = self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
"Tried to enable pipelines that this indexer does not know about: \
{enabled_pipelines:#?}",
);
}

let metrics_handle = self
.metrics_service
.run()
Expand Down Expand Up @@ -328,6 +352,13 @@ impl Indexer {
P::NAME,
);

if let Some(enabled_pipelines) = &mut self.enabled_pipelines {
if !enabled_pipelines.remove(P::NAME) {
info!(pipeline = P::NAME, "Skipping");
return Ok(None);
}
}

let mut conn = self.db.connect().await.context("Failed DB connection")?;

let watermark = CommitterWatermark::get(&mut conn, P::NAME)
Expand All @@ -349,6 +380,7 @@ impl Default for IndexerArgs {
Self {
first_checkpoint: None,
last_checkpoint: None,
pipeline: vec![],
skip_watermark: false,
metrics_address: "0.0.0.0:9184".parse().unwrap(),
}
Expand Down