[Indexer] added nft_points custom contract parsing (aptos-labs#7018)
* added nft_points custom contract parsing

* Add more fields
bowenyang007 authored Mar 10, 2023
1 parent 354ed8b commit 3fc72cc
Showing 8 changed files with 159 additions and 1 deletion.
5 changes: 5 additions & 0 deletions config/src/config/indexer_config.rs
@@ -70,6 +70,10 @@ pub struct IndexerConfig {
/// Which address does the ans contract live at. Only available for token_processor. If null, disable ANS indexing
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ans_contract_address: Option<String>,

/// Custom NFT points contract
#[serde(default, skip_serializing_if = "Option::is_none")]
pub nft_points_contract: Option<String>,
}

impl Debug for IndexerConfig {
@@ -94,6 +98,7 @@ impl Debug for IndexerConfig {
.field("emit_every", &self.emit_every)
.field("gap_lookback_versions", &self.gap_lookback_versions)
.field("ans_contract_address", &self.ans_contract_address)
.field("nft_points_contract", &self.nft_points_contract)
.finish()
}
}
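Enabling the feature is then just a matter of config. A minimal sketch of the node's indexer YAML; the surrounding keys and the contract value here are hypothetical, and note from nft_points.rs below that the value is compared against the fully qualified entry-function name, so it must be of the form address::module::function:

    indexer:
      enabled: true
      processor: "token_processor"
      nft_points_contract: "0x1234::nft_points::claim_points"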
5 changes: 5 additions & 0 deletions crates/indexer/migrations/2023-03-08-205402_nft_points/down.sql
@@ -0,0 +1,5 @@
-- This file should undo anything in `up.sql`
DROP TABLE IF EXISTS nft_points;
DROP INDEX IF EXISTS np_oa_idx;
DROP INDEX IF EXISTS np_tt_oa_idx;
DROP INDEX IF EXISTS np_insat_idx;
13 changes: 13 additions & 0 deletions crates/indexer/migrations/2023-03-08-205402_nft_points/up.sql
@@ -0,0 +1,13 @@
-- Your SQL goes here
CREATE TABLE nft_points (
transaction_version BIGINT UNIQUE PRIMARY KEY NOT NULL,
owner_address VARCHAR(66) NOT NULL,
token_name TEXT NOT NULL,
point_type TEXT NOT NULL,
amount NUMERIC NOT NULL,
transaction_timestamp TIMESTAMP NOT NULL,
inserted_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE INDEX np_oa_idx ON nft_points (owner_address);
CREATE INDEX np_tt_oa_idx ON nft_points (transaction_timestamp, owner_address);
CREATE INDEX np_insat_idx ON nft_points (inserted_at);
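As a usage sketch, the indexes line up with the obvious read patterns: np_oa_idx serves per-owner totals, np_tt_oa_idx serves time-windowed rollups. Hypothetical queries (address and interval are placeholders):

    SELECT SUM(amount) FROM nft_points WHERE owner_address = '0x1234';

    SELECT owner_address, SUM(amount) AS points
    FROM nft_points
    WHERE transaction_timestamp >= NOW() - INTERVAL '7 days'
    GROUP BY owner_address;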
1 change: 1 addition & 0 deletions crates/indexer/src/models/token_models/mod.rs
@@ -3,6 +3,7 @@

pub mod ans_lookup;
pub mod collection_datas;
pub mod nft_points;
pub mod token_activities;
pub mod token_claims;
pub mod token_datas;
75 changes: 75 additions & 0 deletions crates/indexer/src/models/token_models/nft_points.rs
@@ -0,0 +1,75 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

// This is required because a diesel macro makes clippy sad
#![allow(clippy::extra_unused_lifetimes)]
#![allow(clippy::unused_unit)]

use crate::{
schema::nft_points,
util::{parse_timestamp, standardize_address},
};
use aptos_api_types::{Transaction as APITransaction, TransactionPayload};
use bigdecimal::BigDecimal;
use diesel::prelude::*;
use field_count::FieldCount;
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, FieldCount, Identifiable, Insertable, Serialize)]
#[diesel(primary_key(transaction_version))]
#[diesel(table_name = nft_points)]
pub struct NftPoints {
pub transaction_version: i64,
pub owner_address: String,
pub token_name: String,
pub point_type: String,
pub amount: BigDecimal,
pub transaction_timestamp: chrono::NaiveDateTime,
}

impl NftPoints {
pub fn from_transaction(
transaction: &APITransaction,
nft_points_contract: Option<String>,
) -> Option<Self> {
if let Some(contract) = nft_points_contract {
if let APITransaction::UserTransaction(user_txn) = transaction {
let payload = &user_txn.request.payload;
// If failed transaction, end
if !user_txn.info.success {
return None;
}
if let TransactionPayload::EntryFunctionPayload(entry_function_payload) = payload {
if entry_function_payload.function.to_string() == contract {
let transaction_version = user_txn.info.version.0 as i64;
let owner_address = standardize_address(
entry_function_payload.arguments[0].as_str().unwrap(),
);
let amount = entry_function_payload.arguments[2]
.as_str()
.unwrap()
.parse()
.unwrap();
let transaction_timestamp =
parse_timestamp(user_txn.timestamp.0, transaction_version);
return Some(Self {
transaction_version,
owner_address,
token_name: entry_function_payload.arguments[1]
.as_str()
.unwrap()
.to_string(),
point_type: entry_function_payload.arguments[3]
.as_str()
.unwrap()
.to_string(),
amount,
transaction_timestamp,
});
}
}
}
}
None
}
}
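The parser assumes a fixed positional layout for the entry-function arguments, all JSON strings. A hypothetical payload that would satisfy it, with indices mirroring the accesses above:

    use serde_json::json;

    // Hypothetical arguments for a call matching the configured function.
    let arguments = vec![
        json!("0xa550c18"),     // [0] owner address, standardized before storage
        json!("Loyalty Token"), // [1] token_name
        json!("1000"),          // [2] amount: decimal string parsed into BigDecimal
        json!("referral"),      // [3] point_type
    ];

Anything else, such as a missing argument or a non-string value, hits one of the .unwrap() calls and panics the processor.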
47 changes: 46 additions & 1 deletion crates/indexer/src/processors/token_processor.rs
@@ -12,6 +12,7 @@ use crate::{
models::token_models::{
ans_lookup::{CurrentAnsLookup, CurrentAnsLookupPK},
collection_datas::{CollectionData, CurrentCollectionData},
nft_points::NftPoints,
token_activities::TokenActivity,
token_claims::CurrentTokenPendingClaim,
token_datas::{CurrentTokenData, TokenData},
@@ -33,17 +34,23 @@ pub const NAME: &str = "token_processor";
pub struct TokenTransactionProcessor {
connection_pool: PgDbPool,
ans_contract_address: Option<String>,
nft_points_contract: Option<String>,
}

impl TokenTransactionProcessor {
pub fn new(connection_pool: PgDbPool, ans_contract_address: Option<String>) -> Self {
pub fn new(
connection_pool: PgDbPool,
ans_contract_address: Option<String>,
nft_points_contract: Option<String>,
) -> Self {
aptos_logger::info!(
ans_contract_address = ans_contract_address,
"init TokenTransactionProcessor"
);
Self {
connection_pool,
ans_contract_address,
nft_points_contract,
}
}
}
@@ -70,6 +77,7 @@ fn insert_to_db_impl(
token_activities: &[TokenActivity],
current_token_claims: &[CurrentTokenPendingClaim],
current_ans_lookups: &[CurrentAnsLookup],
nft_points: &[NftPoints],
) -> Result<(), diesel::result::Error> {
let (tokens, token_ownerships, token_datas, collection_datas) = basic_token_transaction_lists;
let (current_token_ownerships, current_token_datas, current_collection_datas) =
@@ -84,6 +92,7 @@
insert_token_activities(conn, token_activities)?;
insert_current_token_claims(conn, current_token_claims)?;
insert_current_ans_lookups(conn, current_ans_lookups)?;
insert_nft_points(conn, nft_points)?;
Ok(())
}

@@ -106,6 +115,7 @@ fn insert_to_db(
token_activities: Vec<TokenActivity>,
current_token_claims: Vec<CurrentTokenPendingClaim>,
current_ans_lookups: Vec<CurrentAnsLookup>,
nft_points: Vec<NftPoints>,
) -> Result<(), diesel::result::Error> {
aptos_logger::trace!(
name = name,
@@ -131,6 +141,7 @@
&token_activities,
&current_token_claims,
&current_ans_lookups,
&nft_points,
)
}) {
Ok(_) => Ok(()),
@@ -148,6 +159,7 @@
let token_activities = clean_data_for_db(token_activities, true);
let current_token_claims = clean_data_for_db(current_token_claims, true);
let current_ans_lookups = clean_data_for_db(current_ans_lookups, true);
let nft_points = clean_data_for_db(nft_points, true);

insert_to_db_impl(
pg_conn,
@@ -160,6 +172,7 @@
&token_activities,
&current_token_claims,
&current_ans_lookups,
&nft_points,
)
}),
}
@@ -467,6 +480,27 @@ fn insert_current_ans_lookups(
Ok(())
}

fn insert_nft_points(
conn: &mut PgConnection,
items_to_insert: &[NftPoints],
) -> Result<(), diesel::result::Error> {
use schema::nft_points::dsl::*;

let chunks = get_chunks(items_to_insert.len(), NftPoints::field_count());

for (start_ind, end_ind) in chunks {
execute_with_better_error(
conn,
diesel::insert_into(schema::nft_points::table)
.values(&items_to_insert[start_ind..end_ind])
.on_conflict(transaction_version)
.do_nothing(),
None,
)?;
}
Ok(())
}
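insert_nft_points follows the same batching pattern as the other inserters: get_chunks sizes each batch from NftPoints::field_count(). The likely arithmetic (an assumption; the helper lives elsewhere in the crate) keeps each statement under Postgres's 65,535 bind-parameter budget, since Diesel binds one parameter per field per row:

    // Sketch of the assumed chunking arithmetic.
    fn chunk_bounds(num_items: usize, field_count: usize) -> Vec<(usize, usize)> {
        let chunk_size = (u16::MAX as usize) / field_count; // params per statement
        (0..num_items)
            .step_by(chunk_size.max(1))
            .map(|start| (start, usize::min(start + chunk_size, num_items)))
            .collect()
    }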

#[async_trait]
impl TransactionProcessor for TokenTransactionProcessor {
fn name(&self) -> &'static str {
@@ -492,6 +526,9 @@ impl TransactionProcessor for TokenTransactionProcessor {
let mut all_collection_datas = vec![];
let mut all_token_activities = vec![];

// This is likely temporary
let mut all_nft_points = vec![];

// Hashmap key will be the PK of the table; we do not want to send duplicate writes to the db within a batch
let mut all_current_token_ownerships: HashMap<
CurrentTokenOwnershipPK,
@@ -539,6 +576,13 @@
let current_ans_lookups =
CurrentAnsLookup::from_transaction(&txn, self.ans_contract_address.clone());
all_current_ans_lookups.extend(current_ans_lookups);

// NFT points
let nft_points_txn =
NftPoints::from_transaction(&txn, self.nft_points_contract.clone());
if let Some(nft_points) = nft_points_txn {
all_nft_points.push(nft_points);
}
}

// Getting list of values and sorting by pk in order to avoid postgres deadlock since we're doing multi-threaded db writes
@@ -606,6 +650,7 @@
all_token_activities,
all_current_token_claims,
all_current_ans_lookups,
all_nft_points,
);
match tx_result {
Ok(_) => Ok(ProcessingResult::new(
1 change: 1 addition & 0 deletions crates/indexer/src/runtime.rs
@@ -131,6 +131,7 @@ pub async fn run_forever(config: IndexerConfig, context: Arc<Context>) {
Processor::TokenProcessor => Arc::new(TokenTransactionProcessor::new(
conn_pool.clone(),
config.ans_contract_address,
config.nft_points_contract,
)),
Processor::CoinProcessor => Arc::new(CoinTransactionProcessor::new(conn_pool.clone())),
Processor::StakeProcessor => Arc::new(StakeTransactionProcessor::new(conn_pool.clone())),
13 changes: 13 additions & 0 deletions crates/indexer/src/schema.rs
@@ -310,6 +310,18 @@ diesel::table! {
}
}

diesel::table! {
nft_points (transaction_version) {
transaction_version -> Int8,
owner_address -> Varchar,
token_name -> Text,
point_type -> Text,
amount -> Numeric,
transaction_timestamp -> Timestamp,
inserted_at -> Timestamp,
}
}

diesel::table! {
processor_status (processor) {
processor -> Varchar,
@@ -540,6 +552,7 @@ diesel::allow_tables_to_appear_in_same_query!(
ledger_infos,
move_modules,
move_resources,
nft_points,
processor_status,
processor_statuses,
proposal_votes,
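With the table registered in schema.rs, read paths can go through Diesel as usual. A hypothetical helper, assuming a &mut PgConnection like the processors use, that totals one owner's points (SUM over a nullable NUMERIC yields an Option):

    use bigdecimal::BigDecimal;
    use diesel::{dsl::sum, prelude::*, PgConnection};

    // Hypothetical read-side helper; not part of this commit.
    fn total_points(conn: &mut PgConnection, owner: &str) -> QueryResult<Option<BigDecimal>> {
        use crate::schema::nft_points::dsl::*;
        nft_points
            .filter(owner_address.eq(owner))
            .select(sum(amount))
            .first(conn)
    }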