
Commit

gegaowp committed Dec 2, 2024
1 parent 8bbdec3 commit b511875
Showing 8 changed files with 39 additions and 41 deletions.
7 changes: 5 additions & 2 deletions crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
@@ -10,6 +10,7 @@ use anyhow::{anyhow, bail, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::{try_join_all, Either};
+use sui_field_count::FieldCount;
use sui_types::{
base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
object::Owner,
@@ -22,6 +23,8 @@ use crate::{
schema::sum_coin_balances,
};

+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumCoinBalance::FIELD_COUNT;
+const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
pub struct SumCoinBalances;

impl Processor for SumCoinBalances {
@@ -151,8 +154,8 @@ impl Handler for SumCoinBalances {
deletes.push(update.object_id.to_vec());
}
}
-let update_chunks = updates.chunks(Self::INSERT_CHUNK_ROWS).map(Either::Left);
-let delete_chunks = deletes.chunks(Self::DELETE_CHUNK_ROWS).map(Either::Right);
+let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
+let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);

let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
Either::Left(update) => Either::Left(
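A minimal, standalone sketch of the chunk-size arithmetic introduced above (not from the commit): inserts bind one parameter per field per row, so rows per INSERT are capped at i16::MAX bind parameters divided by the row's field count, while deletes bind only one parameter per row. The row type and its fields below are hypothetical stand-ins for StoredSumCoinBalance, and the derive usage assumes sui_field_count exposes a FieldCount derive alongside the trait.

use sui_field_count::FieldCount;

// Hypothetical row type; the real StoredSumCoinBalance has its own columns.
#[derive(FieldCount)]
struct StoredRow {
    object_id: Vec<u8>,
    object_version: i64,
    owner_id: Vec<u8>,
    coin_type: Vec<u8>,
    coin_balance: i64,
}

// One bind parameter per field per row for inserts; one per row (the key) for deletes.
const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredRow::FIELD_COUNT; // 32_767 / 5 = 6_553
const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize; // 32_767

fn main() {
    assert_eq!(StoredRow::FIELD_COUNT, 5);
    assert_eq!(MAX_INSERT_CHUNK_ROWS, 6_553);
    assert_eq!(MAX_DELETE_CHUNK_ROWS, 32_767);
}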
32 changes: 16 additions & 16 deletions crates/sui-indexer-alt/src/handlers/sum_displays.rs
@@ -7,6 +7,7 @@ use anyhow::{anyhow, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
+use sui_field_count::FieldCount;
use sui_types::{display::DisplayVersionUpdatedEvent, full_checkpoint_content::CheckpointData};

use crate::{
@@ -16,6 +17,7 @@ use crate::{
schema::sum_displays,
};

+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplay::FIELD_COUNT;
pub struct SumDisplays;

impl Processor for SumDisplays {
@@ -69,22 +71,20 @@ impl Handler for SumDisplays {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
-let updates =
-values
-.chunks(Self::MAX_INSERT_CHUNK_ROWS)
-.map(|chunk: &[StoredDisplay]| {
-diesel::insert_into(sum_displays::table)
-.values(chunk)
-.on_conflict(sum_displays::object_type)
-.do_update()
-.set((
-sum_displays::display_id.eq(excluded(sum_displays::display_id)),
-sum_displays::display_version
-.eq(excluded(sum_displays::display_version)),
-sum_displays::display.eq(excluded(sum_displays::display)),
-))
-.execute(conn)
-});
+let updates = values
+.chunks(MAX_INSERT_CHUNK_ROWS)
+.map(|chunk: &[StoredDisplay]| {
+diesel::insert_into(sum_displays::table)
+.values(chunk)
+.on_conflict(sum_displays::object_type)
+.do_update()
+.set((
+sum_displays::display_id.eq(excluded(sum_displays::display_id)),
+sum_displays::display_version.eq(excluded(sum_displays::display_version)),
+sum_displays::display.eq(excluded(sum_displays::display)),
+))
+.execute(conn)
+});

Ok(try_join_all(updates).await?.into_iter().sum())
}
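The FIELD_COUNT divisor comes from the sui_field_count crate added to the imports. Roughly, the trait amounts to an associated constant counting a struct's fields; the sketch below is not the crate's actual source, and the stand-in type with its field types is invented based on the four columns visible in the upsert above.

// Sketch of the trait shape these handlers rely on; the real crate may differ in detail.
pub trait FieldCount {
    /// Number of fields on the implementing type, i.e. bind parameters per inserted row.
    const FIELD_COUNT: usize;
}

// Stand-in for StoredDisplay: object_type, display_id, display_version, display.
pub struct StoredDisplayLike {
    pub object_type: Vec<u8>,
    pub display_id: Vec<u8>,
    pub display_version: i16,
    pub display: Vec<u8>,
}

impl FieldCount for StoredDisplayLike {
    const FIELD_COUNT: usize = 4;
}

// With four columns, each INSERT chunk can hold 32_767 / 4 = 8_191 rows.
const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredDisplayLike::FIELD_COUNT;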
8 changes: 6 additions & 2 deletions crates/sui-indexer-alt/src/handlers/sum_obj_types.rs
@@ -10,6 +10,7 @@ use anyhow::{anyhow, ensure};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::{try_join_all, Either};
+use sui_field_count::FieldCount;
use sui_types::{
base_types::ObjectID, effects::TransactionEffectsAPI, full_checkpoint_content::CheckpointData,
object::Owner,
@@ -22,6 +23,9 @@ use crate::{
schema::sum_obj_types,
};

+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredSumObjType::FIELD_COUNT;
+const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
+
pub struct SumObjTypes;

impl Processor for SumObjTypes {
@@ -149,8 +153,8 @@ impl Handler for SumObjTypes {
}
}

-let update_chunks = updates.chunks(Self::INSERT_CHUNK_ROWS).map(Either::Left);
-let delete_chunks = deletes.chunks(Self::DELETE_CHUNK_ROWS).map(Either::Right);
+let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
+let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);

let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
Either::Left(update) => Either::Left(
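The update/delete fan-out above chains two differently-typed chunk streams into one set of futures via Either. A self-contained sketch of the same shape (not from the commit), with plain async blocks standing in for the Diesel inserts and deletes and small stand-in chunk sizes:

use futures::future::{try_join_all, Either};

async fn commit_sketch(updates: Vec<u64>, deletes: Vec<u64>) -> anyhow::Result<usize> {
    const MAX_INSERT_CHUNK_ROWS: usize = 4; // stand-in for i16::MAX / FIELD_COUNT
    const MAX_DELETE_CHUNK_ROWS: usize = 8; // stand-in for i16::MAX

    let update_chunks = updates.chunks(MAX_INSERT_CHUNK_ROWS).map(Either::Left);
    let delete_chunks = deletes.chunks(MAX_DELETE_CHUNK_ROWS).map(Either::Right);

    // One future per chunk; Either keeps the two future types unified for try_join_all.
    let futures = update_chunks.chain(delete_chunks).map(|chunk| match chunk {
        Either::Left(update) => Either::Left(async move { anyhow::Ok(update.len()) }),
        Either::Right(delete) => Either::Right(async move { anyhow::Ok(delete.len()) }),
    });

    // Sum of affected rows across all chunks, failing fast on the first error.
    Ok(try_join_all(futures).await?.into_iter().sum())
}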
5 changes: 4 additions & 1 deletion crates/sui-indexer-alt/src/handlers/sum_packages.rs
@@ -7,6 +7,7 @@ use anyhow::{anyhow, Result};
use diesel::{upsert::excluded, ExpressionMethods};
use diesel_async::RunQueryDsl;
use futures::future::try_join_all;
+use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;

use crate::{
@@ -16,6 +17,8 @@ use crate::{
schema::sum_packages,
};

+const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / StoredPackage::FIELD_COUNT;
+
pub struct SumPackages;

impl Processor for SumPackages {
@@ -65,7 +68,7 @@ impl Handler for SumPackages {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
-let updates = values.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(|chunk| {
+let updates = values.chunks(MAX_INSERT_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_packages::table)
.values(chunk)
.on_conflict(sum_packages::package_id)
8 changes: 4 additions & 4 deletions crates/sui-indexer-alt/src/pipeline/concurrent/collector.rs
@@ -4,6 +4,7 @@
use std::{collections::BTreeMap, sync::Arc};

use mysten_metrics::spawn_monitored_task;
+use sui_field_count::FieldCount;
use tokio::{
sync::mpsc,
task::JoinHandle,
@@ -38,10 +39,9 @@ impl<H: Handler> Pending<H> {
/// Adds data from this indexed checkpoint to the `batch`, honoring the handler's bounds on
/// chunk size.
fn batch_into(&mut self, batch: &mut Batched<H>) {
-if batch.values.len() + self.values.len() > H::MAX_CHUNK_ROWS {
-let mut for_batch = self
-.values
-.split_off(H::MAX_CHUNK_ROWS - batch.values.len());
+let max_chunk_rows = i16::MAX as usize / H::Value::FIELD_COUNT;
+if batch.values.len() + self.values.len() > max_chunk_rows {
+let mut for_batch = self.values.split_off(max_chunk_rows - batch.values.len());

std::mem::swap(&mut self.values, &mut for_batch);
batch.watermark.push(self.watermark.take(for_batch.len()));
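The collector now computes the row cap inline from the value's field count. A standalone sketch of the same split-off behavior (not from the commit), operating on plain Vecs with the field count passed as an argument and the watermark bookkeeping omitted:

fn batch_into(pending: &mut Vec<String>, batch: &mut Vec<String>, field_count: usize) {
    let max_chunk_rows = i16::MAX as usize / field_count;
    if batch.len() + pending.len() > max_chunk_rows {
        // split_off leaves the rows that still fit in `pending` and returns the overflow...
        let mut for_batch = pending.split_off(max_chunk_rows - batch.len());
        // ...then swap so `pending` keeps the overflow for a later batch and
        // `for_batch` holds the rows that go out now.
        std::mem::swap(pending, &mut for_batch);
        batch.append(&mut for_batch);
    } else {
        // Everything fits: drain the pending rows into the batch.
        batch.append(pending);
    }
}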
9 changes: 3 additions & 6 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
@@ -52,14 +52,10 @@ const MAX_WATERMARK_UPDATES: usize = 10_000;
/// build up, the collector will stop accepting new checkpoints, which will eventually propagate
/// back to the ingestion service.
#[async_trait::async_trait]
-pub trait Handler: Processor {
+pub trait Handler: Processor<Value: FieldCount> {
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;

-/// If there are more than this many rows pending, the committer will only commit this many in
-/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
-const MAX_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;
-
/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;

@@ -121,7 +117,8 @@ impl<H: Handler> Batched<H> {
/// The batch is full if it has more than enough values to write to the database, or more than
/// enough watermarks to update.
fn is_full(&self) -> bool {
-self.values.len() >= H::MAX_CHUNK_ROWS || self.watermark.len() >= MAX_WATERMARK_UPDATES
+let max_chunk_rows = i16::MAX as usize / H::Value::FIELD_COUNT;
+self.values.len() >= max_chunk_rows || self.watermark.len() >= MAX_WATERMARK_UPDATES
}
}

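The Handler trait now constrains its processor's associated type with an associated-type bound (Processor<Value: FieldCount>, syntax stabilized in Rust 1.79), so generic code such as is_full can compute the row cap at the use site instead of through a trait-level constant. A reduced sketch of that shape (not the real trait definitions; bodies invented):

use sui_field_count::FieldCount;

// Reduced stand-ins for the real Processor/Handler traits.
trait Processor {
    type Value: Send + Sync + 'static;
}

// Associated-type bound: every Handler's Value must expose FIELD_COUNT.
trait Handler: Processor<Value: FieldCount> {
    const MIN_EAGER_ROWS: usize = 50;
}

// Any code generic over a Handler can now derive the per-statement row cap.
fn max_chunk_rows<H: Handler>() -> usize {
    i16::MAX as usize / H::Value::FIELD_COUNT
}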
3 changes: 1 addition & 2 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
@@ -5,7 +5,6 @@ use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use mysten_metrics::spawn_monitored_task;
-use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_stream::wrappers::ReceiverStream;
@@ -27,7 +26,7 @@ pub trait Processor {
const FANOUT: usize = 10;

/// The type of value being inserted by the handler.
-type Value: Send + Sync + 'static + FieldCount;
+type Value: Send + Sync + 'static;

/// The processing logic for turning a checkpoint into rows of the table.
fn process(&self, checkpoint: &Arc<CheckpointData>) -> anyhow::Result<Vec<Self::Value>>;
8 changes: 0 additions & 8 deletions crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
@@ -3,7 +3,6 @@

use std::sync::Arc;

-use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
@@ -42,13 +41,6 @@ pub trait Handler: Processor {
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;

-/// If there are more than this many rows pending, the committer will only commit this many in
-/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
-const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;
-
-/// Each deletion will include at most this many rows without hitting the limit on bind parameters.
-const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;
-
/// Maximum number of checkpoints to try and write in a single batch. The larger this number
/// is, the more chances the pipeline has to merge redundant writes, but the longer each write
/// transaction is likely to be.
