Skip to content

Commit

Permalink
Review Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
CapCap authored and aptos-bot committed Apr 15, 2022
1 parent 63c1de1 commit b0a5387
Show file tree
Hide file tree
Showing 12 changed files with 33 additions and 20 deletions.
4 changes: 1 addition & 3 deletions ecosystem/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ repository = "https://github.com/aptos-labs/aptos-core"
homepage = "https://aptoslabs.com"
license = "Apache-2.0"
publish = false
default-run = "indexer"

[dependencies]
aptos-workspace-hack = { version = "0.1", path = "../../crates/aptos-workspace-hack" }
Expand All @@ -32,5 +31,4 @@ url = "2.2.2"


[[bin]]
name = "indexer"
path = "src/main.rs"
name = "aptos-indexer"
7 changes: 7 additions & 0 deletions ecosystem/indexer/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# Aptos Indexer
> Tails the blockchain's transactions and pushes them into a postgres DB
Tails the node utilizing the rest interface/client, and maintains state for each registered `TransactionProcessor`.
On startup, by default, will retry any previously errored versions for each registered processor.

When developing your own, ensure each `TransactionProcessor` is idempotent, and being called with the same input won't result in an error if
some or all of the processing had previously been completed.


Example invocation:
```bash
cargo run -- --pg-uri "postgresql://localhost/postgres" --node-url "https://fullnode.devnet.aptoslabs.com" --emit-every 25 --batch-size 100
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ CREATE TABLE user_transactions
sender VARCHAR(255) NOT NULL,
sequence_number BIGINT NOT NULL,
max_gas_amount BIGINT NOT NULL,
-- ignore 'gas_currency_code' and 'gas_unit_price', we'll remove it
-- ignore 'gas_currency_code', as we'll remove it
expiration_timestamp_secs TIMESTAMP NOT NULL,
gas_unit_price BIGINT NOT NULL,

-- from UserTransaction
"timestamp" TIMESTAMP NOT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ CREATE TABLE processor_statuses
(
name VARCHAR(50) NOT NULL,
version BIGINT NOT NULL,
ok BOOLEAN NOT NULL,
success BOOLEAN NOT NULL,
details TEXT,
last_updated TIMESTAMP NOT NULL DEFAULT NOW(),

Expand Down
2 changes: 1 addition & 1 deletion ecosystem/indexer/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub static PROCESSOR_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
/// Number of times any given processor has completed successfully
pub static PROCESSOR_OKS: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_processor_ok_count",
"indexer_processor_success_count",
"Number of times a given processor has completed successfully",
&["processor_name"]
)
Expand Down
6 changes: 3 additions & 3 deletions ecosystem/indexer/src/default_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl TransactionProcessor for DefaultTransactionProcessor {
})?;

let tx_result = conn.transaction::<(), diesel::result::Error, _>(||{
aptos_logger::debug!(
aptos_logger::trace!(
"[default_processor] inserting 'transaction' version {} with hash {}",
version,
&transaction_model.hash
Expand All @@ -80,7 +80,7 @@ impl TransactionProcessor for DefaultTransactionProcessor {
if let Some(tx_details_model) = maybe_details_model {
match tx_details_model {
Either::Left(ut) => {
aptos_logger::debug!(
aptos_logger::trace!(
"[default_processor] inserting 'user_transaction' version {} with hash {}",
version,
&transaction_model.hash
Expand All @@ -96,7 +96,7 @@ impl TransactionProcessor for DefaultTransactionProcessor {
.expect("Error inserting row into database");
}
Either::Right(bmt) => {
aptos_logger::debug!(
aptos_logger::trace!(
"[default_processor] inserting 'block_metadata_transaction' version {} with hash {}",
version,
&transaction_model.hash
Expand Down
2 changes: 1 addition & 1 deletion ecosystem/indexer/src/indexer/tailer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Tailer {
self.processors.push(processor);
}

/// For all versions which have an `ok=false` in the `processor_status` table, re-run them
/// For all versions which have an `success=false` in the `processor_status` table, re-run them
pub async fn handle_previous_errors(&self) {
info!("Checking for previously errored versions...");
let mut tasks = vec![];
Expand Down
6 changes: 5 additions & 1 deletion ecosystem/indexer/src/indexer/transaction_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ pub trait TransactionProcessor: Send + Sync + Debug {

dsl::processor_statuses
.select(dsl::version)
.filter(dsl::ok.eq(false).and(dsl::name.eq(self.name().to_string())))
.filter(
dsl::success
.eq(false)
.and(dsl::name.eq(self.name().to_string())),
)
.load::<i64>(&conn)
.expect("Error loading the error versions only query")
.iter()
Expand Down
2 changes: 1 addition & 1 deletion ecosystem/indexer/src/models/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{models::transactions::Transaction, schema::events};
use aptos_rest_client::aptos_api_types::Event as APIEvent;
use serde::Serialize;

#[derive(Debug, Serialize, Queryable, Insertable, Associations, Identifiable)]
#[derive(Associations, Debug, Identifiable, Insertable, Queryable, Serialize)]
#[diesel(table_name = "events")]
#[belongs_to(Transaction, foreign_key = "transaction_hash")]
#[primary_key(key, sequence_number)]
Expand Down
8 changes: 4 additions & 4 deletions ecosystem/indexer/src/models/processor_statuses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,22 @@ use crate::{
schema::processor_statuses as processor_statuss,
};

#[derive(Debug, Queryable, Insertable, AsChangeset)]
#[derive(AsChangeset, Debug, Insertable, Queryable)]
#[diesel(table_name = processor_statuses)]
pub struct ProcessorStatus {
pub name: &'static str,
pub version: i64,
pub ok: bool,
pub success: bool,
pub details: Option<String>,
pub last_updated: chrono::NaiveDateTime,
}

impl ProcessorStatus {
pub fn new(name: &'static str, version: i64, ok: bool, details: Option<String>) -> Self {
pub fn new(name: &'static str, version: i64, success: bool, details: Option<String>) -> Self {
Self {
name,
version,
ok,
success,
details,
last_updated: chrono::Utc::now().naive_utc(),
}
Expand Down
8 changes: 5 additions & 3 deletions ecosystem/indexer/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use diesel::{
use futures::future::Either;
use serde::Serialize;

#[derive(Debug, Queryable, Serialize, Insertable, AsChangeset, Identifiable)]
#[derive(AsChangeset, Debug, Identifiable, Insertable, Queryable, Serialize)]
#[primary_key(hash)]
#[diesel(table_name = "transactions")]
pub struct Transaction {
Expand Down Expand Up @@ -228,7 +228,7 @@ impl Transaction {
}
}

#[derive(Debug, Queryable, Serialize, Identifiable, Insertable, AsChangeset, Associations)]
#[derive(AsChangeset, Associations, Debug, Identifiable, Insertable, Queryable, Serialize)]
#[belongs_to(Transaction, foreign_key = "hash")]
#[primary_key(hash)]
#[diesel(table_name = "user_transactions")]
Expand All @@ -241,6 +241,7 @@ pub struct UserTransaction {

// from UserTransactionRequest
pub expiration_timestamp_secs: chrono::NaiveDateTime,
pub gas_unit_price: i64,

// from UserTransaction
pub timestamp: chrono::NaiveDateTime,
Expand All @@ -261,13 +262,14 @@ impl UserTransaction {
*tx.request.expiration_timestamp_secs.inner() as i64,
0,
),
gas_unit_price: *tx.request.gas_unit_price.inner() as i64,
timestamp: parse_timestamp(tx.timestamp, tx.info.version),
inserted_at: chrono::Utc::now().naive_utc(),
}
}
}

#[derive(Debug, Queryable, Serialize, Identifiable, Insertable, AsChangeset, Associations)]
#[derive(AsChangeset, Associations, Debug, Identifiable, Insertable, Queryable, Serialize)]
#[belongs_to(Transaction, foreign_key = "hash")]
#[primary_key("hash")]
#[diesel(table_name = "block_metadata_transactions")]
Expand Down
3 changes: 2 additions & 1 deletion ecosystem/indexer/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ table! {
processor_statuses (name, version) {
name -> Varchar,
version -> Int8,
ok -> Bool,
success -> Bool,
details -> Nullable<Text>,
last_updated -> Timestamp,
}
Expand Down Expand Up @@ -60,6 +60,7 @@ table! {
sequence_number -> Int8,
max_gas_amount -> Int8,
expiration_timestamp_secs -> Timestamp,
gas_unit_price -> Int8,
timestamp -> Timestamp,
inserted_at -> Timestamp,
}
Expand Down

0 comments on commit b0a5387

Please sign in to comment.