Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sui-field-count: const instead of fn and use it in sui-indexer-alt #20143

Merged
merged 4 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address comments
  • Loading branch information
gegaowp committed Dec 2, 2024
commit 3398f946d1c3daa2279992dd29f2217db126ddc3
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_emit_mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Processor for EvEmitMod {
#[async_trait::async_trait]
impl Handler for EvEmitMod {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/ev_struct_inst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Processor for EvStructInst {
#[async_trait::async_trait]
impl Handler for EvStructInst {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_feature_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for KvFeatureFlags {
#[async_trait::async_trait]
impl Handler for KvFeatureFlags {
const MIN_EAGER_ROWS: usize = 1;
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / 3;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Processor for KvObjects {
#[async_trait::async_trait]
impl Handler for KvObjects {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_protocol_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for KvProtocolConfigs {
#[async_trait::async_trait]
impl Handler for KvProtocolConfigs {
const MIN_EAGER_ROWS: usize = 1;
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / 3;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/kv_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ impl Processor for KvTransactions {
#[async_trait::async_trait]
impl Handler for KvTransactions {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/obj_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ impl Processor for ObjVersions {
#[async_trait::async_trait]
impl Handler for ObjVersions {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/sum_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ impl Handler for SumCoinBalances {
deletes.push(update.object_id.to_vec());
}
}

let update_chunks = updates.chunks(Self::INSERT_CHUNK_ROWS).map(Either::Left);
let delete_chunks = deletes.chunks(Self::DELETE_CHUNK_ROWS).map(Either::Right);

Expand Down
30 changes: 16 additions & 14 deletions crates/sui-indexer-alt/src/handlers/sum_displays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
schema::sum_displays,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 4;

pub struct SumDisplays;

impl Processor for SumDisplays {
Expand Down Expand Up @@ -71,18 +69,22 @@ impl Handler for SumDisplays {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_displays::table)
.values(chunk)
.on_conflict(sum_displays::object_type)
.do_update()
.set((
sum_displays::display_id.eq(excluded(sum_displays::display_id)),
sum_displays::display_version.eq(excluded(sum_displays::display_version)),
sum_displays::display.eq(excluded(sum_displays::display)),
))
.execute(conn)
});
let updates =
values
.chunks(Self::MAX_INSERT_CHUNK_ROWS)
.map(|chunk: &[StoredDisplay]| {
diesel::insert_into(sum_displays::table)
.values(chunk)
.on_conflict(sum_displays::object_type)
.do_update()
.set((
sum_displays::display_id.eq(excluded(sum_displays::display_id)),
sum_displays::display_version
.eq(excluded(sum_displays::display_version)),
sum_displays::display.eq(excluded(sum_displays::display)),
))
.execute(conn)
});

Ok(try_join_all(updates).await?.into_iter().sum())
}
Expand Down
4 changes: 1 addition & 3 deletions crates/sui-indexer-alt/src/handlers/sum_packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ use crate::{
schema::sum_packages,
};

const CHUNK_ROWS: usize = i16::MAX as usize / 5;
gegaowp marked this conversation as resolved.
Show resolved Hide resolved

pub struct SumPackages;

impl Processor for SumPackages {
Expand Down Expand Up @@ -67,7 +65,7 @@ impl Handler for SumPackages {

async fn commit(batch: &Self::Batch, conn: &mut db::Connection<'_>) -> Result<usize> {
let values: Vec<_> = batch.values().cloned().collect();
let updates = values.chunks(CHUNK_ROWS).map(|chunk| {
let updates = values.chunks(Self::MAX_INSERT_CHUNK_ROWS).map(|chunk| {
diesel::insert_into(sum_packages::table)
.values(chunk)
.on_conflict(sum_packages::package_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ impl Processor for TxAffectedAddresses {
#[async_trait::async_trait]
impl Handler for TxAffectedAddresses {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_affected_objects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl Processor for TxAffectedObjects {
#[async_trait::async_trait]
impl Handler for TxAffectedObjects {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_balance_changes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ impl Processor for TxBalanceChanges {
#[async_trait::async_trait]
impl Handler for TxBalanceChanges {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_calls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ impl Processor for TxCalls {
#[async_trait::async_trait]
impl Handler for TxCalls {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_digests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl Processor for TxDigests {
#[async_trait::async_trait]
impl Handler for TxDigests {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/tx_kinds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ impl Processor for TxKinds {
#[async_trait::async_trait]
impl Handler for TxKinds {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/wal_coin_balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Processor for WalCoinBalances {
#[async_trait::async_trait]
impl Handler for WalCoinBalances {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
1 change: 0 additions & 1 deletion crates/sui-indexer-alt/src/handlers/wal_obj_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ impl Processor for WalObjTypes {
#[async_trait::async_trait]
impl Handler for WalObjTypes {
const MIN_EAGER_ROWS: usize = 100;
const MAX_CHUNK_ROWS: usize = 1000;
const MAX_PENDING_ROWS: usize = 10000;

async fn commit(values: &[Self::Value], conn: &mut db::Connection<'_>) -> Result<usize> {
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/models/displays.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;
use sui_field_count::FieldCount;

use crate::schema::sum_displays;

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = sum_displays, primary_key(object_type))]
pub struct StoredDisplay {
pub object_type: Vec<u8>,
Expand Down
3 changes: 2 additions & 1 deletion crates/sui-indexer-alt/src/models/packages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-License-Identifier: Apache-2.0

use diesel::prelude::*;
use sui_field_count::FieldCount;

use crate::schema::sum_packages;

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = sum_packages, primary_key(package_id))]
pub struct StoredPackage {
pub package_id: Vec<u8>,
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-indexer-alt/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub struct StoredTxBalanceChange {
pub balance_changes: Vec<u8>,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_calls)]
pub struct StoredTxCalls {
pub tx_sequence_number: i64,
Expand All @@ -80,7 +80,7 @@ pub struct StoredTxCalls {
pub sender: Vec<u8>,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_digests)]
pub struct StoredTxDigest {
pub tx_sequence_number: i64,
Expand All @@ -95,7 +95,7 @@ pub enum StoredKind {
ProgrammableTransaction = 1,
}

#[derive(Insertable, Debug, Clone)]
#[derive(Insertable, Debug, Clone, FieldCount)]
#[diesel(table_name = tx_kinds)]
pub struct StoredTxKind {
pub tx_sequence_number: i64,
Expand Down
5 changes: 3 additions & 2 deletions crates/sui-indexer-alt/src/pipeline/concurrent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::{sync::Arc, time::Duration};

use serde::{Deserialize, Serialize};
use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -57,8 +58,8 @@ pub trait Handler: Processor {
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation.
const MAX_CHUNK_ROWS: usize = 1000;
/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
const MAX_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;
gegaowp marked this conversation as resolved.
Show resolved Hide resolved

/// If there are more than this many rows pending, the committer applies backpressure.
const MAX_PENDING_ROWS: usize = 5000;
Expand Down
7 changes: 0 additions & 7 deletions crates/sui-indexer-alt/src/pipeline/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,6 @@ pub trait Processor {
/// How much concurrency to use when processing checkpoint data.
const FANOUT: usize = 10;

/// Each insert or update will include at most this many rows -- the size is chosen to maximize the
/// rows without hitting the limit on bind parameters.
const INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;

/// Each deletion will include at most this many rows without hitting the limit on bind parameters.
const DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

/// The type of value being inserted by the handler.
type Value: Send + Sync + 'static + FieldCount;
gegaowp marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
8 changes: 8 additions & 0 deletions crates/sui-indexer-alt/src/pipeline/sequential/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::Arc;

use serde::{Deserialize, Serialize};
use sui_field_count::FieldCount;
use sui_types::full_checkpoint_content::CheckpointData;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -42,6 +43,13 @@ pub trait Handler: Processor {
/// If at least this many rows are pending, the committer will commit them eagerly.
const MIN_EAGER_ROWS: usize = 50;

/// If there are more than this many rows pending, the committer will only commit this many in
/// one operation. The size is chosen to maximize the rows without hitting the limit on bind parameters.
const MAX_INSERT_CHUNK_ROWS: usize = i16::MAX as usize / Self::Value::FIELD_COUNT;

/// Each deletion will include at most this many rows without hitting the limit on bind parameters.
const MAX_DELETE_CHUNK_ROWS: usize = i16::MAX as usize;

gegaowp marked this conversation as resolved.
Show resolved Hide resolved
/// Maximum number of checkpoints to try and write in a single batch. The larger this number
/// is, the more chances the pipeline has to merge redundant writes, but the longer each write
/// transaction is likely to be.
Expand Down