forked from aptos-labs/aptos-core
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathruntime.rs
156 lines (137 loc) · 4.54 KB
/
runtime.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0
use crate::{context::Context, index};
use aptos_config::config::{ApiConfig, NodeConfig};
use aptos_mempool::MempoolClientSender;
use aptos_types::chain_id::ChainId;
use storage_interface::DbReader;
use warp::{Filter, Reply};
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use tokio::runtime::{Builder, Runtime};
/// Creates HTTP server (warp-based) serves for both REST and JSON-RPC API.
/// When api and json-rpc are configured with same port, both API will be served for the port.
/// When api and json-rpc are configured with different port, both API will be served for
/// both ports.
/// Returns corresponding Tokio runtime
pub fn bootstrap(
config: &NodeConfig,
chain_id: ChainId,
db: Arc<dyn DbReader>,
mp_sender: MempoolClientSender,
) -> anyhow::Result<Runtime> {
let runtime = Builder::new_multi_thread()
.thread_name("api")
.enable_all()
.build()
.expect("[api] failed to create runtime");
let api_config = config.api.clone();
let api = WebServer::from(api_config.clone());
runtime.spawn(async move {
let context = Context::new(chain_id, db, mp_sender, api_config);
let routes = index::routes(context);
api.serve(routes).await;
});
Ok(runtime)
}
#[derive(Clone, Debug, PartialEq)]
struct WebServer {
pub address: SocketAddr,
pub tls_cert_path: Option<String>,
pub tls_key_path: Option<String>,
}
impl From<ApiConfig> for WebServer {
fn from(cfg: ApiConfig) -> Self {
Self::new(cfg.address, cfg.tls_cert_path, cfg.tls_key_path)
}
}
impl WebServer {
pub fn new(
address: SocketAddr,
tls_cert_path: Option<String>,
tls_key_path: Option<String>,
) -> Self {
Self {
address,
tls_cert_path,
tls_key_path,
}
}
pub async fn serve<F>(&self, routes: F)
where
F: Filter<Error = Infallible> + Clone + Sync + Send + 'static,
F::Extract: Reply,
{
match &self.tls_cert_path {
None => warp::serve(routes).bind(self.address).await,
Some(cert_path) => {
warp::serve(routes)
.tls()
.cert_path(cert_path)
.key_path(self.tls_key_path.as_ref().unwrap())
.bind(self.address)
.await
}
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use aptos_config::config::NodeConfig;
use aptos_types::chain_id::ChainId;
use crate::{
runtime::bootstrap,
tests::{new_test_context, TestContext},
};
#[test]
fn test_bootstrap_jsonprc_and_api_configured_at_different_port() {
let mut cfg = NodeConfig::default();
cfg.randomize_ports();
bootstrap_with_config(cfg);
}
pub fn bootstrap_with_config(cfg: NodeConfig) {
let runtime = tokio::runtime::Runtime::new().unwrap();
let context = runtime.block_on(new_test_context_async(
"test_bootstrap_jsonprc_and_api_configured_at_different_port",
));
let ret = bootstrap(
&cfg,
ChainId::test(),
context.db.clone(),
context.mempool.ac_client.clone(),
);
assert!(ret.is_ok());
assert_web_server(cfg.api.address.port());
}
pub fn assert_web_server(port: u16) {
let base_url = format!("http://localhost:{}", port);
let client = reqwest::blocking::Client::new();
// first call have retry to ensure the server is ready to serve
let api_resp = with_retry(|| Ok(client.get(&base_url).send()?)).unwrap();
assert_eq!(api_resp.status(), 200);
let healthy_check_resp = client
.get(format!("{}/-/healthy", base_url))
.send()
.unwrap();
assert_eq!(healthy_check_resp.status(), 200);
}
fn with_retry<F>(f: F) -> anyhow::Result<reqwest::blocking::Response>
where
F: Fn() -> anyhow::Result<reqwest::blocking::Response>,
{
let mut remaining_attempts = 60;
loop {
match f() {
Ok(r) => return Ok(r),
Err(_) if remaining_attempts > 0 => {
remaining_attempts -= 1;
std::thread::sleep(Duration::from_millis(100));
}
Err(error) => return Err(error),
}
}
}
pub async fn new_test_context_async(test_name: &'static str) -> TestContext {
new_test_context(test_name)
}
}