Skip to content

Commit

Permalink
feat(cache): add metrics for new cache
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Jan 3, 2023
1 parent c79f9e8 commit d57167a
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/query/storages/cache/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ common-cache = { path = "../../../common/cache" }
common-exception = { path = "../../../common/exception" }

async-trait = { version = "0.1.57", package = "async-trait-fn" }
metrics = "0.20.1"
opendal = "0.23"
parking_lot = "0.12.1"
serde = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod settings;
pub use cache::ObjectCacheProvider;
pub use object::CachedObject;
pub use object::CachedObjectAccessor;
pub use providers::metrics_reset;
pub use providers::ByPassCache;
pub use providers::FileCache;
pub use providers::MemoryBytesCache;
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/cache/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ impl CachedObject for Vec<u8> {
/// 2. create the CachedObjectAccessor with the cache provider
/// 3. operation with object
pub struct CachedObjectAccessor<T> {
cache: Arc<dyn ObjectCacheProvider<T>>,
cache: Arc<dyn ObjectCacheProvider<T> + Send + Sync>,
}

impl<T> CachedObjectAccessor<T> {
pub fn create(cache: Arc<dyn ObjectCacheProvider<T>>) -> CachedObjectAccessor<T> {
pub fn create(cache: Arc<dyn ObjectCacheProvider<T> + Send + Sync>) -> CachedObjectAccessor<T> {
Self { cache }
}

Expand Down
34 changes: 34 additions & 0 deletions src/query/storages/cache/src/providers/by_pass_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,17 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_exception::Result;
use opendal::Object;

use crate::providers::metrics_inc_bypass_read_milliseconds;
use crate::providers::metrics_inc_bypass_reads;
use crate::providers::metrics_inc_bypass_remove_milliseconds;
use crate::providers::metrics_inc_bypass_removes;
use crate::providers::metrics_inc_bypass_write_milliseconds;
use crate::providers::metrics_inc_bypass_writes;
use crate::CacheSettings;
use crate::CachedObject;
use crate::ObjectCacheProvider;
Expand All @@ -35,17 +42,44 @@ impl<T> ObjectCacheProvider<T> for ByPassCache
where T: CachedObject + Send + Sync + 'static
{
async fn read_object(&self, object: &Object, start: u64, end: u64) -> Result<Arc<T>> {
let now = Instant::now();

let data = object.range_read(start..end).await?;

// Perf.
{
metrics_inc_bypass_reads(1);
metrics_inc_bypass_read_milliseconds(now.elapsed().as_millis() as u64);
}

T::from_bytes(data)
}

async fn write_object(&self, object: &Object, v: Arc<T>) -> Result<()> {
let now = Instant::now();

object.write(v.to_bytes()?).await?;

// Perf.
{
metrics_inc_bypass_writes(1);
metrics_inc_bypass_write_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}

async fn remove_object(&self, object: &Object) -> Result<()> {
let now = Instant::now();

object.delete().await?;

// Perf.
{
metrics_inc_bypass_removes(1);
metrics_inc_bypass_remove_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}
}
46 changes: 45 additions & 1 deletion src/query/storages/cache/src/providers/memory_bytes_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_cache::BytesMeter;
use common_cache::Cache;
Expand All @@ -22,6 +23,13 @@ use common_exception::Result;
use opendal::Object;
use parking_lot::RwLock;

use crate::providers::metrics_inc_memory_bytes_hits;
use crate::providers::metrics_inc_memory_bytes_load_milliseconds;
use crate::providers::metrics_inc_memory_bytes_misses;
use crate::providers::metrics_inc_memory_bytes_remove_milliseconds;
use crate::providers::metrics_inc_memory_bytes_removes;
use crate::providers::metrics_inc_memory_bytes_write_milliseconds;
use crate::providers::metrics_inc_memory_bytes_writes;
use crate::CacheSettings;
use crate::ObjectCacheProvider;

Expand Down Expand Up @@ -57,14 +65,29 @@ impl ObjectCacheProvider<Vec<u8>> for MemoryBytesCache {
let try_get_val = self.get_cache(&key);
Ok(match try_get_val {
None => {
let now = Instant::now();

let data = object.range_read(start..end).await?;
let v = Arc::new(data);

self.lru.write().put(key, v.clone());

// Perf.
{
metrics_inc_memory_bytes_misses(1);
metrics_inc_memory_bytes_load_milliseconds(now.elapsed().as_millis() as u64);
}

v
}
Some(v) => {
// Perf.
{
metrics_inc_memory_bytes_hits(1);
}

v
}
Some(v) => v,
})
}

Expand All @@ -74,13 +97,34 @@ impl ObjectCacheProvider<Vec<u8>> for MemoryBytesCache {
self.lru.write().put(key, v.clone());
}

let now = Instant::now();

object.write(v.as_slice()).await?;

// Perf.
{
metrics_inc_memory_bytes_writes(1);
metrics_inc_memory_bytes_write_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}

async fn remove_object(&self, object: &Object) -> Result<()> {
let key = object.path();

let now = Instant::now();

// Try to remove from the cache.
self.lru.write().pop(key);
object.delete().await?;

// Perf.
{
metrics_inc_memory_bytes_removes(1);
metrics_inc_memory_bytes_remove_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}
}
43 changes: 42 additions & 1 deletion src/query/storages/cache/src/providers/memory_items_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use common_cache::Cache;
use common_cache::Count;
Expand All @@ -22,6 +23,13 @@ use common_exception::Result;
use opendal::Object;
use parking_lot::RwLock;

use crate::providers::metrics_inc_memory_item_hits;
use crate::providers::metrics_inc_memory_item_load_milliseconds;
use crate::providers::metrics_inc_memory_item_misses;
use crate::providers::metrics_inc_memory_item_remove_milliseconds;
use crate::providers::metrics_inc_memory_item_removes;
use crate::providers::metrics_inc_memory_item_write_milliseconds;
use crate::providers::metrics_inc_memory_item_writes;
use crate::CacheSettings;
use crate::CachedObject;
use crate::ObjectCacheProvider;
Expand Down Expand Up @@ -57,14 +65,29 @@ where T: CachedObject + Send + Sync

Ok(match try_get_val {
None => {
let now = Instant::now();

let data = object.range_read(start..end).await?;
let v = T::from_bytes(data)?;
// Write to cache.
self.lru.write().put(key, v.clone());

// Perf.
{
metrics_inc_memory_item_misses(1);
metrics_inc_memory_item_load_milliseconds(now.elapsed().as_millis() as u64);
}

v
}
Some(v) => {
// Perf.
{
metrics_inc_memory_item_hits(1);
}

v
}
Some(v) => v,
})
}

Expand All @@ -74,19 +97,37 @@ where T: CachedObject + Send + Sync
self.lru.write().put(key, v.clone());
}

let now = Instant::now();

let data = v.to_bytes()?;

object.write(data).await?;

// Perf.
{
metrics_inc_memory_item_writes(1);
metrics_inc_memory_item_write_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}

async fn remove_object(&self, object: &Object) -> Result<()> {
let key = object.path();

let now = Instant::now();

// Try to remove from the cache.
self.lru.write().pop(key);

object.delete().await?;

// Perf.
{
metrics_inc_memory_item_removes(1);
metrics_inc_memory_item_remove_milliseconds(now.elapsed().as_millis() as u64);
}

Ok(())
}
}
Loading

0 comments on commit d57167a

Please sign in to comment.