diff --git a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
index dad873e858b40f..578d019ffb4a35 100644
--- a/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
+++ b/crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
@@ -10,6 +10,7 @@ use anyhow::{anyhow, bail, ensure};
 use diesel::{upsert::excluded, ExpressionMethods};
 use diesel_async::RunQueryDsl;
 use futures::future::{try_join_all, Either};
+use sui_field_count::FieldCount;
 use sui_types::{
     base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
     object::Owner,
@@ -22,6 +23,8 @@ use crate::{
     schema::sum_coin_balances,
 };
 
+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT;
+const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
 pub struct SumCoinBalances;
 
 impl Processor for SumCoinBalances {
@@ -151,8 +154,8 @@ impl Handler for SumCoinBalances {
                 deletes.push(update.object_id.to_vec());
             }
         }
-        let update_chunks = updates.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(Either::Left);
-        let delete_chunks = deletes.chunks(Self::MAX_DELETE_CHUNK_ROWS).map(Either::Right);
+        let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
+        let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);
 
         let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
             Either::Left(update) => Either::Left(
diff --git a/crates/sui-indexer-alt/src/handlers/sum_displays.rs b/crates/sui-indexer-alt/src/handlers/sum_displays.rs
index 545df5bf614396..8d8803493281ab 100644
--- a/crates/sui-indexer-alt/src/handlers/sum_displays.rs
+++ b/crates/sui-indexer-alt/src/handlers/sum_displays.rs
@@ -7,6 +7,7 @@ use anyhow::{anyhow, Result};
 use diesel::{upsert::excluded, ExpressionMethods};
 use diesel_async::RunQueryDsl;
 use futures::future::try_join_all;
+use sui_field_count::FieldCount;
 use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData};
 
 use crate::{
@@ -16,6 +17,7 @@ use crate::{
     schema::sum_displays,
 };
 
+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT;
 pub struct SumDisplays;
 
 impl Processor for SumDisplays {
@@ -69,22 +71,20 @@ impl Handler for SumDisplays {
     async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
         let values: Vec<_> = batch.values().cloned().collect();
 
-        let updates =
-            values
-                .chunks(Self::MAX_INSERT_CHUNK_ROWS)
-                .map(|chunk: &[StoredDisplay]| {
-                    diesel::insert_into(sum_displays::table)
-                        .values(chunk)
-                        .on_conflict(sum_displays::object_type)
-                        .do_update()
-                        .set((
-                            sum_displays::display_id.eq(excluded(sum_displays::display_id)),
-                            sum_displays::display_version
-                                .eq(excluded(sum_displays::display_version)),
-                            sum_displays::display.eq(excluded(sum_displays::display)),
-                        ))
-                        .execute(conn)
-                });
+        let updates = values
+            .chunks(MAX_INSERT_CHUNK_ROWS)
+            .map(|chunk: &[StoredDisplay]| {
+                diesel::insert_into(sum_displays::table)
+                    .values(chunk)
+                    .on_conflict(sum_displays::object_type)
+                    .do_update()
+                    .set((
+                        sum_displays::display_id.eq(excluded(sum_displays::display_id)),
+                        sum_displays::display_version.eq(excluded(sum_displays::display_version)),
+                        sum_displays::display.eq(excluded(sum_displays::display)),
+                    ))
+                    .execute(conn)
+            });
 
         Ok(try_join_all(updates).await?.into_iter().sum())
     }
diff --git a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
index 507c60075a0a68..7743820557267b 100644
--- a/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
+++ b/crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
@@ -10,6 +10,7 @@ use anyhow::{anyhow, ensure};
 use diesel::{upsert::excluded, ExpressionMethods};
 use diesel_async::RunQueryDsl;
 use futures::future::{try_join_all, Either};
+use sui_field_count::FieldCount;
 use sui_types::{
     base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
     object::Owner,
@@ -22,6 +23,9 @@ use crate::{
     schema::sum_obj_types,
 };
 
+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT;
+const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
+
 pub struct SumObjTypes;
 
 impl Processor for SumObjTypes {
@@ -149,8 +153,8 @@ impl Handler for SumObjTypes {
             }
         }
 
-        let update_chunks = updates.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(Either::Left);
-        let delete_chunks = deletes.chunks(Self::MAX_DELETE_CHUNK_ROWS).map(Either::Right);
+        let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
+        let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);
 
         let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
             Either::Left(update) => Either::Left(
diff --git a/crates/sui-indexer-alt/src/handlers/sum_packages.rs b/crates/sui-indexer-alt/src/handlers/sum_packages.rs
index 424641936b5269..2cb1f815867353 100644
--- a/crates/sui-indexer-alt/src/handlers/sum_packages.rs
+++ b/crates/sui-indexer-alt/src/handlers/sum_packages.rs
@@ -7,6 +7,7 @@ use anyhow::{anyhow, Result};
 use diesel::{upsert::excluded, ExpressionMethods};
 use diesel_async::RunQueryDsl;
 use futures::future::try_join_all;
+use sui_field_count::FieldCount;
 use sui_types::full_checkpoint_content::CheckpointData;
 
 use crate::{
@@ -16,6 +17,8 @@ use crate::{
     schema::sum_packages,
 };
 
+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredPackage::FIELD_COUNT;
+
 pub struct SumPackages;
 
 impl Processor for SumPackages {
@@ -65,7 +68,7 @@ impl Handler for SumPackages {
     async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
         let values: Vec<_> = batch.values().cloned().collect();
 
-        let updates = values.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(|chunk| {
+        let updates = values.chunks(MAX_INSERT_CHUNK_ROWS).map(|chunk| {
             diesel::insert_into(sum_packages::table)
                 .values(chunk)
                 .on_conflict(sum_packages::package_id)
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
index a54766ad9a6792..0d57f6e9faf633 100644
--- a/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
@@ -4,6 +4,7 @@
 use std::{collections::BTreeMap, sync::Arc};
 
 use mysten_metrics::spawn_monitored_task;
+use sui_field_count::FieldCount;
 use tokio::{
     sync::mpsc,
     task::JoinHandle,
@@ -38,10 +39,9 @@ impl<H: Handler> Pending<H> {
     /// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
     /// chunk size.
     fn batch_into(&mut self, batch: &mut Batched<H>) {
-        if batch.values.len() + self.values.len() > H::MAX_CHUNK_ROWS {
-            let mut for_batch = self
-                .values
-                .split_off(H::MAX_CHUNK_ROWS - batch.values.len());
+        let max_chunk_rows = i16::MAX as usize / H::Value::FIELD_COUNT;
+        if batch.values.len() + self.values.len() > max_chunk_rows {
+            let mut for_batch = self.values.split_off(max_chunk_rows - batch.values.len());
             std::mem::swap(&mut self.values, &mut for_batch);
 
             batch.watermark.push(self.watermark.take(for_batch.len()));
diff --git a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
index a39ce86665530b..57980c3fe50e11 100644
--- a/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
+++ b/crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
@@ -52,14 +52,10 @@ const MAX_WATERMARK_UPDATES: usize = 10_000;
 /// build up, the collector will stop accepting new checkpoints, which will eventually propagate
 /// back to the ingestion service.
 #[async_trait::async_trait]
-pub trait Handler: Processor {
+pub trait Handler: Processor<Value: FieldCount> {
     /// If at least this many rows are pending, the committer will commit them eagerly.
     const MIN_EAGER_ROWS: usize = 50;
 
-    /// If there are more than this many rows pending, the committer will only commit this many in
-    /// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
-    const MAX_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;
-
     /// If there are more than this many rows pending, the committer applies backpressure.
     const MAX_PENDING_ROWS: usize = 5000;
@@ -121,7 +117,8 @@ impl<H: Handler> Batched<H> {
     /// The batch is full if it has more than enough values to write to the database, or more than
     /// enough watermarks to update.
     fn is_full(&self) -> bool {
-        self.values.len() >= H::MAX_CHUNK_ROWS || self.watermark.len() >= MAX_WATERMARK_UPDATES
+        let max_chunk_rows = i16::MAX as usize / H::Value::FIELD_COUNT;
+        self.values.len() >= max_chunk_rows || self.watermark.len() >= MAX_WATERMARK_UPDATES
     }
 }
diff --git a/crates/sui-indexer-alt/src/pipeline/processor.rs b/crates/sui-indexer-alt/src/pipeline/processor.rs
index ef8036e9e17508..576be66422a2e7 100644
--- a/crates/sui-indexer-alt/src/pipeline/processor.rs
+++ b/crates/sui-indexer-alt/src/pipeline/processor.rs
@@ -5,7 +5,6 @@ use std::sync::atomic::AtomicU64;
 use std::sync::Arc;
 
 use mysten_metrics::spawn_monitored_task;
-use sui_field_count::FieldCount;
 use sui_types::full_checkpoint_content::CheckpointData;
 use tokio::{sync::mpsc, task::JoinHandle};
 use tokio_stream::wrappers::ReceiverStream;
@@ -27,7 +26,7 @@ pub trait Processor {
     const FANOUT: usize = 10;
 
     /// The type of value being inserted by the handler.
-    type Value: Send + Sync + 'static + FieldCount;
+    type Value: Send + Sync + 'static;
 
     /// The processing logic for turning a checkpoint into rows of the table.
     fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
diff --git a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs b/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
index 49fadef89ff506..adcf1ff2928cdb 100644
--- a/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
+++ b/crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
@@ -3,7 +3,6 @@
 
 use std::sync::Arc;
 
-use sui_field_count::FieldCount;
 use sui_types::full_checkpoint_content::CheckpointData;
 use tokio::{sync::mpsc, task::JoinHandle};
 use tokio_util::sync::CancellationToken;
@@ -42,13 +41,6 @@ pub trait Handler: Processor {
     /// If at least this many rows are pending, the committer will commit them eagerly.
     const MIN_EAGER_ROWS: usize = 50;
 
-    /// If there are more than this many rows pending, the committer will only commit this many in
-    /// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
-    const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;
-
-    /// Each deletion will include at most this many rows without hitting the limit on bind parameters.
-    const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
-
     /// Maximum number of checkpoints to try and write in a single batch. The larger this number
     /// is, the more chances the pipeline has to merge redundant writes, but the longer each write
     /// transaction is likely to be.
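
A note on the chunking arithmetic this change moves around: Postgres counts bind parameters in a prepared statement with a signed 16-bit integer, so a single multi-row INSERT can carry at most i16::MAX = 32767 parameters, and each row consumes one parameter per column; deletes bind roughly one parameter per row (the key), hence the plain i16::MAX bound. Below is a minimal, self-contained sketch of that computation. StoredExample and its hand-written FIELD_COUNT are hypothetical stand-ins for the FieldCount derive used by the real stored types:

// A hypothetical three-column row type; the manual FIELD_COUNT constant stands
// in for what a FieldCount derive would generate for a real stored type.
#[allow(dead_code)]
struct StoredExample {
    object_id: Vec<u8>,  // 1 bind parameter per row
    object_version: i64, // 1 bind parameter per row
    owner_kind: i16,     // 1 bind parameter per row
}

impl StoredExample {
    const FIELD_COUNT: usize = 3;
}

// Largest row count whose total bind parameters stay within i16::MAX.
const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredExample::FIELD_COUNT;

fn main() {
    // 32767 / 3 = 10922 rows, i.e. 32766 bind parameters per statement.
    assert_eq!(MAX_INSERT_CHUNK_ROWS, 10922);
    assert!(MAX_INSERT_CHUNK_ROWS * StoredExample::FIELD_COUNT <= i16::MAX as usize);
}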
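
The reworked batch_into above relies on a split_off/swap idiom to move exactly as many pending rows as still fit in the current batch. Here is a standalone sketch of that idiom under simplified assumptions: take_prefix is a hypothetical helper named for illustration, and plain integers stand in for H::Value rows:

// Removes and returns the first `n` elements of `values`, leaving the rest in
// place. split_off(n) keeps the first n elements in `values` and returns the
// tail; swapping afterwards hands the prefix to the caller and keeps the tail.
fn take_prefix<T>(values: &mut Vec<T>, n: usize) -> Vec<T> {
    let mut prefix = values.split_off(n);
    std::mem::swap(values, &mut prefix);
    prefix
}

fn main() {
    let max_chunk_rows = 4; // stand-in for i16::MAX as usize / H::Value::FIELD_COUNT
    let mut batch = vec![1, 2];
    let mut pending = vec![3, 4, 5, 6, 7];

    // Mirrors batch_into: only move enough pending rows to fill the batch.
    if batch.len() + pending.len() > max_chunk_rows {
        let for_batch = take_prefix(&mut pending, max_chunk_rows - batch.len());
        batch.extend(for_batch);
    }

    assert_eq!(batch, vec![1, 2, 3, 4]); // filled exactly to the chunk bound
    assert_eq!(pending, vec![5, 6, 7]);  // remainder stays pending
}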