Skip to content

Commit

Permalink
[json-rpc] endpoint and method processing log
Browse files Browse the repository at this point in the history
  • Loading branch information
Xiao Li authored and bors-libra committed Sep 29, 2020
1 parent 08a0fe3 commit 2811242
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 5 deletions.
1 change: 1 addition & 0 deletions json-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ futures = "0.3.5"
hex = "0.4.2"
hyper = "0.13.7"
once_cell = "1.4.1"
rand = "0.7.3"
serde_json = "1.0.57"
serde = { version = "1.0.116", default-features = false }
tokio = { version = "0.2.22", features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions json-rpc/src/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type RpcHandler =
pub(crate) type RpcRegistry = HashMap<String, RpcHandler>;

pub(crate) struct JsonRpcRequest {
pub trace_id: u64,
pub params: Vec<Value>,
pub ledger_info: LedgerInfoWithSignatures,
}
Expand Down
90 changes: 85 additions & 5 deletions json-rpc/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ use crate::{
};
use futures::future::join_all;
use libra_config::config::{NodeConfig, RoleType};
use libra_logger::{debug, error, info, Schema};
use libra_mempool::MempoolClientSender;
use libra_types::{chain_id::ChainId, ledger_info::LedgerInfoWithSignatures};
use rand::{rngs::OsRng, RngCore};
use serde_json::{map::Map, Value};
use std::{net::SocketAddr, sync::Arc};
use storage_interface::DbReader;
Expand All @@ -26,6 +28,33 @@ const LABEL_SUCCESS: &str = "success";
const LABEL_BATCH: &str = "batch";
const LABEL_SINGLE: &str = "single";

#[derive(Schema)]
struct HttpRequestLog<'a> {
#[schema(display)]
remote_addr: Option<std::net::SocketAddr>,
method: String,
path: String,
status: u16,
referer: Option<&'a str>,
user_agent: Option<&'a str>,
#[schema(debug)]
elapsed: std::time::Duration,
}

#[derive(Schema)]
struct RpcRequestLog {
trace_id: u64,
request: Value,
}

#[derive(Schema)]
struct RpcResponseLog<'a> {
trace_id: u64,
is_batch: bool,
response_error: bool,
response: &'a JsonRpcResponse,
}

/// Creates HTTP server (warp-based) that serves JSON RPC requests
/// Returns handle to corresponding Tokio runtime
pub fn bootstrap(
Expand All @@ -39,11 +68,11 @@ pub fn bootstrap(
chain_id: ChainId,
) -> Runtime {
let runtime = Builder::new()
.thread_name("rpc-")
.thread_name("json-rpc")
.threaded_scheduler()
.enable_all()
.build()
.expect("[rpc] failed to create runtime");
.expect("[json-rpc] failed to create runtime");

let registry = Arc::new(build_registry());
let service = JsonRpcService::new(
Expand All @@ -62,7 +91,18 @@ pub fn bootstrap(
.and(warp::body::json())
.and(warp::any().map(move || service.clone()))
.and(warp::any().map(move || Arc::clone(&registry)))
.and_then(rpc_endpoint);
.and_then(rpc_endpoint)
.with(warp::log::custom(|info| {
info!(HttpRequestLog {
remote_addr: info.remote_addr(),
method: info.method().to_string(),
path: info.path().to_string(),
status: info.status().as_u16(),
referer: info.referer(),
user_agent: info.user_agent(),
elapsed: info.elapsed(),
})
}));

// For now we still allow user to use "/", but user should start to move to "/v1" soon
let route_root = warp::path::end().and(base_route.clone());
Expand Down Expand Up @@ -138,6 +178,14 @@ async fn rpc_endpoint_without_metrics(
.get_latest_ledger_info()
.map_err(|_| reject::custom(DatabaseError))?;

let mut rng = OsRng;
let trace_id = rng.next_u64();
let request_log = RpcRequestLog {
trace_id,
request: data.clone(),
};
info!(request_log);

let resp = Ok(if let Value::Array(requests) = data {
match service.validate_batch_size_limit(requests.len()) {
Ok(_) => {
Expand All @@ -149,9 +197,13 @@ async fn rpc_endpoint_without_metrics(
Arc::clone(&registry),
ledger_info.clone(),
LABEL_BATCH,
trace_id,
)
});
let responses = join_all(futures).await;
for resp in &responses {
log_response(trace_id, &resp, true);
}
warp::reply::json(&responses)
}
Err(err) => {
Expand All @@ -161,19 +213,37 @@ async fn rpc_endpoint_without_metrics(
ledger_info.ledger_info().timestamp_usecs(),
);
set_response_error(&mut resp, err);
log_response(trace_id, &resp, true);

warp::reply::json(&resp)
}
}
} else {
// single API call
let resp = rpc_request_handler(data, service, registry, ledger_info, LABEL_SINGLE).await;
let resp =
rpc_request_handler(data, service, registry, ledger_info, LABEL_SINGLE, trace_id).await;
log_response(trace_id, &resp, false);

warp::reply::json(&resp)
});

Ok(Box::new(resp) as Box<dyn warp::Reply>)
}

fn log_response(trace_id: u64, resp: &JsonRpcResponse, is_batch: bool) {
let log = RpcResponseLog {
trace_id,
is_batch,
response_error: resp.error.is_some(),
response: resp,
};
if is_internal_error(resp.error.clone()) {
error!(log);
} else {
debug!(log);
}
}

/// Handler of single RPC request
/// Performs validation and executes corresponding rpc handler
async fn rpc_request_handler(
Expand All @@ -182,6 +252,7 @@ async fn rpc_request_handler(
registry: Arc<RpcRegistry>,
ledger_info: LedgerInfoWithSignatures,
request_type_label: &str,
trace_id: u64,
) -> JsonRpcResponse {
let request: Map<String, Value>;
let mut response = JsonRpcResponse::new(
Expand Down Expand Up @@ -230,6 +301,7 @@ async fn rpc_request_handler(
}

let request_params = JsonRpcRequest {
trace_id,
ledger_info,
params,
};
Expand Down Expand Up @@ -274,7 +346,7 @@ async fn rpc_request_handler(
// If a counter label is supplied, also increments the invalid request counter using the label,
fn set_response_error(response: &mut JsonRpcResponse, error: JsonRpcError) {
let err_code = error.code;
if err_code <= -32000 && err_code >= -32099 {
if is_internal_error(Some(error.clone())) {
counters::INTERNAL_ERRORS.inc();
} else {
let label = match err_code {
Expand All @@ -291,6 +363,14 @@ fn set_response_error(response: &mut JsonRpcResponse, error: JsonRpcError) {
response.error = Some(error);
}

fn is_internal_error(error: Option<JsonRpcError>) -> bool {
if let Some(e) = error {
e.code <= -32000 && e.code >= -32099
} else {
false
}
}

fn parse_request_id(request: &Map<String, Value>) -> Result<Value, JsonRpcError> {
match request.get("id") {
Some(req_id) => {
Expand Down

0 comments on commit 2811242

Please sign in to comment.