Skip to content

Commit

Permalink
add metrics about the s3 getobject request & headobject request
Browse files Browse the repository at this point in the history
  • Loading branch information
flaneur2020 committed Oct 18, 2021
1 parent d09d51f commit a7bd646
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/dal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ rusoto_core = "0.47.0"
rusoto_s3 = "0.47.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
metrics = "0.17.0"

[dev-dependencies]
pretty_assertions = "1.0"
Expand Down
22 changes: 22 additions & 0 deletions common/dal/src/impls/aws_s3/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

pub static METRIC_S3_GETOBJECT_NUMBERS: &str = "s3.getobject_numbers";
pub static METRIC_S3_GETOBJECT_USEDTIME: &str = "s3.getobject_usedtime";
pub static METRIC_S3_GETOBJECT_ERRORS: &str = "s3.getobject_errors";

pub static METRIC_S3_HEADOBJECT_NUMBERS: &str = "s3.headobject_numbers";
pub static METRIC_S3_HEADOBJECT_USEDTIME: &str = "s3.headobject_usedtime";
pub static METRIC_S3_HEADOBJECT_ERRORS: &str = "s3.headobject_errors";
1 change: 1 addition & 0 deletions common/dal/src/impls/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#[cfg(test)]
mod s3_input_stream_test;

mod metrics;
mod s3;
mod s3_input_stream;

Expand Down
31 changes: 23 additions & 8 deletions common/dal/src/impls/aws_s3/s3_input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::io::Write;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Instant;

use bytes::BufMut;
use common_base::tokio::io::ErrorKind;
Expand All @@ -28,6 +29,8 @@ use futures::Future;
use futures::FutureExt;
use futures::Stream;
use futures::StreamExt;
use metrics::counter;
use metrics::histogram;
use rusoto_s3::GetObjectRequest;
use rusoto_s3::HeadObjectRequest;
use rusoto_s3::S3Client;
Expand Down Expand Up @@ -81,6 +84,7 @@ impl futures::AsyncRead for S3InputStream {
let empty = { self.buffer.is_empty() };
match &mut self.state {
State::Bare => {
counter!(super::metrics::METRIC_S3_GETOBJECT_NUMBERS, 1);
let req = GetObjectRequest {
range: Some(format!("bytes={}-", self.cursor_pos)),
key: self.key.clone(),
Expand All @@ -89,10 +93,15 @@ impl futures::AsyncRead for S3InputStream {
};
let client = self.client.clone();
let resp = async move {
let reply = client
.get_object(req)
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?;
let start = Instant::now();
let reply = client.get_object(req).await.map_err(|e| {
counter!(super::metrics::METRIC_S3_GETOBJECT_ERRORS, 1);
Error::new(ErrorKind::Other, e)
})?;
histogram!(
super::metrics::METRIC_S3_GETOBJECT_USEDTIME,
start.elapsed()
);
reply
.body
.map(|s| s.fuse())
Expand Down Expand Up @@ -179,11 +188,17 @@ impl S3InputStream {
//jhead_req.key = self.key.clone();
//head_req.bucket = self.bucket.clone();
let cli = self.client.clone();
counter!(super::metrics::METRIC_S3_HEADOBJECT_NUMBERS, 1);
let res = async move {
let result = cli
.head_object(head_req)
.await
.map_err(|e| Error::new(ErrorKind::Other, e))?;
let start = Instant::now();
let result = cli.head_object(head_req).await.map_err(|e| {
counter!(super::metrics::METRIC_S3_HEADOBJECT_ERRORS, 1);
Error::new(ErrorKind::Other, e)
})?;
histogram!(
super::metrics::METRIC_S3_HEADOBJECT_USEDTIME,
start.elapsed()
);
result.content_length.ok_or_else(|| {
Error::new(ErrorKind::Other, "expects content-length")
})
Expand Down

0 comments on commit a7bd646

Please sign in to comment.