From 19a56693dcc4d5059fdf76999d08ac6f27e7fa4f Mon Sep 17 00:00:00 2001 From: bowenyang007 Date: Tue, 30 May 2023 17:34:05 -0700 Subject: [PATCH] [indexer] Add inactive share tracking (#8363) * Add inactive share tracking * add view for easy access * add a new query for wallet * remove backfilling code --- .../down.sql | 24 ++ .../up.sql | 46 +++ .../models/stake_models/delegator_balances.rs | 301 ++++++++++++++++-- .../models/stake_models/delegator_pools.rs | 72 ++++- .../src/models/stake_models/stake_utils.rs | 31 +- .../src/models/token_models/token_utils.rs | 8 +- .../indexer/src/models/token_models/tokens.rs | 11 +- .../indexer/src/processors/stake_processor.rs | 19 +- crates/indexer/src/schema.rs | 9 +- 9 files changed, 466 insertions(+), 55 deletions(-) create mode 100644 crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/down.sql create mode 100644 crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/up.sql diff --git a/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/down.sql b/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/down.sql new file mode 100644 index 0000000000000..4337e764c79f1 --- /dev/null +++ b/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/down.sql @@ -0,0 +1,24 @@ +-- This file should undo anything in `up.sql` +ALTER TABLE current_delegated_staking_pool_balances DROP COLUMN IF EXISTS operator_commission_percentage, + DROP COLUMN IF EXISTS inactive_table_handle, + DROP COLUMN IF EXISTS active_table_handle; +DROP INDEX IF EXISTS cdspb_inactive_index; +ALTER TABLE delegated_staking_pool_balances DROP COLUMN IF EXISTS operator_commission_percentage, + DROP COLUMN IF EXISTS inactive_table_handle, + DROP COLUMN IF EXISTS active_table_handle; +ALTER TABLE current_delegator_balances DROP COLUMN IF EXISTS parent_table_handle; +ALTER TABLE current_delegator_balances DROP CONSTRAINT current_delegator_balances_pkey; +ALTER TABLE current_delegator_balances +ADD CONSTRAINT current_delegator_balances_pkey PRIMARY KEY ( + delegator_address, + pool_address, + pool_type + ); +CREATE OR REPLACE VIEW num_active_delegator_per_pool AS +SELECT pool_address, + COUNT(DISTINCT delegator_address) AS num_active_delegator +FROM current_delegator_balances +WHERE shares > 0 +GROUP BY 1; +DROP VIEW IF EXISTS delegator_distinct_pool; +DROP VIEW IF EXISTS address_events_summary; \ No newline at end of file diff --git a/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/up.sql b/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/up.sql new file mode 100644 index 0000000000000..4b5c32ad9e749 --- /dev/null +++ b/crates/indexer/migrations/2023-05-22-234344_delegated_staking_improvements/up.sql @@ -0,0 +1,46 @@ +-- Your SQL goes here +-- adding new fields to staking pool balances for display and handling inactive pools +ALTER TABLE current_delegated_staking_pool_balances +ADD COLUMN IF NOT EXISTS operator_commission_percentage NUMERIC NOT NULL, + ADD COLUMN IF NOT EXISTS inactive_table_handle VARCHAR(66) NOT NULL, + ADD COLUMN IF NOT EXISTS active_table_handle VARCHAR(66) NOT NULL; +CREATE INDEX IF NOT EXISTS cdspb_inactive_index ON current_delegated_staking_pool_balances (inactive_table_handle); +-- adding new fields to staking pool balances for display and handling inactive pools +ALTER TABLE delegated_staking_pool_balances +ADD COLUMN IF NOT EXISTS operator_commission_percentage NUMERIC NOT NULL, + ADD COLUMN IF NOT EXISTS inactive_table_handle VARCHAR(66) NOT NULL, + ADD COLUMN IF NOT EXISTS active_table_handle VARCHAR(66) NOT NULL; +-- add new field to composite primary key because technically a user could have inactive pools +ALTER TABLE current_delegator_balances +ADD COLUMN IF NOT EXISTS parent_table_handle VARCHAR(66) NOT NULL; +ALTER TABLE current_delegator_balances DROP CONSTRAINT current_delegator_balances_pkey; +ALTER TABLE current_delegator_balances +ADD CONSTRAINT current_delegator_balances_pkey PRIMARY KEY ( + delegator_address, + pool_address, + pool_type, + table_handle + ); +-- need this for delegation staking +CREATE OR REPLACE VIEW num_active_delegator_per_pool AS +SELECT pool_address, + COUNT(DISTINCT delegator_address) AS num_active_delegator +FROM current_delegator_balances +WHERE shares > 0 + AND pool_type = 'active_shares' +GROUP BY 1; +-- need this for delegation staking +CREATE OR REPLACE VIEW delegator_distinct_pool AS +SELECT delegator_address, + pool_address +FROM current_delegator_balances +WHERE shares > 0 +GROUP BY 1, + 2; +-- new query for wallet +CREATE OR REPLACE VIEW address_events_summary AS +SELECT account_address, + min(transaction_block_height) AS min_block_height, + count(DISTINCT transaction_version) AS num_distinct_versions +FROM events +GROUP BY 1 \ No newline at end of file diff --git a/crates/indexer/src/models/stake_models/delegator_balances.rs b/crates/indexer/src/models/stake_models/delegator_balances.rs index d20b50e4b8185..5b07c46aa09e7 100644 --- a/crates/indexer/src/models/stake_models/delegator_balances.rs +++ b/crates/indexer/src/models/stake_models/delegator_balances.rs @@ -3,8 +3,13 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata}; -use crate::{schema::current_delegator_balances, util::standardize_address}; +use super::delegator_pools::{DelegatorPool, DelegatorPoolBalanceMetadata, PoolBalanceMetadata}; +use crate::{ + database::PgPoolConnection, + models::token_models::collection_datas::{QUERY_RETRIES, QUERY_RETRY_DELAY_MS}, + schema::current_delegator_balances, + util::standardize_address, +}; use anyhow::Context; use aptos_api_types::{ DeleteTableItem as APIDeleteTableItem, Transaction as APITransaction, @@ -12,13 +17,15 @@ use aptos_api_types::{ WriteTableItem as APIWriteTableItem, }; use bigdecimal::{BigDecimal, Zero}; +use diesel::{prelude::*, ExpressionMethods}; use field_count::FieldCount; use serde::{Deserialize, Serialize}; use std::collections::HashMap; pub type TableHandle = String; pub type Address = String; -pub type ActiveShareMapping = HashMap; +pub type ShareToStakingPoolMapping = HashMap; +pub type ShareToPoolMapping = HashMap; pub type CurrentDelegatorBalancePK = (Address, Address, String); pub type CurrentDelegatorBalanceMap = HashMap; @@ -32,27 +39,42 @@ pub struct CurrentDelegatorBalance { pub table_handle: String, pub last_transaction_version: i64, pub shares: BigDecimal, + pub parent_table_handle: String, +} + +#[derive(Debug, Identifiable, Queryable)] +#[diesel(primary_key(delegator_address, pool_address, pool_type))] +#[diesel(table_name = current_delegator_balances)] +pub struct CurrentDelegatorBalanceQuery { + pub delegator_address: String, + pub pool_address: String, + pub pool_type: String, + pub table_handle: String, + pub last_transaction_version: i64, + pub inserted_at: chrono::NaiveDateTime, + pub shares: BigDecimal, + pub parent_table_handle: String, } impl CurrentDelegatorBalance { - /// We're only indexing active_shares for now because that's all the UI needs and indexing - /// the inactive_shares or pending_withdrawal_shares would be more complicated. - pub fn from_write_table_item( + /// Getting active share balances. Only 1 active pool per staking pool tracked in a single table + pub fn get_active_share_from_write_table_item( write_table_item: &APIWriteTableItem, txn_version: i64, - active_share_mapping: &ActiveShareMapping, + active_pool_to_staking_pool: &ShareToStakingPoolMapping, ) -> anyhow::Result> { let table_handle = standardize_address(&write_table_item.handle.to_string()); // The mapping will tell us if the table item is an active share table - if let Some(pool_balance) = active_share_mapping.get(&table_handle) { + if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { let pool_address = pool_balance.staking_pool_address.clone(); let delegator_address = standardize_address(&write_table_item.key.to_string()); - let data = write_table_item.data.as_ref().unwrap_or_else(|| { - panic!( + let data: &aptos_api_types::transaction::DecodedTableData = + write_table_item.data.as_ref().unwrap_or_else(|| { + panic!( "This table item should be an active share item, table_item {:?}, version {}", write_table_item, txn_version ) - }); + }); let shares = data .value .as_str() @@ -70,9 +92,72 @@ impl CurrentDelegatorBalance { delegator_address, pool_address, pool_type: "active_shares".to_string(), - table_handle, + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares, + parent_table_handle: table_handle, + })) + } else { + Ok(None) + } + } + + /// Getting inactive share balances. There could be multiple inactive pool per staking pool so we have + /// 2 layers of mapping (table w/ all inactive pools -> staking pool, table w/ delegator inactive shares -> each inactive pool) + pub fn get_inactive_share_from_write_table_item( + write_table_item: &APIWriteTableItem, + txn_version: i64, + inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, + inactive_share_to_pool: &ShareToPoolMapping, + conn: &mut PgPoolConnection, + ) -> anyhow::Result> { + let table_handle = standardize_address(&write_table_item.handle.to_string()); + // The mapping will tell us if the table item belongs to an inactive pool + if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { + // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool + let inactive_pool_handle = pool_balance.parent_table_handle.clone(); + + let pool_address = match inactive_pool_to_staking_pool + .get(&inactive_pool_handle) + .map(|metadata| metadata.staking_pool_address.clone()) + { + Some(pool_address) => pool_address, + None => { + Self::get_staking_pool_from_inactive_share_handle(conn, &inactive_pool_handle) + .context(format!("Failed to get staking pool address from inactive share handle {}, txn version {}", + inactive_pool_handle, txn_version + ))? + }, + }; + let delegator_address = standardize_address(&write_table_item.key.to_string()); + let data: &aptos_api_types::transaction::DecodedTableData = + write_table_item.data.as_ref().unwrap_or_else(|| { + panic!( + "This table item should be an active share item, table_item {:?}, version {}", + write_table_item, txn_version + ) + }); + let shares = data + .value + .as_str() + .map(|s| s.parse::()) + .context(format!( + "value is not a string: {:?}, table_item {:?}, version {}", + data.value, write_table_item, txn_version + ))? + .context(format!( + "cannot parse string as u64: {:?}, version {}", + data.value, txn_version + ))?; + let shares = shares / &pool_balance.scaling_factor; + Ok(Some(Self { + delegator_address, + pool_address, + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), last_transaction_version: txn_version, shares, + parent_table_handle: inactive_pool_handle, })) } else { Ok(None) @@ -80,33 +165,80 @@ impl CurrentDelegatorBalance { } // Setting amount to 0 if table item is deleted - pub fn from_delete_table_item( + pub fn get_active_share_from_delete_table_item( delete_table_item: &APIDeleteTableItem, txn_version: i64, - active_share_mapping: &ActiveShareMapping, + active_pool_to_staking_pool: &ShareToStakingPoolMapping, ) -> anyhow::Result> { let table_handle = standardize_address(&delete_table_item.handle.to_string()); // The mapping will tell us if the table item is an active share table - if let Some(pool_balance) = active_share_mapping.get(&table_handle) { + if let Some(pool_balance) = active_pool_to_staking_pool.get(&table_handle) { let delegator_address = standardize_address(&delete_table_item.key.to_string()); return Ok(Some(Self { delegator_address, pool_address: pool_balance.staking_pool_address.clone(), pool_type: "active_shares".to_string(), - table_handle, + table_handle: table_handle.clone(), last_transaction_version: txn_version, shares: BigDecimal::zero(), + parent_table_handle: table_handle, })); } Ok(None) } - pub fn get_active_share_map( + // Setting amount to 0 if table item is deleted + pub fn get_inactive_share_from_delete_table_item( + delete_table_item: &APIDeleteTableItem, + txn_version: i64, + inactive_pool_to_staking_pool: &ShareToStakingPoolMapping, + inactive_share_to_pool: &ShareToPoolMapping, + conn: &mut PgPoolConnection, + ) -> anyhow::Result> { + let table_handle = standardize_address(&delete_table_item.handle.to_string()); + // The mapping will tell us if the table item belongs to an inactive pool + if let Some(pool_balance) = inactive_share_to_pool.get(&table_handle) { + // If it is, we need to get the inactive staking pool handle and use it to look up the staking pool + let inactive_pool_handle = pool_balance.parent_table_handle.clone(); + + let pool_address = match inactive_pool_to_staking_pool + .get(&inactive_pool_handle) + .map(|metadata| metadata.staking_pool_address.clone()) + { + Some(pool_address) => pool_address, + None => { + Self::get_staking_pool_from_inactive_share_handle(conn, &inactive_pool_handle) + .context(format!("Failed to get staking pool address from inactive share handle {}, txn version {}", + inactive_pool_handle, txn_version + ))? + }, + }; + let delegator_address = standardize_address(&delete_table_item.key.to_string()); + + return Ok(Some(Self { + delegator_address, + pool_address, + pool_type: "inactive_shares".to_string(), + table_handle: table_handle.clone(), + last_transaction_version: txn_version, + shares: BigDecimal::zero(), + parent_table_handle: table_handle, + })); + } + Ok(None) + } + + /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool + /// Value is the same metadata although it's not really used + pub fn get_active_pool_to_staking_pool_mapping( write_resource: &APIWriteResource, txn_version: i64, - ) -> anyhow::Result> { - if let Some(balance) = DelegatorPool::get_balance_metadata(write_resource, txn_version)? { + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( + write_resource, + txn_version, + )? { Ok(Some(HashMap::from([( balance.active_share_table_handle.clone(), balance, @@ -116,20 +248,94 @@ impl CurrentDelegatorBalance { } } + /// Key is the inactive share table handle obtained from 0x1::delegation_pool::DelegationPool + /// Value is the same metadata although it's not really used + pub fn get_inactive_pool_to_staking_pool_mapping( + write_resource: &APIWriteResource, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_delegated_pool_metadata_from_write_resource( + write_resource, + txn_version, + )? { + Ok(Some(HashMap::from([( + balance.inactive_share_table_handle.clone(), + balance, + )]))) + } else { + Ok(None) + } + } + + /// Key is the inactive share table handle obtained from 0x1::pool_u64_unbound::Pool + /// Value is the 0x1::pool_u64_unbound::Pool metadata that will be used to populate a user's inactive balance + pub fn get_inactive_share_to_pool_mapping( + write_table_item: &APIWriteTableItem, + txn_version: i64, + ) -> anyhow::Result> { + if let Some(balance) = DelegatorPool::get_inactive_pool_metadata_from_write_table_item( + write_table_item, + txn_version, + )? { + Ok(Some(HashMap::from([( + balance.shares_table_handle.clone(), + balance, + )]))) + } else { + Ok(None) + } + } + + pub fn get_staking_pool_from_inactive_share_handle( + conn: &mut PgPoolConnection, + table_handle: &str, + ) -> anyhow::Result { + let mut retried = 0; + while retried < QUERY_RETRIES { + retried += 1; + match CurrentDelegatorBalanceQuery::get_by_inactive_share_handle(conn, table_handle) { + Ok(current_delegator_balance) => return Ok(current_delegator_balance.pool_address), + Err(_) => { + std::thread::sleep(std::time::Duration::from_millis(QUERY_RETRY_DELAY_MS)); + }, + } + } + Err(anyhow::anyhow!( + "Failed to get staking pool address from inactive share handle" + )) + } + pub fn from_transaction( transaction: &APITransaction, + conn: &mut PgPoolConnection, ) -> anyhow::Result { - let mut active_share_mapping: ActiveShareMapping = HashMap::new(); + let mut active_pool_to_staking_pool: ShareToStakingPoolMapping = HashMap::new(); + let mut inactive_pool_to_staking_pool: ShareToStakingPoolMapping = HashMap::new(); + let mut inactive_share_to_pool: ShareToPoolMapping = HashMap::new(); let mut current_delegator_balances: CurrentDelegatorBalanceMap = HashMap::new(); // Do a first pass to get the mapping of active_share table handles to staking pool resource if let APITransaction::UserTransaction(user_txn) = transaction { let txn_version = user_txn.info.version.0 as i64; for wsc in &user_txn.info.changes { if let APIWriteSetChange::WriteResource(write_resource) = wsc { - let maybe_map = - Self::get_active_share_map(write_resource, txn_version).unwrap(); - if let Some(map) = maybe_map { - active_share_mapping.extend(map); + if let Some(map) = + Self::get_active_pool_to_staking_pool_mapping(write_resource, txn_version) + .unwrap() + { + active_pool_to_staking_pool.extend(map); + } + if let Some(map) = + Self::get_inactive_pool_to_staking_pool_mapping(write_resource, txn_version) + .unwrap() + { + inactive_pool_to_staking_pool.extend(map); + } + } + if let APIWriteSetChange::WriteTableItem(table_item) = wsc { + if let Some(map) = + Self::get_inactive_share_to_pool_mapping(table_item, txn_version).unwrap() + { + inactive_share_to_pool.extend(map); } } } @@ -138,12 +344,44 @@ impl CurrentDelegatorBalance { let txn_version = user_txn.info.version.0 as i64; let maybe_delegator_balance = match wsc { APIWriteSetChange::DeleteTableItem(table_item) => { - Self::from_delete_table_item(table_item, txn_version, &active_share_mapping) + if let Some(balance) = Self::get_active_share_from_delete_table_item( + table_item, + txn_version, + &active_pool_to_staking_pool, + ) + .unwrap() + { + Some(balance) + } else { + Self::get_inactive_share_from_delete_table_item( + table_item, + txn_version, + &inactive_pool_to_staking_pool, + &inactive_share_to_pool, + conn, + ) .unwrap() + } }, APIWriteSetChange::WriteTableItem(table_item) => { - Self::from_write_table_item(table_item, txn_version, &active_share_mapping) + if let Some(balance) = Self::get_active_share_from_write_table_item( + table_item, + txn_version, + &active_pool_to_staking_pool, + ) + .unwrap() + { + Some(balance) + } else { + Self::get_inactive_share_from_write_table_item( + table_item, + txn_version, + &inactive_pool_to_staking_pool, + &inactive_share_to_pool, + conn, + ) .unwrap() + } }, _ => None, }; @@ -162,3 +400,14 @@ impl CurrentDelegatorBalance { Ok(current_delegator_balances) } } + +impl CurrentDelegatorBalanceQuery { + pub fn get_by_inactive_share_handle( + conn: &mut PgPoolConnection, + table_handle: &str, + ) -> diesel::QueryResult { + current_delegator_balances::table + .filter(current_delegator_balances::parent_table_handle.eq(table_handle)) + .first::(conn) + } +} diff --git a/crates/indexer/src/models/stake_models/delegator_pools.rs b/crates/indexer/src/models/stake_models/delegator_pools.rs index 8fc943893b881..62299962065bc 100644 --- a/crates/indexer/src/models/stake_models/delegator_pools.rs +++ b/crates/indexer/src/models/stake_models/delegator_pools.rs @@ -4,7 +4,7 @@ // This is required because a diesel macro makes clippy sad #![allow(clippy::extra_unused_lifetimes)] -use super::stake_utils::StakeResource; +use super::stake_utils::{StakeResource, StakeTableItem}; use crate::{ schema::{ current_delegated_staking_pool_balances, delegated_staking_pool_balances, @@ -12,7 +12,7 @@ use crate::{ }, util::standardize_address, }; -use aptos_api_types::{Transaction, WriteResource, WriteSetChange}; +use aptos_api_types::{Transaction, WriteResource, WriteSetChange, WriteTableItem}; use bigdecimal::BigDecimal; use field_count::FieldCount; use serde::{Deserialize, Serialize}; @@ -39,7 +39,20 @@ pub struct DelegatorPoolBalanceMetadata { pub total_coins: BigDecimal, pub total_shares: BigDecimal, pub scaling_factor: BigDecimal, + pub operator_commission_percentage: BigDecimal, pub active_share_table_handle: String, + pub inactive_share_table_handle: String, +} + +// Similar metadata but specifically for 0x1::pool_u64_unbound::Pool +#[derive(Debug, Deserialize, Serialize)] +pub struct PoolBalanceMetadata { + pub transaction_version: i64, + pub total_coins: BigDecimal, + pub total_shares: BigDecimal, + pub scaling_factor: BigDecimal, + pub shares_table_handle: String, + pub parent_table_handle: String, } // Pools balances @@ -51,6 +64,9 @@ pub struct DelegatorPoolBalance { pub staking_pool_address: String, pub total_coins: BigDecimal, pub total_shares: BigDecimal, + pub operator_commission_percentage: BigDecimal, + pub inactive_table_handle: String, + pub active_table_handle: String, } // All pools w latest balances (really a more comprehensive version than DelegatorPool) @@ -61,7 +77,10 @@ pub struct CurrentDelegatorPoolBalance { pub staking_pool_address: String, pub total_coins: BigDecimal, pub total_shares: BigDecimal, - last_transaction_version: i64, + pub last_transaction_version: i64, + pub operator_commission_percentage: BigDecimal, + pub inactive_table_handle: String, + pub active_table_handle: String, } impl DelegatorPool { @@ -100,14 +119,15 @@ impl DelegatorPool { )) } - pub fn get_balance_metadata( + pub fn get_delegated_pool_metadata_from_write_resource( write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { if let Some(StakeResource::DelegationPool(inner)) = StakeResource::from_write_resource(write_resource, txn_version)? { - let staking_pool_address = standardize_address(&write_resource.address.to_string()); + let staking_pool_address: String = + standardize_address(&write_resource.address.to_string()); let total_coins = inner.active_shares.total_coins; let total_shares = &inner.active_shares.total_shares / &inner.active_shares.scaling_factor; @@ -117,9 +137,35 @@ impl DelegatorPool { total_coins, total_shares, scaling_factor: inner.active_shares.scaling_factor, - active_share_table_handle: standardize_address( - &inner.active_shares.shares.inner.handle, - ), + operator_commission_percentage: inner.operator_commission_percentage.clone(), + active_share_table_handle: inner.active_shares.shares.inner.get_handle(), + inactive_share_table_handle: inner.inactive_shares.get_handle(), + })) + } else { + Ok(None) + } + } + + pub fn get_inactive_pool_metadata_from_write_table_item( + write_table_item: &WriteTableItem, + txn_version: i64, + ) -> anyhow::Result> { + let table_item_data = write_table_item.data.as_ref().unwrap(); + + if let Some(StakeTableItem::Pool(inner)) = StakeTableItem::from_table_item_type( + table_item_data.value_type.as_str(), + &table_item_data.value, + txn_version, + )? { + let total_coins = inner.total_coins; + let total_shares = &inner.total_shares / &inner.scaling_factor; + Ok(Some(PoolBalanceMetadata { + transaction_version: txn_version, + total_coins, + total_shares, + scaling_factor: inner.scaling_factor, + shares_table_handle: inner.shares.inner.get_handle(), + parent_table_handle: standardize_address(&write_table_item.handle.to_string()), })) } else { Ok(None) @@ -130,7 +176,9 @@ impl DelegatorPool { write_resource: &WriteResource, txn_version: i64, ) -> anyhow::Result> { - if let Some(balance) = Self::get_balance_metadata(write_resource, txn_version)? { + if let Some(balance) = + Self::get_delegated_pool_metadata_from_write_resource(write_resource, txn_version)? + { let staking_pool_address = balance.staking_pool_address.clone(); let total_coins = balance.total_coins.clone(); let total_shares = balance.total_shares.clone(); @@ -145,12 +193,18 @@ impl DelegatorPool { staking_pool_address: staking_pool_address.clone(), total_coins: total_coins.clone(), total_shares: total_shares.clone(), + operator_commission_percentage: balance.operator_commission_percentage.clone(), + inactive_table_handle: balance.inactive_share_table_handle.clone(), + active_table_handle: balance.active_share_table_handle.clone(), }, CurrentDelegatorPoolBalance { staking_pool_address, total_coins, total_shares, last_transaction_version: transaction_version, + operator_commission_percentage: balance.operator_commission_percentage.clone(), + inactive_table_handle: balance.inactive_share_table_handle.clone(), + active_table_handle: balance.active_share_table_handle, }, ))) } else { diff --git a/crates/indexer/src/models/stake_models/stake_utils.rs b/crates/indexer/src/models/stake_models/stake_utils.rs index 5afa8aa981cae..4ded27c470c40 100644 --- a/crates/indexer/src/models/stake_models/stake_utils.rs +++ b/crates/indexer/src/models/stake_models/stake_utils.rs @@ -15,11 +15,14 @@ pub struct StakePoolResource { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct DelegationPoolResource { - pub active_shares: SharesResource, + pub active_shares: PoolResource, + pub inactive_shares: Table, + #[serde(deserialize_with = "deserialize_from_string")] + pub operator_commission_percentage: BigDecimal, } #[derive(Serialize, Deserialize, Debug, Clone)] -pub struct SharesResource { +pub struct PoolResource { pub shares: SharesInnerResource, #[serde(deserialize_with = "deserialize_from_string")] pub total_coins: BigDecimal, @@ -84,6 +87,30 @@ pub struct ReactivateStakeEvent { pub pool_address: String, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum StakeTableItem { + Pool(PoolResource), +} + +impl StakeTableItem { + pub fn from_table_item_type( + data_type: &str, + data: &serde_json::Value, + txn_version: i64, + ) -> Result> { + match data_type { + "0x1::pool_u64_unbound::Pool" => { + serde_json::from_value(data.clone()).map(|inner| Some(StakeTableItem::Pool(inner))) + }, + _ => Ok(None), + } + .context(format!( + "version {} failed! failed to parse type {}, data {:?}", + txn_version, data_type, data + )) + } +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub enum StakeResource { StakePool(StakePoolResource), diff --git a/crates/indexer/src/models/token_models/token_utils.rs b/crates/indexer/src/models/token_models/token_utils.rs index 267474131dcf5..54dbcd72a8dd0 100644 --- a/crates/indexer/src/models/token_models/token_utils.rs +++ b/crates/indexer/src/models/token_models/token_utils.rs @@ -22,7 +22,13 @@ pub const URI_LENGTH: usize = 512; #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Table { - pub handle: String, + handle: String, +} + +impl Table { + pub fn get_handle(&self) -> String { + standardize_address(self.handle.as_str()) + } } #[derive(Serialize, Deserialize, Debug, Clone)] diff --git a/crates/indexer/src/models/token_models/tokens.rs b/crates/indexer/src/models/token_models/tokens.rs index 4a9a6812f0f81..f847f68adffa8 100644 --- a/crates/indexer/src/models/token_models/tokens.rs +++ b/crates/indexer/src/models/token_models/tokens.rs @@ -403,14 +403,11 @@ impl TableMetadataForToken { txn_version, )? { TokenResource::CollectionResource(collection_resource) => { - collection_resource.collection_data.handle + collection_resource.collection_data.get_handle() }, - TokenResource::TokenStoreResource(inner) => inner.tokens.handle, - TokenResource::PendingClaimsResource(inner) => inner.pending_claims.handle, + TokenResource::TokenStoreResource(inner) => inner.tokens.get_handle(), + TokenResource::PendingClaimsResource(inner) => inner.pending_claims.get_handle(), }; - Ok(Some(HashMap::from([( - standardize_address(&table_handle), - value, - )]))) + Ok(Some(HashMap::from([(table_handle, value)]))) } } diff --git a/crates/indexer/src/processors/stake_processor.rs b/crates/indexer/src/processors/stake_processor.rs index eb7f5f00a3535..e86cf78cb06d0 100644 --- a/crates/indexer/src/processors/stake_processor.rs +++ b/crates/indexer/src/processors/stake_processor.rs @@ -214,13 +214,13 @@ fn insert_delegator_balances( conn, diesel::insert_into(schema::current_delegator_balances::table) .values(&item_to_insert[start_ind..end_ind]) - .on_conflict((delegator_address, pool_address, pool_type)) + .on_conflict((delegator_address, pool_address, pool_type, table_handle)) .do_update() .set(( - table_handle.eq(excluded(table_handle)), last_transaction_version.eq(excluded(last_transaction_version)), inserted_at.eq(excluded(inserted_at)), shares.eq(excluded(shares)), + parent_table_handle.eq(excluded(parent_table_handle)), )), Some( " WHERE current_delegator_balances.last_transaction_version <= EXCLUDED.last_transaction_version ", @@ -269,11 +269,7 @@ fn insert_delegator_pool_balances( diesel::insert_into(schema::delegated_staking_pool_balances::table) .values(&item_to_insert[start_ind..end_ind]) .on_conflict((transaction_version, staking_pool_address)) - .do_update() - .set(( - total_shares.eq(excluded(total_shares)), - inserted_at.eq(excluded(inserted_at)), - )), + .do_nothing(), None, )?; } @@ -302,6 +298,9 @@ fn insert_current_delegator_pool_balances( total_shares.eq(excluded(total_shares)), last_transaction_version.eq(excluded(last_transaction_version)), inserted_at.eq(excluded(inserted_at)), + operator_commission_percentage.eq(excluded(operator_commission_percentage)), + inactive_table_handle.eq(excluded(inactive_table_handle)), + active_table_handle.eq(excluded(active_table_handle)), )), Some( " WHERE current_delegated_staking_pool_balances.last_transaction_version <= EXCLUDED.last_transaction_version ", @@ -323,6 +322,8 @@ impl TransactionProcessor for StakeTransactionProcessor { start_version: u64, end_version: u64, ) -> Result { + let mut conn = self.get_conn(); + let mut all_current_stake_pool_voters: StakingPoolVoterMap = HashMap::new(); let mut all_proposal_votes = vec![]; let mut all_delegator_activities = vec![]; @@ -343,7 +344,8 @@ impl TransactionProcessor for StakeTransactionProcessor { all_delegator_activities.append(&mut delegator_activities); // Add delegator balances - let delegator_balances = CurrentDelegatorBalance::from_transaction(txn).unwrap(); + let delegator_balances = + CurrentDelegatorBalance::from_transaction(txn, &mut conn).unwrap(); all_delegator_balances.extend(delegator_balances); // Add delegator pools @@ -382,7 +384,6 @@ impl TransactionProcessor for StakeTransactionProcessor { all_current_delegator_pool_balances .sort_by(|a, b| a.staking_pool_address.cmp(&b.staking_pool_address)); - let mut conn = self.get_conn(); let tx_result = insert_to_db( &mut conn, self.name(), diff --git a/crates/indexer/src/schema.rs b/crates/indexer/src/schema.rs index ab8de1081e5cc..acdff3ee82739 100644 --- a/crates/indexer/src/schema.rs +++ b/crates/indexer/src/schema.rs @@ -187,11 +187,14 @@ diesel::table! { total_shares -> Numeric, last_transaction_version -> Int8, inserted_at -> Timestamp, + operator_commission_percentage -> Numeric, + inactive_table_handle -> Varchar, + active_table_handle -> Varchar, } } diesel::table! { - current_delegator_balances (delegator_address, pool_address, pool_type) { + current_delegator_balances (delegator_address, pool_address, pool_type, table_handle) { delegator_address -> Varchar, pool_address -> Varchar, pool_type -> Varchar, @@ -199,6 +202,7 @@ diesel::table! { last_transaction_version -> Int8, inserted_at -> Timestamp, shares -> Numeric, + parent_table_handle -> Varchar, } } @@ -359,6 +363,9 @@ diesel::table! { total_coins -> Numeric, total_shares -> Numeric, inserted_at -> Timestamp, + operator_commission_percentage -> Numeric, + inactive_table_handle -> Varchar, + active_table_handle -> Varchar, } }