Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

indexer-alt: file-based configs #20461

Merged
merged 12 commits into from
Nov 29, 2024
10 changes: 1 addition & 9 deletions crates/sui-indexer-alt/src/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,14 @@ pub struct BenchmarkArgs {
/// Path to the local ingestion directory to read checkpoints data from.
#[arg(long)]
ingestion_path: PathBuf,

/// Only run the following pipelines. If not provided, all pipelines will be run.
#[arg(long, action = clap::ArgAction::Append)]
pipeline: Vec<String>,
amnn marked this conversation as resolved.
Show resolved Hide resolved
}

pub async fn run_benchmark(
db_args: DbArgs,
benchmark_args: BenchmarkArgs,
mut indexer_config: IndexerConfig,
) -> anyhow::Result<()> {
let BenchmarkArgs {
ingestion_path,
pipeline,
} = benchmark_args;
let BenchmarkArgs { ingestion_path } = benchmark_args;

let ingestion_data = read_ingestion_data(&ingestion_path).await?;
let first_checkpoint = *ingestion_data.keys().next().unwrap();
Expand All @@ -40,7 +33,6 @@ pub async fn run_benchmark(
let indexer_args = IndexerArgs {
first_checkpoint: Some(first_checkpoint),
last_checkpoint: Some(last_checkpoint),
pipeline,
..Default::default()
};

Expand Down
44 changes: 22 additions & 22 deletions crates/sui-indexer-alt/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,32 @@ pub struct ConcurrentLayer {
#[serde(rename_all = "snake_case")]
pub struct PipelineConfig {
// Consistent pipelines (a sequential pipeline with a write-ahead log)
pub sum_coin_balances: CommitterLayer,
pub wal_coin_balances: CommitterLayer,
pub sum_obj_types: CommitterLayer,
pub wal_obj_types: CommitterLayer,
pub sum_coin_balances: Option<CommitterLayer>,
amnn marked this conversation as resolved.
Show resolved Hide resolved
pub wal_coin_balances: Option<CommitterLayer>,
pub sum_obj_types: Option<CommitterLayer>,
pub wal_obj_types: Option<CommitterLayer>,

// Sequential pipelines without a write-ahead log
pub sum_displays: SequentialLayer,
pub sum_packages: SequentialLayer,
pub sum_displays: Option<SequentialLayer>,
pub sum_packages: Option<SequentialLayer>,

// All concurrent pipelines
pub ev_emit_mod: ConcurrentLayer,
pub ev_struct_inst: ConcurrentLayer,
pub kv_checkpoints: ConcurrentLayer,
pub kv_epoch_ends: ConcurrentLayer,
pub kv_epoch_starts: ConcurrentLayer,
pub kv_feature_flags: ConcurrentLayer,
pub kv_objects: ConcurrentLayer,
pub kv_protocol_configs: ConcurrentLayer,
pub kv_transactions: ConcurrentLayer,
pub obj_versions: ConcurrentLayer,
pub tx_affected_addresses: ConcurrentLayer,
pub tx_affected_objects: ConcurrentLayer,
pub tx_balance_changes: ConcurrentLayer,
pub tx_calls: ConcurrentLayer,
pub tx_digests: ConcurrentLayer,
pub tx_kinds: ConcurrentLayer,
pub ev_emit_mod: Option<ConcurrentLayer>,
pub ev_struct_inst: Option<ConcurrentLayer>,
pub kv_checkpoints: Option<ConcurrentLayer>,
pub kv_epoch_ends: Option<ConcurrentLayer>,
pub kv_epoch_starts: Option<ConcurrentLayer>,
pub kv_feature_flags: Option<ConcurrentLayer>,
pub kv_objects: Option<ConcurrentLayer>,
pub kv_protocol_configs: Option<ConcurrentLayer>,
pub kv_transactions: Option<ConcurrentLayer>,
pub obj_versions: Option<ConcurrentLayer>,
pub tx_affected_addresses: Option<ConcurrentLayer>,
pub tx_affected_objects: Option<ConcurrentLayer>,
pub tx_balance_changes: Option<ConcurrentLayer>,
pub tx_calls: Option<ConcurrentLayer>,
pub tx_digests: Option<ConcurrentLayer>,
pub tx_kinds: Option<ConcurrentLayer>,
}

impl SequentialLayer {
Expand Down
89 changes: 33 additions & 56 deletions crates/sui-indexer-alt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@ pub struct IndexerArgs {
#[arg(long)]
pub skip_watermark: bool,

/// Only run the following pipelines -- useful for backfills. If not provided, all pipelines
/// will be run.
#[arg(long, action = clap::ArgAction::Append)]
pub pipeline: Vec<String>,

/// Address to serve Prometheus Metrics from.
#[arg(long, default_value_t = Self::default().metrics_address)]
pub metrics_address: SocketAddr,
Expand Down Expand Up @@ -95,9 +90,6 @@ pub struct Indexer {
/// Don't write to the watermark tables for concurrent pipelines.
skip_watermark: bool,

/// Optional override of enabled pipelines.
enabled_pipelines: Option<BTreeSet<String>>,

/// Pipelines that have already been registered with the indexer. Used to make sure a pipeline
/// with the same name isn't added twice.
added_pipelines: BTreeSet<&'static str>,
Expand Down Expand Up @@ -125,7 +117,6 @@ impl Indexer {
first_checkpoint,
last_checkpoint,
skip_watermark,
pipeline,
metrics_address,
} = indexer_args;

Expand All @@ -143,8 +134,6 @@ impl Indexer {
let ingestion_service =
IngestionService::new(ingestion_config, metrics.clone(), cancel.clone())?;

let enabled_pipelines: BTreeSet<_> = pipeline.into_iter().collect();

Ok(Self {
db,
metrics,
Expand All @@ -153,11 +142,6 @@ impl Indexer {
first_checkpoint,
last_checkpoint,
skip_watermark,
enabled_pipelines: if enabled_pipelines.is_empty() {
None
} else {
Some(enabled_pipelines)
},
added_pipelines: BTreeSet::new(),
cancel,
first_checkpoint_from_watermark: u64::MAX,
Expand Down Expand Up @@ -284,13 +268,6 @@ impl Indexer {
/// Ingestion will stop after consuming the configured `last_checkpoint`, if one is provided,
/// or will continue until it tracks the tip of the network.
pub async fn run(mut self) -> Result<JoinHandle<()>> {
if let Some(enabled_pipelines) = &self.enabled_pipelines {
ensure!(
enabled_pipelines.is_empty(),
"Tried to enable pipelines that this indexer does not know about: {enabled_pipelines:#?}",
);
}

let metrics_handle = self
.metrics_service
.run()
Expand Down Expand Up @@ -345,13 +322,6 @@ impl Indexer {
P::NAME,
);

if let Some(enabled_pipelines) = &mut self.enabled_pipelines {
if !enabled_pipelines.remove(P::NAME) {
info!("Skipping pipeline {}", P::NAME);
return Ok(None);
}
}

let mut conn = self.db.connect().await.context("Failed DB connection")?;

let watermark = CommitterWatermark::get(&mut conn, P::NAME)
Expand All @@ -374,7 +344,6 @@ impl Default for IndexerArgs {
first_checkpoint: None,
last_checkpoint: None,
skip_watermark: false,
pipeline: vec![],
metrics_address: "0.0.0.0:9184".parse().unwrap(),
}
}
Expand Down Expand Up @@ -445,41 +414,49 @@ pub async fn start_indexer(

macro_rules! add_concurrent {
($handler:expr, $config:expr) => {
indexer
.concurrent_pipeline($handler, $config.finish(&committer))
.await?
if let Some(layer) = $config {
indexer
.concurrent_pipeline($handler, layer.finish(&committer))
.await?
}
};
}

macro_rules! add_sequential {
($handler:expr, $config:expr) => {
indexer
.sequential_pipeline($handler, $config.finish(&committer))
.await?
if let Some(layer) = $config {
indexer
.sequential_pipeline($handler, layer.finish(&committer))
.await?
}
};
}

macro_rules! add_consistent {
($sum_handler:expr, $sum_config:expr; $wal_handler:expr, $wal_config:expr) => {
indexer
.sequential_pipeline(
$sum_handler,
SequentialConfig {
committer: $sum_config.finish(&committer),
checkpoint_lag,
},
)
.await?;

indexer
.concurrent_pipeline(
$wal_handler,
ConcurrentConfig {
committer: $wal_config.finish(&committer),
pruner: pruner_config.clone(),
},
)
.await?;
if let Some(sum_layer) = $sum_config {
indexer
.sequential_pipeline(
$sum_handler,
SequentialConfig {
committer: sum_layer.finish(&committer),
checkpoint_lag,
},
)
.await?;

if let Some(pruner_config) = pruner_config.clone() {
indexer
.concurrent_pipeline(
$wal_handler,
ConcurrentConfig {
committer: $wal_config.unwrap_or_default().finish(&committer),
pruner: Some(pruner_config),
},
)
.await?;
}
}
};
}

Expand Down