feat: read data from write cache (GreptimeTeam#3128)
* feat: read from write cache

* chore: add read ranges test

* fix: use get instead of contains_key

* chore: clippy

* chore: cr comment

Co-authored-by: Yingwen <[email protected]>

* fix: with_label_values

---------

Co-authored-by: Yingwen <[email protected]>
QuenKar and evenyag authored Jan 11, 2024
1 parent b00b492 commit 8ec1e42
Showing 5 changed files with 212 additions and 100 deletions.
66 changes: 66 additions & 0 deletions src/mito2/src/cache/file_cache.rs
@@ -14,9 +14,11 @@

//! A cache for files.
use std::ops::{Range, RangeBounds};
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{info, warn};
use futures::{FutureExt, TryStreamExt};
@@ -31,6 +33,7 @@ use crate::cache::FILE_TYPE;
use crate::error::{OpenDalSnafu, Result};
use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;

/// Subdirectory of cached files.
const FILE_DIR: &str = "files/";
@@ -129,6 +132,39 @@ impl FileCache {
None
}

/// Reads ranges from the cache.
pub(crate) async fn read_ranges(
&self,
key: IndexKey,
ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
if self.memory_index.get(&key).await.is_none() {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
return None;
}

let file_path = self.cache_file_path(key);
// In most cases this uses a blocking read,
// because FileCache is normally backed by a local file system, which supports blocking reads.
let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
match bytes_result {
Ok(bytes) => {
CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
Some(bytes)
}
Err(e) => {
if e.kind() != ErrorKind::NotFound {
warn!("Failed to get file for key {:?}, err: {}", key, e);
}

// Remove the file from the index.
self.memory_index.remove(&key).await;
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
None
}
}
}

/// Removes a file from the cache explicitly.
pub(crate) async fn remove(&self, key: IndexKey) {
let file_path = self.cache_file_path(key);
@@ -372,6 +408,36 @@ mod tests {
}
}

#[tokio::test]
async fn test_file_cache_read_ranges() {
let dir = create_temp_dir("");
let local_store = new_fs_store(dir.path().to_str().unwrap());
let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10));
let region_id = RegionId::new(2000, 0);
let file_id = FileId::random();
let key = (region_id, file_id);
let file_path = file_cache.cache_file_path(key);
// Write a file.
let data = b"hello greptime database";
local_store
.write(&file_path, data.as_slice())
.await
.unwrap();
// Add to the cache.
file_cache
.put((region_id, file_id), IndexValue { file_size: 5 })
.await;
// Ranges to read; the last one spans the whole file.
let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

assert_eq!(4, bytes.len());
assert_eq!(b"hello", bytes[0].as_ref());
assert_eq!(b"grep", bytes[1].as_ref());
assert_eq!(b"data", bytes[2].as_ref());
assert_eq!(data, bytes[3].as_ref());
}

#[test]
fn test_cache_file_path() {
let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
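For illustration, a minimal sketch of the caller-side contract of read_ranges: a hit yields one Bytes per requested range, in request order; a miss (or a stale index entry, which read_ranges evicts) yields None, and the caller falls back to the remote store. The fallback function below and the remote_store/remote_path names are assumptions for illustration, not part of this diff; only read_ranges and fetch_byte_ranges come from this commit.

async fn read_cached_or_remote(
    file_cache: &FileCache,
    key: IndexKey,
    remote_store: ObjectStore,
    remote_path: &str,
    ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
    // Cache hit: one Bytes per requested range, in request order.
    if let Some(bytes) = file_cache.read_ranges(key, ranges).await {
        return Ok(bytes);
    }
    // Cache miss or stale entry: read the same ranges from the remote store.
    fetch_byte_ranges(remote_path, remote_store, ranges).await
}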
8 changes: 8 additions & 0 deletions src/mito2/src/cache/write_cache.rs
@@ -14,16 +14,19 @@

//! A write-through cache for remote object stores.
use std::ops::Range;
use std::sync::Arc;

use api::v1::region;
use bytes::Bytes;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;

use super::file_cache::IndexKey;
use crate::access_layer::new_fs_object_store;
use crate::cache::file_cache::{FileCache, FileCacheRef, IndexValue};
use crate::error::{self, Result};
@@ -150,6 +153,11 @@ impl WriteCache {

Ok(Some(sst_info))
}

/// Returns the file cache of the write cache.
pub(crate) fn file_cache(&self) -> FileCacheRef {
self.file_cache.clone()
}
}

/// Request to write and upload a SST.
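A hypothetical sketch of how a reader could use the new file_cache() accessor to probe the write cache before touching the remote object store. The function and variable names here are assumptions; only file_cache() and read_ranges() come from this commit.

async fn try_write_cache(
    write_cache: &WriteCache,
    region_id: RegionId,
    file_id: FileId,
    ranges: &[Range<u64>],
) -> Option<Vec<Bytes>> {
    // An IndexKey is a (RegionId, FileId) pair; the accessor hands out
    // a cloned FileCacheRef, so this does not copy the cache itself.
    let file_cache: FileCacheRef = write_cache.file_cache();
    file_cache.read_ranges((region_id, file_id), ranges).await
}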
2 changes: 1 addition & 1 deletion src/mito2/src/sst/parquet.rs
@@ -15,7 +15,7 @@
//! SST in parquet format.
mod format;
mod helper;
pub(crate) mod helper;
mod page_reader;
pub mod reader;
pub mod row_group;
85 changes: 85 additions & 0 deletions src/mito2/src/sst/parquet/helper.rs
@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use object_store::{ErrorKind, ObjectStore};
use parquet::basic::ColumnOrder;
use parquet::file::metadata::{FileMetaData, ParquetMetaData, RowGroupMetaData};
use parquet::format;
@@ -84,3 +87,85 @@ fn parse_column_orders(
None => None,
}
}

/// Fetches data from the object store.
/// If the object store supports blocking reads, fetch the ranges sequentially.
/// Otherwise, fetch them concurrently.
pub async fn fetch_byte_ranges(
file_path: &str,
object_store: ObjectStore,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
if object_store.info().full_capability().blocking {
fetch_ranges_seq(file_path, object_store, ranges).await
} else {
fetch_ranges_concurrent(file_path, object_store, ranges).await
}
}

/// Fetches data from the object store sequentially using blocking reads.
async fn fetch_ranges_seq(
file_path: &str,
object_store: ObjectStore,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
let block_object_store = object_store.blocking();
let file_path = file_path.to_string();
let ranges = ranges.to_vec();

let f = move || -> object_store::Result<Vec<Bytes>> {
ranges
.into_iter()
.map(|range| {
let data = block_object_store
.read_with(&file_path)
.range(range.start..range.end)
.call()?;
Ok::<_, object_store::Error>(Bytes::from(data))
})
.collect::<object_store::Result<Vec<_>>>()
};

maybe_spawn_blocking(f).await
}

/// Fetches data from the object store concurrently.
async fn fetch_ranges_concurrent(
file_path: &str,
object_store: ObjectStore,
ranges: &[Range<u64>],
) -> object_store::Result<Vec<Bytes>> {
// TODO(QuenKar): consider merging small ranges into a bigger range to optimize.
let mut handles = Vec::with_capacity(ranges.len());
for range in ranges {
let future_read = object_store.read_with(file_path);
handles.push(async move {
let data = future_read.range(range.start..range.end).await?;
Ok::<_, object_store::Error>(Bytes::from(data))
});
}
let results = futures::future::try_join_all(handles).await?;
Ok(results)
}

// Ported from https://github.com/apache/arrow-rs/blob/802ed428f87051fdca31180430ddb0ecb2f60e8b/object_store/src/util.rs#L74-L83
/// Takes a function and spawns it on the tokio blocking pool if one is available.
async fn maybe_spawn_blocking<F, T>(f: F) -> object_store::Result<T>
where
F: FnOnce() -> object_store::Result<T> + Send + 'static,
T: Send + 'static,
{
match tokio::runtime::Handle::try_current() {
Ok(runtime) => runtime
.spawn_blocking(f)
.await
.map_err(new_task_join_error)?,
Err(_) => f(),
}
}

// https://github.com/apache/incubator-opendal/blob/7144ab1ca2409dff0c324bfed062ce985997f8ce/core/src/raw/tokio_util.rs#L21-L23
/// Parses a tokio task join error into an opendal::Error.
fn new_task_join_error(e: tokio::task::JoinError) -> object_store::Error {
object_store::Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e)
}
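As a standalone illustration of the maybe_spawn_blocking pattern ported above, a hedged sketch that keeps blocking filesystem I/O off the async executor threads when a tokio runtime is present. The function and its body are hypothetical, not part of this commit; only maybe_spawn_blocking and the Error construction mirror the code above.

async fn read_file_off_executor(path: String) -> object_store::Result<Vec<u8>> {
    maybe_spawn_blocking(move || {
        // Runs on tokio's blocking pool when a runtime is available,
        // inline otherwise; either way the async workers stay unblocked.
        std::fs::read(&path).map_err(|e| {
            object_store::Error::new(ErrorKind::Unexpected, "failed to read file")
                .set_source(e)
        })
    })
    .await
}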
