Skip to content
This repository has been archived by the owner on Sep 25, 2019. It is now read-only.

Commit

Permalink
Log at the start and end of requests (#60)
Browse files Browse the repository at this point in the history
* Log at the start and end of requests

* common log format messages + beginning/ending debug messages
  • Loading branch information
michaeldwan authored and jeromegn committed Feb 6, 2019
1 parent bbc5cbc commit f5c9989
Showing 1 changed file with 104 additions and 59 deletions.
163 changes: 104 additions & 59 deletions src/http_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{future, Future, Stream};
use std::net::SocketAddr;
use std::net::{IpAddr, SocketAddr};

use crate::js::*;
use crate::metrics::*;
Expand All @@ -13,7 +13,7 @@ use floating_duration::TimeAsFloat;
use std::io;
use std::time;

use slog::{slog_info, slog_o};
use slog::{o, slog_debug, slog_error, slog_info};

type BoxedResponseFuture = Box<Future<Item = Response<Body>, Error = futures::Canceled> + Send>;

Expand All @@ -25,23 +25,46 @@ lazy_static! {
};
}

struct RequestInfo {
timer: time::Instant,
request_id: String,
remote_addr: IpAddr,
url: String,
method: String,
}

pub fn serve_http(
tls: bool,
req: Request<Body>,
selector: &RuntimeSelector,
remote_addr: SocketAddr,
) -> BoxedResponseFuture {
let timer = time::Instant::now();
debug!("serving http: {}", req.uri());
let req_id = ksuid::Ksuid::generate().to_base62();
let mut request_info = RequestInfo {
timer: time::Instant::now(),
request_id: ksuid::Ksuid::generate().to_base62(),
remote_addr: remote_addr.ip(),
url: req.uri().to_string(),
method: req.method().to_string(),
};

let logger = slog_scope::logger().new(o!(
"request_id" => request_info.request_id.to_owned(),
"client_ip" => remote_addr,
"uri" => request_info.url.to_string(),
"method" => request_info.method.to_owned()
));

slog_debug!(logger, "begin request");

let (parts, body) = req.into_parts();
let host = if parts.version == hyper::Version::HTTP_2 {
match parts.uri.host() {
Some(h) => h,
None => {
return future_response(
simple_response(req_id, StatusCode::NOT_FOUND, None),
timer,
simple_response(StatusCode::NOT_FOUND, None),
request_info,
logger,
None,
);
}
Expand All @@ -51,67 +74,68 @@ pub fn serve_http(
Some(v) => match v.to_str() {
Ok(s) => s,
Err(e) => {
error!("error stringifying host: {}", e);
slog_error!(logger, "error stringifying host: {}", e);
return future_response(
simple_response(
req_id,
StatusCode::BAD_REQUEST,
Some(Body::from("Bad host header")),
),
timer,
simple_response(StatusCode::BAD_REQUEST, Some("Bad host header")),
request_info,
logger,
None,
);
}
},
None => {
return future_response(
simple_response(req_id, StatusCode::NOT_FOUND, None),
timer,
simple_response(StatusCode::NOT_FOUND, None),
request_info,
logger,
None,
);
}
}
};

let url: String = if parts.version == hyper::Version::HTTP_2 {
format!("{}", parts.uri)
} else {
format!(
"{}://{}{}",
if tls { "https" } else { "http" },
host,
parts.uri.path_and_query().unwrap()
)
};

request_info.url = url.to_owned();

let rt = match selector.get_by_hostname(host) {
Ok(maybe_rt) => match maybe_rt {
Some(rt) => rt,
None => {
return future_response(
simple_response(
req_id,
StatusCode::NOT_FOUND,
Some(Body::from("app not found")),
),
timer,
simple_response(StatusCode::NOT_FOUND, Some("app not found")),
request_info,
logger,
None,
);
}
},
Err(e) => {
error!("error getting runtime: {:?}", e);
slog_error!(logger, "error getting runtime: {:?}", e);
return future_response(
simple_response(req_id, StatusCode::SERVICE_UNAVAILABLE, None),
timer,
simple_response(StatusCode::SERVICE_UNAVAILABLE, None),
request_info,
logger,
None,
);
}
};

let url: String = if parts.version == hyper::Version::HTTP_2 {
format!("{}", parts.uri)
} else {
format!(
"{}://{}{}",
if tls { "https" } else { "http" },
host,
parts.uri.path_and_query().unwrap()
)
};
let stream_id = get_next_stream_id();

let rt_name = rt.name.clone();
let rt_version = rt.version.clone();
let logger =
logger.new(o!("app_name" => rt_name.to_owned(), "app_version" => rt.version.to_owned()));

match rt.dispatch_event(
stream_id,
Expand Down Expand Up @@ -141,15 +165,17 @@ pub fn serve_http(
}),
) {
None => future_response(
simple_response(req_id, StatusCode::SERVICE_UNAVAILABLE, None),
timer,
simple_response(StatusCode::SERVICE_UNAVAILABLE, None),
request_info,
logger,
Some((rt.name.clone(), rt.version.clone())),
),
Some(Err(e)) => {
error!("error sending js http request: {:?}", e);
slog_error!(logger, "error sending js http request: {:?}", e);
future_response(
simple_response(req_id, StatusCode::INTERNAL_SERVER_ERROR, None),
timer,
simple_response(StatusCode::INTERNAL_SERVER_ERROR, None),
request_info,
logger,
Some((rt.name.clone(), rt.version.clone())),
)
}
Expand Down Expand Up @@ -207,12 +233,10 @@ pub fn serve_http(
};
}

Ok(set_request_id(
set_server_header(Response::from_parts(parts, body)),
req_id,
))
Ok(Response::from_parts(parts, body))
}),
timer,
request_info,
logger,
Some((rt.name.clone(), rt.version.clone())),
)
}
Expand All @@ -222,22 +246,18 @@ pub fn serve_http(

fn future_response(
res: Response<Body>,
timer: time::Instant,
req: RequestInfo,
logger: slog::Logger,
namever: Option<(String, String)>,
) -> BoxedResponseFuture {
wrap_future(future::ok(res), timer, namever)
wrap_future(future::ok(res), req, logger, namever)
}

fn simple_response(req_id: String, status: StatusCode, body: Option<Body>) -> Response<Body> {
set_request_id(
set_server_header(
Response::builder()
.status(status)
.body(body.unwrap_or(Body::empty()))
.unwrap(),
),
req_id,
)
fn simple_response(status: StatusCode, body: Option<&str>) -> Response<Body> {
Response::builder()
.status(status)
.body(body.map_or_else(|| Body::empty(), |b| Body::from(b.to_owned())))
.unwrap()
}

fn set_server_header(mut res: Response<Body>) -> Response<Body> {
Expand All @@ -256,7 +276,8 @@ fn set_request_id(mut res: Response<Body>, req_id: String) -> Response<Body> {

fn wrap_future<F>(
fut: F,
timer: time::Instant,
req: RequestInfo,
logger: slog::Logger,
namever: Option<(String, String)>,
) -> BoxedResponseFuture
where
Expand All @@ -266,12 +287,36 @@ where
let (name, ver) = namever.unwrap_or((String::new(), String::new()));
let status = res.status();
let status_str = status.as_str();
let elapsed = req.timer.elapsed();

let res = set_server_header(set_request_id(res, req.request_id));

HTTP_RESPONSE_TIME_HISTOGRAM
.with_label_values(&[name.as_str(), ver.as_str(), status_str])
.observe(timer.elapsed().as_fractional_secs());
.observe(elapsed.as_fractional_secs());
HTTP_RESPONSE_COUNTER
.with_label_values(&[name.as_str(), ver.as_str(), status_str])
.inc();

slog_debug!(
logger,
"end request {http_response} {request_time_ms}",
http_response = res.status().as_u16(),
request_time_ms = elapsed.as_fractional_secs() * 1000.0
);

// TODO(md): send common log format message to the app logger once debugging is done
// commong log format
slog_info!(
logger,
"{client_ip} {http_method} {request_uri} {http_response} {request_time_ms}ms",
client_ip = req.remote_addr.to_string(),
http_method = req.method.to_owned(),
request_uri = req.url.to_owned(),
http_response = res.status().as_u16(),
request_time_ms = elapsed.as_fractional_secs() * 1000.0
);

Ok(res)
}))
}
Expand Down

0 comments on commit f5c9989

Please sign in to comment.