forked from ethui/indexer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.rs
85 lines (68 loc) · 2.02 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
mod api;
mod config;
mod db;
mod rearrange;
mod sync;
use std::sync::Arc;
use color_eyre::eyre::Result;
use tokio::{signal, sync::mpsc};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::info;
use tracing_subscriber::{fmt::format::FmtSpan, EnvFilter};
use config::Config;
use crate::sync::{RethProvider, StopStrategy};
use self::db::Db;
use self::sync::{BackfillManager, Forward, SyncJob};
#[tokio::main]
async fn main() -> Result<()> {
setup()?;
let config = Config::read()?;
// set up a few random things
let (account_tx, account_rx) = mpsc::unbounded_channel();
let (job_tx, job_rx) = mpsc::unbounded_channel();
let db = Db::connect(&config, account_tx, job_tx).await?;
let chain = db.setup_chain(&config.chain).await?;
let provider_factory = Arc::new(RethProvider::new(&config, &chain)?);
let token = CancellationToken::new();
// setup each task
let sync = Forward::new(
db.clone(),
&config,
chain,
provider_factory.clone(),
account_rx,
token.clone(),
)
.await?;
let backfill = BackfillManager::new(
db.clone(),
&config,
provider_factory.clone(),
job_rx,
StopStrategy::Token(token.clone()),
);
let api = config.http.map(|c| api::start(db.clone(), c));
// spawn and tasks and track them
let tracker = TaskTracker::new();
tracker.spawn(sync.run());
tracker.spawn(backfill.run());
api.map(|t| tracker.spawn(t));
// termination handling
signal::ctrl_c().await?;
token.cancel();
tracker.close();
tracker.wait().await;
info!("graceful shutdown achieved. Closing");
Ok(())
}
fn setup() -> Result<()> {
color_eyre::install()?;
let filter = EnvFilter::from_default_env();
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(filter)
.with_span_events(FmtSpan::NEW)
.compact()
.finish();
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}