Review Feedback Changes
CapCap authored and aptos-bot committed Apr 15, 2022
1 parent 26d8044 commit e590538
Showing 12 changed files with 348 additions and 99 deletions.
11 changes: 5 additions & 6 deletions ecosystem/indexer/Cargo.toml
@@ -10,15 +10,9 @@ license = "Apache-2.0"
publish = false

[dependencies]
anyhow = "1.0.52"
async-trait = "0.1.42"
chrono = { version = "0.4.19", default-features = false, features = ["clock", "serde"] }

clap = "3.1.6"
diesel = { version = "1.4.8", features = ["chrono", "postgres", "r2d2", "numeric", "serde_json"] }
diesel_migrations = { version = "1.4.0", features = ["postgres"] }
@@ -29,6 +23,11 @@ serde_json = "1.0.64"
tokio = { version = "1.8.1", features = ["full", "time"] }
url = "2.2.2"

aptos-workspace-hack = { version = "0.1", path = "../../crates/aptos-workspace-hack" }
aptos-logger = { path = "../../crates/aptos-logger" }
aptos-metrics = { path = "../../crates/aptos-metrics" }
aptos-rest-client = { path = "../../crates/aptos-rest-client" }


[[bin]]
name = "aptos-indexer"
55 changes: 47 additions & 8 deletions ecosystem/indexer/README.md
@@ -1,17 +1,19 @@
# Aptos Indexer
> Tails the blockchain's transactions and pushes them into a postgres DB

Tails the node using the REST interface/client, and maintains state for each registered `TransactionProcessor`. On
startup, by default, it will retry any previously errored versions for each registered processor.

When developing your own, ensure each `TransactionProcessor` is idempotent, and being called with the same input won't
result in an error if some or all of the processing had previously been completed.

Example invocation:

```bash
cargo run -- --pg-uri "postgresql://localhost/postgres" --node-url "https://fullnode.devnet.aptoslabs.com" --emit-every 25 --batch-size 100
```

Try running the indexer with `--help` to get more details

## Requirements
@@ -21,13 +23,50 @@ Try running the indexer with `--help` to get more details
- [Postgres](https://www.postgresql.org/)

# Local Development

> *Notes*:
> - Diesel uses the `DATABASE_URL` env var to connect to the database
> - `diesel migration run` sets up the database and runs all available migrations.
## Adding new tables / Updating tables with Diesel

* `diesel migration generate <your_migration_name>` generates a new folder containing `up.sql + down.sql` for your
  migration
* `diesel migration run` to apply the missing migrations,
* `diesel migration redo` to rollback and apply the last migration
* `diesel database reset` drops the existing database and reruns all the migrations
* You can find more information in the [Diesel](https://diesel.rs/) documentation

# General Flow

The `Tailer` is the central glue that holds all the other components together. It's responsible for the following:

1. Maintaining processor state. The `Tailer` keeps a record of the `Result` of each `TransactionProcessor`'s output for
each transaction version (i.e., each transaction). If a `TransactionProcessor` returns a `Result::Err()` for a
transaction, the `Tailer` will mark that version as failed in the database (along with the stringified error text) and
continue on.
2. Retrying failed versions for each `TransactionProcessor`. By default, when a `Tailer` is started, it will re-fetch
the versions that previously failed for each registered `TransactionProcessor` and attempt to re-process them. The
`Result::Ok`/`Result::Err` returned from `TransactionProcessor::process_version` replaces the state in the DB for the
given `TransactionProcessor`/version combination.
3. Piping new transactions from the `Fetcher` into each `TransactionProcessor` that was registered to it.
Each `TransactionProcessor` gets its own copy, in its own `tokio::Task`, for each version. These are done in batches,
the size of which is specifiable via `--batch-size`. For other tunable parameters, try `cargo run -- --help`.
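
The sketch below is a simplified, in-memory illustration of this bookkeeping; the real `Tailer` persists results to
Postgres and dispatches processors in `tokio` tasks, and the `Processor` trait, `VersionState` enum, and method names
here are hypothetical stand-ins rather than the crate's actual API. The key property it shows is that a retry simply
overwrites the stored outcome for the same processor/version pair.

```rust
use std::collections::HashMap;

/// Outcome recorded per (processor, version); the real Tailer stores this in
/// Postgres along with the stringified error text.
#[derive(Debug, Clone)]
enum VersionState {
    Success,
    Failed(String),
}

/// Hypothetical stand-in for a registered processor; the real trait lives in
/// ./src/indexer/transaction_processor.rs.
trait Processor {
    fn name(&self) -> &'static str;
    fn process_version(&mut self, version: u64) -> Result<(), String>;
}

#[derive(Default)]
struct Tailer {
    /// (processor name, version) -> last recorded outcome.
    state: HashMap<(&'static str, u64), VersionState>,
}

impl Tailer {
    /// Run one version through one processor; the new Ok/Err replaces any
    /// previously stored state for that processor/version pair.
    fn process(&mut self, processor: &mut dyn Processor, version: u64) {
        let outcome = match processor.process_version(version) {
            Ok(()) => VersionState::Success,
            Err(e) => VersionState::Failed(e),
        };
        self.state.insert((processor.name(), version), outcome);
    }

    /// On startup: re-run every version this processor previously failed on.
    fn retry_failed(&mut self, processor: &mut dyn Processor) {
        let failed: Vec<u64> = self
            .state
            .iter()
            .filter_map(|(key, state)| {
                let (name, version) = key;
                if *name == processor.name() && matches!(state, VersionState::Failed(_)) {
                    Some(*version)
                } else {
                    None
                }
            })
            .collect();
        for version in failed {
            self.process(processor, version);
        }
    }
}

/// Toy processor that fails on its first attempt and succeeds afterwards.
struct Flaky {
    attempts: u32,
}

impl Processor for Flaky {
    fn name(&self) -> &'static str {
        "flaky_processor"
    }

    fn process_version(&mut self, _version: u64) -> Result<(), String> {
        self.attempts += 1;
        if self.attempts == 1 {
            Err("transient failure".to_string())
        } else {
            Ok(())
        }
    }
}

fn main() {
    let mut tailer = Tailer::default();
    let mut flaky = Flaky { attempts: 0 };
    tailer.process(&mut flaky, 7); // recorded as Failed("transient failure")
    tailer.retry_failed(&mut flaky); // re-processed, now recorded as Success
    println!("{:?}", tailer.state.get(&("flaky_processor", 7u64)));
}
```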

The `Fetcher` is responsible for fetching transactions from a node in one of two ways:

1. One at a time (used by the `Tailer` when retrying previously errored transactions).
2. In bulk, with an internal buffer. Although the `Tailer` only fetches one transaction at a time from the `Fetcher`,
internally the `Fetcher` will fetch from the `/transactions` endpoint, which returns potentially hundreds of
transactions at a time. This is much more efficient than making hundreds of individual HTTP calls. In the future,
when there is a streaming Node API, that would be the optimal source of transactions.
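
As a rough illustration of the buffering in point 2, the sketch below hands out one transaction at a time while
refilling its internal buffer in bulk; the `Fetcher` and `Transaction` types and the synthetic `fetch_bulk` call are
hypothetical stand-ins for real requests to the node's `/transactions` endpoint.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a transaction; the real type comes from the REST client.
#[derive(Debug, Clone)]
struct Transaction {
    version: u64,
}

/// Illustrative buffered fetcher: callers take one transaction at a time, but
/// refills happen in bulk, mimicking an endpoint that can return hundreds of
/// transactions per HTTP call.
struct Fetcher {
    next_version: u64,
    batch_size: u64,
    buffer: VecDeque<Transaction>,
}

impl Fetcher {
    fn new(starting_version: u64, batch_size: u64) -> Self {
        Self {
            next_version: starting_version,
            batch_size,
            buffer: VecDeque::new(),
        }
    }

    /// Stand-in for one bulk call to the node (e.g. a `/transactions` request).
    fn fetch_bulk(&mut self) -> Vec<Transaction> {
        let start = self.next_version;
        self.next_version += self.batch_size;
        (start..start + self.batch_size)
            .map(|version| Transaction { version })
            .collect()
    }

    /// The caller-facing call: one transaction at a time, served from the buffer.
    fn fetch_next(&mut self) -> Transaction {
        if self.buffer.is_empty() {
            let batch = self.fetch_bulk();
            self.buffer.extend(batch);
        }
        self.buffer.pop_front().expect("buffer was just refilled")
    }
}

fn main() {
    let mut fetcher = Fetcher::new(0, 100);
    // 250 one-at-a-time reads only cost 3 simulated bulk fetches here.
    let last = (0..250).map(|_| fetcher.fetch_next().version).last().unwrap();
    println!("last fetched version: {last}");
}
```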

All the above comes free 'out of the box'. The `TransactionProcessor` is where everything becomes useful for those
writing their own indexers. The trait only has one main method that needs to be implemented: `process_transaction`. You
can do anything you want in a `TransactionProcessor` - write data to Postgres tables like the `DefaultProcessor` does,
make RESTful HTTP calls to some other service, submit your own transactions to the chain: anything at all. There is
just one caveat: *transaction processing is guaranteed at least once*. It's possible for a given `TransactionProcessor`
to receive the same transaction more than once, so your implementation must be idempotent.
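
To make the idempotency requirement concrete, here is a purely illustrative processor that keys its writes by
transaction version and overwrites on re-delivery; the trait shape and names are stand-ins for the real trait linked
below, and a Postgres-backed processor would typically get the same effect with an upsert
(`INSERT ... ON CONFLICT DO UPDATE`).

```rust
use std::collections::HashMap;

/// Hypothetical stand-ins for the real types; see
/// ./src/indexer/transaction_processor.rs for the actual trait and signatures.
#[derive(Debug, Clone)]
struct Transaction {
    version: u64,
    payload: String,
}

trait TransactionProcessor {
    fn name(&self) -> &'static str;
    /// Must be idempotent: delivery is at least once, so the same version can
    /// arrive more than once.
    fn process_transaction(&mut self, txn: &Transaction) -> Result<(), String>;
}

/// Toy processor that "indexes" payload lengths. Keying the write by version
/// and overwriting (upsert-style) makes re-delivery harmless.
struct PayloadSizeProcessor {
    sizes_by_version: HashMap<u64, usize>,
}

impl TransactionProcessor for PayloadSizeProcessor {
    fn name(&self) -> &'static str {
        "payload_size_processor"
    }

    fn process_transaction(&mut self, txn: &Transaction) -> Result<(), String> {
        // Overwrite instead of failing on "already exists".
        self.sizes_by_version.insert(txn.version, txn.payload.len());
        Ok(())
    }
}

fn main() {
    let mut processor = PayloadSizeProcessor { sizes_by_version: HashMap::new() };
    let txn = Transaction { version: 42, payload: "0x1::TestCoin::mint".to_string() };

    // Processing the same version twice is fine: the second call is a no-op overwrite.
    processor.process_transaction(&txn).unwrap();
    processor.process_transaction(&txn).unwrap();

    assert_eq!(processor.sizes_by_version.len(), 1);
    println!("{} indexed {} version(s)", processor.name(), processor.sizes_by_version.len());
}
```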

To implement your own `TransactionProcessor`, check out the documentation and source code
here: [`./src/indexer/transaction_processor.rs`](./src/indexer/transaction_processor.rs).
@@ -1,4 +1,35 @@
-- Your SQL goes here

/* Genesis Tx (doesn't have an entry in user_transactions or block_metadata_transactions) Ex:
{
"type":"genesis_transaction",
"version":"0",
"hash":"0x12180a4bbccf48de4d1e23b498add134328669ffc7741c8d529c6b2e3629ac99",
"state_root_hash":"0xb50adef3662d77e528be9e1cb5637fe5b7afd13eea317b330799f0c559c918c1",
"event_root_hash":"0xcbdbb1b830d1016d45a828bb3171ea81826e8315f14140acfbd7886f49fbcb40",
"gas_used":"0",
"success":true,
"vm_status":"Executed successfully",
"accumulator_root_hash":"0x188ed588547d551e652f04fccd5434c2977d6cff9e7443eb8e7c3038408caad4",
"payload":{
"type":"write_set_payload",
"write_set":{
"type":"direct_write_set",
"changes":[],
"events":[]
}
},
"events":[
{
"key":"0x0400000000000000000000000000000000000000000000000000000000000000000000000a550c18",
"sequence_number":"0",
"type":"0x1::Reconfiguration::NewEpochEvent",
"data":{
"epoch":"1"
}
}
]
}
*/

CREATE TABLE transactions
(
@@ -20,6 +51,60 @@ CREATE TABLE transactions
);


/* Ex:
{
"type":"user_transaction",
"version":"691595",
"hash":"0xefd4c865e00c240da0c426a37ceeda10d9b030d0e8a4fb4fb7ff452ad63401fb",
"state_root_hash":"0xebfe1eb7aa5321e7a7d741d927487163c34c821eaab60646ae0efd02b286c97c",
"event_root_hash":"0x414343554d554c41544f525f504c414345484f4c4445525f4841534800000000",
"gas_used":"43",
"success":true,
"vm_status":"Executed successfully",
"accumulator_root_hash":"0x97bfd5949d32f6c9a9efad93411924bfda658a8829de384d531ee73c2f740971",
"sender":"0xdfd557c68c6c12b8c65908b3d3c7b95d34bb12ae6eae5a43ee30aa67a4c12494",
"sequence_number":"21386",
"max_gas_amount":"1000",
"gas_unit_price":"1",
"gas_currency_code":"XUS",
"expiration_timestamp_secs":"1649713172",
"payload":{
"type":"script_function_payload",
"function":"0x1::TestCoin::mint",
"type_arguments":[
],
"arguments":[
"0x45b44793724a5ecc6ad85fa60949d0824cfc7f61d6bd74490b13598379313142",
"20000"
]
},
"signature":{
"type":"ed25519_signature",
"public_key":"0x14ff6646855dad4a2dab30db773cdd4b22d6f9e6813f3e50142adf4f3efcf9f8",
"signature":"0x70781112e78cc8b54b86805c016cef2478bccdef21b721542af0323276ab906c989172adffed5bf2f475f2ec3a5b284a0ac46a6aef0d79f0dbb6b85bfca0080a"
},
"events":[
{
"key":"0x040000000000000000000000000000000000000000000000000000000000000000000000fefefefe",
"sequence_number":"0",
"type":"0x1::Whatever::FakeEvent1",
"data":{
"amazing":"1"
}
},
{
"key":"0x040000000000000000000000000000000000000000000000000000000000000000000000fefefefe",
"sequence_number":"1",
"type":"0x1::Whatever::FakeEvent2",
"data":{
"amazing":"2"
}
}
],
"timestamp":"1649713141723410"
}
*/
CREATE TABLE user_transactions
(
-- join from "transactions"
@@ -51,6 +136,40 @@

CREATE INDEX ut_sender_index ON user_transactions (sender);

/* Ex:
{
"type":"block_metadata_transaction",
"version":"69158",
"hash":"0x2b7c58ed8524d228f9d0543a82e2793d04e8871df322f976b0e7bb8c5ced4ff5",
"state_root_hash":"0x3ead9eb40582fbc7df5e02f72280931dc3e6f1aae45dc832966b4cd972dac4b8",
"event_root_hash":"0x2e481956dea9c59b6fc9f823fe5f4c45efce173e42c551c1fe073b5d76a65504",
"gas_used":"0",
"success":true,
"vm_status":"Executed successfully",
"accumulator_root_hash":"0xb0ad602f805eb20c398f0f29a3504a9ef38bcc52c9c451deb9ec4a2d18807b49",
"id":"0xeef99391a3fc681f16963a6c03415bc0b1b12b56c00429308fa8bf46ac9eddf0",
"round":"57600",
"previous_block_votes":[
"0x992da26d46e6d515a070c7f6e52376a1e674e850cb4d116babc6f870da9c258",
"0xfb4d785594a018bd980b4a20556d120c53a3f50b1cff9d5aa2e26eee582a587",
"0x2b7bce01a6f55e4a863c4822b154021a25588250c762ee01169b6208d6169208",
"0x43a2c4cefc4725e710dadf423dd9142057208e640c623b27c6bba704380825ab",
"0x4c91f3949924e988144550ece1da1bd9335cbecdd1c3ce1893f80e55376d018f",
"0x61616c1208b6b3491496370e7783d48426c674bdd7d04ed1a96afe2e4d8a3930",
"0x66ccccae2058641f136b79792d4d884419437826342ba84dfbbf3e52d8b3fc7d",
"0x68f04222bd9f8846cda028ea5ba3846a806b04a47e1f1a4f0939f350d713b2eb",
"0x6bbf2564ea4a6968df450da786b40b3f56b533a7b700c681c31b3714fc30256b",
"0x735c0a1cb33689ecba65907ba05a485f98831ff610955a44abf0a986f2904612",
"0x784a9514644c8ab6235aaff425381f2ea2719315a51388bc1f1e1c5afa2daaa9",
"0x7a8cee78757dfe0cee3631208cc81f171d27ca6004c63ebae5814e1754a03c79",
"0x803160c3a2f8e025df5a6e1110163493293dc974cc8abd43d4c1896000f4a1ec",
"0xcece26ebddbadfcfbc541baddc989fa73b919b82915164bbf77ebd86c7edbc90",
"0xe7be8996cbdf7db0f64abd17aa0968074b32e4b0df6560328921470e09fd608b"
],
"proposer":"0x68f04222bd9f8846cda028ea5ba3846a806b04a47e1f1a4f0939f350d713b2eb",
"timestamp":"1649395495746947"
}
*/
CREATE TABLE block_metadata_transactions
(
-- join from "transactions"
2 changes: 1 addition & 1 deletion ecosystem/indexer/src/counters.rs
@@ -25,7 +25,7 @@ pub static PROCESSOR_ERRORS: Lazy<IntCounterVec> = Lazy::new(|| {
});

/// Number of times any given processor has completed successfully
-pub static PROCESSOR_OKS: Lazy<IntCounterVec> = Lazy::new(|| {
+pub static PROCESSOR_SUCCESSES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"indexer_processor_success_count",
"Number of times a given processor has completed successfully",