Skip to content

Commit

Permalink
Add Events to query helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
CapCap authored and aptos-bot committed Apr 15, 2022
1 parent f237ceb commit d28574d
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 28 deletions.
2 changes: 1 addition & 1 deletion ecosystem/indexer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ Try running the indexer with `--help` to get more details
* `diesel migration run` to apply the missing migrations,
* `diesel migration redo` to rollback and apply the last migration
* `diesel database reset` drops the existing database and reruns all the migrations
* You can find more information in the [Diesel](https://diesel.rs/) documentation
* You can find more information in the [Diesel](https://diesel.rs/) documentation
17 changes: 10 additions & 7 deletions ecosystem/indexer/src/indexer/fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ impl TransactionFetcher {
}

/// Fetches the next version based on its internal version counter
/// Under the hood, it fetches 100 (when needed), and uses that buffer to feed out
/// In the event it can't, it will keep retrying every 5s
/// Under the hood, it fetches TRANSACTION_FETCH_BATCH_SIZE versions in bulk (when needed), and uses that buffer to feed out
/// In the event it can't fetch, it will keep retrying every RETRY_TIME_MILLIS ms
pub async fn fetch_next(&mut self) -> Transaction {
let mut transactions_buffer = self.transactions_buffer.lock().await;
if transactions_buffer.is_empty() {
Expand All @@ -58,19 +58,21 @@ impl TransactionFetcher {
// If it's a 404, then we're all caught up; no need to increment the `UNABLE_TO_FETCH_TRANSACTION` counter
if err_str.contains("404") {
aptos_logger::debug!(
"Could not fetch {} transactions starting at {}: all caught up. Will check again in 5s.",
"Could not fetch {} transactions starting at {}: all caught up. Will check again in {}ms.",
TRANSACTION_FETCH_BATCH_SIZE,
self.version,
RETRY_TIME_MILLIS,
);
tokio::time::sleep(Duration::from_millis(RETRY_TIME_MILLIS)).await;
continue;
}
UNABLE_TO_FETCH_TRANSACTION.inc();
aptos_logger::error!(
"Could not fetch {} transactions starting at {}, will retry in 5s. Err: {:?}",
"Could not fetch {} transactions starting at {}, will retry in {}ms. Err: {:?}",
TRANSACTION_FETCH_BATCH_SIZE,
self.version,
RETRY_TIME_MILLIS,
err
);
tokio::time::sleep(Duration::from_millis(RETRY_TIME_MILLIS)).await;
}
Expand All @@ -84,7 +86,7 @@ impl TransactionFetcher {
}

/// fetches one version; this is used for error checking/repair/etc
/// In the event it can't, it will keep retrying every 5s
/// In the event it can't, it will keep retrying every RETRY_TIME_MILLIS ms
pub async fn fetch_version(&self, version: u64) -> Transaction {
loop {
let res = self.client.get_transaction_by_version(version).await;
Expand All @@ -96,8 +98,9 @@ impl TransactionFetcher {
Err(err) => {
UNABLE_TO_FETCH_TRANSACTION.inc();
aptos_logger::error!(
"Could not fetch version {}, will retry in 5s. Err: {:?}",
"Could not fetch version {}, will retry in {}ms. Err: {:?}",
version,
RETRY_TIME_MILLIS,
err
);
tokio::time::sleep(Duration::from_millis(RETRY_TIME_MILLIS)).await;
Expand Down
9 changes: 9 additions & 0 deletions ecosystem/indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ struct IndexerArgs {
#[clap(long)]
skip_previous_errors: bool,

/// If set, will exit after migrations/repairs instead of starting indexing loop
#[clap(long)]
dont_index: bool,

/// If set, will ignore database contents and start processing from the specified version.
/// This will not delete any database contents, just transactions as it reprocesses them.
#[clap(long)]
Expand Down Expand Up @@ -78,6 +82,11 @@ async fn main() -> std::io::Result<()> {
tailer.handle_previous_errors().await;
}

if args.dont_index {
info!("All pre-index tasks complete, exiting!");
return Ok(());
}

info!("Indexing loop started!");
let mut processed: usize = starting_version as usize;
let mut base: usize = 0;
Expand Down
8 changes: 6 additions & 2 deletions ecosystem/indexer/src/models/events.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::schema::events;
use crate::{models::transactions::Transaction, schema::events};
use aptos_rest_client::aptos_api_types::Event as APIEvent;
use serde::Serialize;

#[derive(Debug, Queryable, Insertable)]
#[derive(Debug, Serialize, Queryable, Insertable, Associations, Identifiable)]
#[diesel(table_name = "events")]
#[belongs_to(Transaction, foreign_key = "transaction_hash")]
#[primary_key(key, sequence_number)]
pub struct Event {
pub transaction_hash: String,
pub key: String,
Expand All @@ -29,6 +32,7 @@ impl Event {
inserted_at: chrono::Utc::now().naive_utc(),
}
}

pub fn from_events(transaction_hash: String, events: &[APIEvent]) -> Option<Vec<Self>> {
if events.is_empty() {
return None;
Expand Down
63 changes: 45 additions & 18 deletions ecosystem/indexer/src/models/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ use diesel::{
BelongingToDsl, ExpressionMethods, GroupedBy, OptionalExtension, QueryDsl, RunQueryDsl,
};
use futures::future::Either;
use serde::Serialize;

#[derive(Debug, Queryable, Insertable, AsChangeset, Identifiable)]
#[derive(Debug, Queryable, Serialize, Insertable, AsChangeset, Identifiable)]
#[primary_key(hash)]
#[diesel(table_name = "transactions")]
pub struct Transaction {
Expand Down Expand Up @@ -47,30 +48,36 @@ impl Transaction {
Transaction,
Option<UserTransaction>,
Option<BlockMetadataTransaction>,
Vec<EventModel>,
)>,
> {
let mut transactions = transactions::table
let mut txs = transactions::table
.filter(transactions::version.ge(start_version))
.order(transactions::version.asc())
.limit(number_to_get)
.load::<Transaction>(connection)?;

let mut user_transactions: Vec<Vec<UserTransaction>> =
UserTransaction::belonging_to(&transactions)
.load::<UserTransaction>(connection)?
.grouped_by(&transactions);
let mut user_transactions: Vec<Vec<UserTransaction>> = UserTransaction::belonging_to(&txs)
.load::<UserTransaction>(connection)?
.grouped_by(&txs);

let mut block_metadata_transactions: Vec<Vec<BlockMetadataTransaction>> =
BlockMetadataTransaction::belonging_to(&transactions)
BlockMetadataTransaction::belonging_to(&txs)
.load::<BlockMetadataTransaction>(connection)?
.grouped_by(&transactions);
.grouped_by(&txs);

let mut events: Vec<Vec<EventModel>> = EventModel::belonging_to(&txs)
.load::<EventModel>(connection)?
.grouped_by(&txs);

// Convert to the nice result tuple
let mut result = vec![];
while !transactions.is_empty() {
while !txs.is_empty() {
result.push((
transactions.pop().unwrap(),
txs.pop().unwrap(),
user_transactions.pop().unwrap().pop(),
block_metadata_transactions.pop().unwrap().pop(),
events.pop().unwrap(),
))
}

Expand All @@ -84,15 +91,21 @@ impl Transaction {
Transaction,
Option<UserTransaction>,
Option<BlockMetadataTransaction>,
Vec<EventModel>,
)> {
let transaction = transactions::table
.filter(transactions::version.eq(version as i64))
.first::<Transaction>(connection)?;

let (user_transaction, block_metadata_transaction) =
let (user_transaction, block_metadata_transaction, events) =
transaction.get_details_for_transaction(connection)?;

Ok((transaction, user_transaction, block_metadata_transaction))
Ok((
transaction,
user_transaction,
block_metadata_transaction,
events,
))
}

pub fn get_by_hash(
Expand All @@ -102,24 +115,38 @@ impl Transaction {
Transaction,
Option<UserTransaction>,
Option<BlockMetadataTransaction>,
Vec<EventModel>,
)> {
let transaction = transactions::table
.filter(transactions::hash.eq(&transaction_hash))
.first::<Transaction>(connection)?;

let (user_transaction, block_metadata_transaction) =
let (user_transaction, block_metadata_transaction, events) =
transaction.get_details_for_transaction(connection)?;

Ok((transaction, user_transaction, block_metadata_transaction))
Ok((
transaction,
user_transaction,
block_metadata_transaction,
events,
))
}

fn get_details_for_transaction(
&self,
connection: &PgPoolConnection,
) -> diesel::QueryResult<(Option<UserTransaction>, Option<BlockMetadataTransaction>)> {
) -> diesel::QueryResult<(
Option<UserTransaction>,
Option<BlockMetadataTransaction>,
Vec<EventModel>,
)> {
let mut user_transaction: Option<UserTransaction> = None;
let mut block_metadata_transaction: Option<BlockMetadataTransaction> = None;

let events = crate::schema::events::table
.filter(crate::schema::events::transaction_hash.eq(&self.hash))
.load::<EventModel>(connection)?;

match self.type_.as_str() {
"user_transaction" => {
user_transaction = user_transactions::table
Expand All @@ -135,7 +162,7 @@ impl Transaction {
}
_ => unreachable!("Unknown transaction type: {}", &self.type_),
};
Ok((user_transaction, block_metadata_transaction))
Ok((user_transaction, block_metadata_transaction, events))
}

pub fn from_transaction(
Expand Down Expand Up @@ -200,7 +227,7 @@ impl Transaction {
}
}

#[derive(Debug, Queryable, Identifiable, Insertable, AsChangeset, Associations)]
#[derive(Debug, Queryable, Serialize, Identifiable, Insertable, AsChangeset, Associations)]
#[belongs_to(Transaction, foreign_key = "hash")]
#[primary_key(hash)]
#[diesel(table_name = "user_transactions")]
Expand Down Expand Up @@ -239,7 +266,7 @@ impl UserTransaction {
}
}

#[derive(Debug, Queryable, Identifiable, Insertable, AsChangeset, Associations)]
#[derive(Debug, Queryable, Serialize, Identifiable, Insertable, AsChangeset, Associations)]
#[belongs_to(Transaction, foreign_key = "hash")]
#[primary_key("hash")]
#[diesel(table_name = "block_metadata_transactions")]
Expand Down

0 comments on commit d28574d

Please sign in to comment.