feature: async file system api (cnosdb#781)
* use async file interface

* improve: fix wal test

* io_uring done

* feat(io): fix bugs for async io, modify record_file API, and some other changes:
1. wal, tombstone now use record_file API.
2. some changes to wal and tombstone.

* fix: docs and a wrong record reader buffer size

* fix clippy errors

* fix(test): too many errors sending to channel

* fix: remove block_on() from tskv drop_table

Co-authored-by: roseboy <[email protected]>
Co-authored-by: Subsegment <[email protected]>
Co-authored-by: yongtaoliu <[email protected]>
4 people authored Nov 24, 2022
1 parent eaa5617 commit eee230a
Showing 53 changed files with 2,291 additions and 4,298 deletions.
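
The core of this commit converts the query engine's `Cursor` trait and the TSM file readers from synchronous to async I/O. As a minimal, self-contained sketch of the `#[async_trait]` pattern the diff applies below (simplified stand-in types, not the actual cnosdb code; assumes the `async-trait` crate as a dependency):

```rust
// Sketch of the async-trait pattern used below for the Cursor trait.
// `async_trait` rewrites each async method to return a boxed future, so the
// trait can still be used as a trait object (e.g. Box<dyn Cursor>).
use async_trait::async_trait;

#[async_trait]
pub trait Cursor: Send + Sync {
    // peek becomes async because it may now read a data block from disk.
    async fn peek(&mut self) -> Option<i64>;
}

pub struct TimeCursor {
    ts: i64,
}

#[async_trait]
impl Cursor for TimeCursor {
    async fn peek(&mut self) -> Option<i64> {
        // A time cursor answers from memory, so there is no real await point.
        Some(self.ts)
    }
}
```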
6 changes: 3 additions & 3 deletions Makefile
```diff
@@ -11,16 +11,16 @@ fmt:
 	cargo +nightly fmt --all
 
 clippy_check:
-	cargo clippy --workspace --all-features --all-targets -- -D warnings
+	cargo clippy --workspace --all-targets -- -D warnings
 
 clippy:
-	cargo clippy --workspace --all-features --all-targets --fix
+	cargo clippy --workspace --all-targets --fix
 
 build:
 	cargo build --all-features --all-targets
 
 test:
-	cargo test --workspace --all-features --exclude e2e_test
+	cargo test --workspace --exclude e2e_test
 
 check: fmt_check clippy_check build test docs_check
```
5 changes: 1 addition & 4 deletions config/config.toml
```diff
@@ -2,7 +2,7 @@
 #reporting_disabled = false
 
 [query]
-max_server_connections = 10240
+max_server_connections = 10240
 query_sql_limit = 16777216 # 16 * 1024 * 1024
 write_sql_limit = 167772160 # 160 * 1024 * 1024
 
@@ -17,9 +17,6 @@ max_level = 4
 base_file_size = 16777216 # 16 * 1024 * 1024
 compact_trigger = 4
 max_compact_size = 2147483648 # 2 * 1024 * 1024 * 1024
-dio_max_resident = 1024
-dio_max_non_resident = 1024
-dio_page_len_scale = 10
 strict_write = false
 
 [wal]
```
17 changes: 1 addition & 16 deletions config/src/lib.rs
```diff
@@ -38,9 +38,6 @@ pub struct StorageConfig {
     pub base_file_size: u64,
     pub compact_trigger: u32,
     pub max_compact_size: u64,
-    pub dio_max_resident: usize,
-    pub dio_max_non_resident: usize,
-    pub dio_page_len_scale: usize,
     pub strict_write: bool,
 }
 
@@ -64,15 +61,6 @@ impl StorageConfig {
         if let Ok(size) = std::env::var("CNOSDB_STORAGE_MAX_COMPACT_SIZE") {
             self.max_compact_size = size.parse::<u64>().unwrap();
         }
-        if let Ok(size) = std::env::var("CNOSDB_STORAGE_DIO_MAX_RESIDENT") {
-            self.dio_max_resident = size.parse::<usize>().unwrap();
-        }
-        if let Ok(size) = std::env::var("CNOSDB_STORAGE_DIO_MAX_NON_RESIDENT") {
-            self.dio_max_non_resident = size.parse::<usize>().unwrap();
-        }
-        if let Ok(size) = std::env::var("CNOSDB_STORAGE_DIO_PAGE_LEN_SCALE") {
-            self.dio_page_len_scale = size.parse::<usize>().unwrap();
-        }
         if let Ok(size) = std::env::var("CNOSDB_STORAGE_STRICT_WRITE") {
             self.strict_write = size.parse::<bool>().unwrap();
         }
@@ -180,7 +168,7 @@ pub fn get_config(path: &str) -> Config {
 fn test() {
     let config_str = r#"
 [query]
-max_server_connections = 10240
+max_server_connections = 10240
 query_sql_limit = 16777216 # 16 * 1024 * 1024
 write_sql_limit = 167772160 # 160 * 1024 * 1024
 [storage]
@@ -190,9 +178,6 @@ max_level = 4
 base_file_size = 16777216 # 16 * 1024 * 1024
 compact_trigger = 4
 max_compact_size = 2147483648 # 2 * 1024 * 1024 * 1024
-dio_max_resident = 1024
-dio_max_non_resident = 1024
-dio_page_len_scale = 1
 strict_write = true
 [wal]
```
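
As context for the removal above: of the lines shown, only the CNOSDB_STORAGE_STRICT_WRITE override survives, and the three dio_* knobs disappear together with their env-var hooks. A stripped-down sketch of the override pattern that remains (the method name `override_by_env` is a guess; the diff does not show the enclosing function's name):

```rust
// Sketch of StorageConfig's env-override pattern after the dio_* fields were
// dropped; reduced to the one field visible in the hunk above.
pub struct StorageConfig {
    pub strict_write: bool,
}

impl StorageConfig {
    // Hypothetical name; the real method signature lies outside the hunk.
    pub fn override_by_env(&mut self) {
        if let Ok(val) = std::env::var("CNOSDB_STORAGE_STRICT_WRITE") {
            self.strict_write = val.parse::<bool>().unwrap();
        }
    }
}
```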
84 changes: 46 additions & 38 deletions query_server/query/src/iterator.rs
```diff
@@ -17,6 +17,7 @@ use crate::stream::TskvSourceMetrics;
 
 use tskv::{
     engine::EngineRef,
+    error,
     error::IndexErrSnafu,
     memcache::DataType,
     tseries_family::{ColumnFile, SuperVersion, TimeRange},
```
```diff
@@ -75,11 +76,11 @@ impl FieldFileLocation {
         }
     }
 
-    pub fn peek(&mut self) -> Result<Option<DataType>, Error> {
+    pub async fn peek(&mut self) -> Result<Option<DataType>, Error> {
         if self.read_index >= self.data_block.len() {
             if let Some(meta) = self.block_it.next() {
                 self.read_index = 0;
-                self.data_block = self.reader.get_data_block(&meta)?;
+                self.data_block = self.reader.get_data_block(&meta).await?;
             } else {
                 return Ok(None);
             }
```
```diff
@@ -94,13 +95,14 @@ impl FieldFileLocation {
     }
 }
 
 //-----------trait Cursor----------------
+#[async_trait::async_trait]
 pub trait Cursor: Send + Sync {
     fn name(&self) -> &String;
     fn is_field(&self) -> bool;
     fn val_type(&self) -> ValueType;
 
-    fn next(&mut self, ts: i64);
-    fn peek(&mut self) -> Result<Option<DataType>, Error>;
+    async fn next(&mut self, ts: i64);
+    async fn peek(&mut self) -> Result<Option<DataType>, Error>;
 }
 
 //-----------Time Cursor----------------
```
```diff
@@ -115,18 +117,19 @@ impl TimeCursor {
     }
 }
 
+#[async_trait::async_trait]
 impl Cursor for TimeCursor {
     fn name(&self) -> &String {
         &self.name
     }
 
-    fn peek(&mut self) -> Result<Option<DataType>, Error> {
+    async fn peek(&mut self) -> Result<Option<DataType>, Error> {
         let data = DataType::I64(self.ts, self.ts);
 
         Ok(Some(data))
     }
 
-    fn next(&mut self, _ts: i64) {}
+    async fn next(&mut self, _ts: i64) {}
 
     fn val_type(&self) -> ValueType {
         ValueType::Integer
```
```diff
@@ -148,25 +151,26 @@ impl TagCursor {
     }
 }
 
+#[async_trait::async_trait]
 impl Cursor for TagCursor {
     fn name(&self) -> &String {
         &self.name
     }
 
-    fn peek(&mut self) -> Result<Option<DataType>, Error> {
-        let data = DataType::Str(0, MiniVec::from(self.value.as_bytes()));
-
-        Ok(Some(data))
+    fn is_field(&self) -> bool {
+        false
     }
 
-    fn next(&mut self, _ts: i64) {}
-
     fn val_type(&self) -> ValueType {
         ValueType::String
     }
 
-    fn is_field(&self) -> bool {
-        false
+    async fn next(&mut self, _ts: i64) {}
+
+    async fn peek(&mut self) -> Result<Option<DataType>, Error> {
+        let data = DataType::Str(0, MiniVec::from(self.value.as_bytes()));
+
+        Ok(Some(data))
     }
 }
```
```diff
@@ -191,7 +195,7 @@ impl FieldCursor {
         }
     }
 
-    pub fn new(
+    pub async fn new(
         field_id: FieldId,
         name: String,
         vtype: ValueType,
@@ -255,7 +259,7 @@ impl FieldCursor {
                file.file_path().display()
            );
 
-            let tsm_reader = iterator.get_tsm_reader(file.clone())?;
+            let tsm_reader = iterator.get_tsm_reader(file.clone()).await?;
             for idx in tsm_reader.index_iterator_opt(field_id) {
                 let block_it = idx.block_iterator_opt(time_range);
                 let location = FieldFileLocation::new(tsm_reader.clone(), block_it, vtype);
```
```diff
@@ -294,15 +298,16 @@ impl FieldCursor {
     }
 }
 
+#[async_trait::async_trait]
 impl Cursor for FieldCursor {
     fn name(&self) -> &String {
         &self.name
     }
 
-    fn peek(&mut self) -> Result<Option<DataType>, Error> {
+    async fn peek(&mut self) -> Result<Option<DataType>, Error> {
         let mut data = DataType::new(self.value_type, i64::MAX);
         for loc in self.locations.iter_mut() {
-            if let Some(val) = loc.peek()? {
+            if let Some(val) = loc.peek().await? {
                 if data.timestamp() >= val.timestamp() {
                     data = val;
                 }
@@ -321,15 +326,15 @@ impl Cursor for FieldCursor {
         Ok(Some(data))
     }
 
-    fn next(&mut self, ts: i64) {
+    async fn next(&mut self, ts: i64) {
         if let Some(val) = self.peek_cache() {
             if val.timestamp() == ts {
                 self.cache_index += 1;
             }
         }
 
         for loc in self.locations.iter_mut() {
-            if let Some(val) = loc.peek().unwrap() {
+            if let Some(val) = loc.peek().await.unwrap() {
                 if val.timestamp() == ts {
                     loc.next();
                 }
```
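
FieldCursor::peek above performs a k-way merge: it peeks every file location and keeps the value with the smallest timestamp, the `>=` letting a later-scanned location win ties. A self-contained sketch of that selection step ((i64, f64) pairs stand in for DataType; toy function, not cnosdb code):

```rust
// Sketch of FieldCursor::peek's merge step: scan each location's head value
// and keep the entry with the minimum timestamp.
fn min_by_timestamp(heads: &[Option<(i64, f64)>]) -> Option<(i64, f64)> {
    let mut best: Option<(i64, f64)> = None;
    for head in heads.iter().flatten() {
        match best {
            // Skip only when the current best is strictly older; on a tie the
            // later-scanned head replaces it, mirroring the `>=` in the diff.
            Some((ts, _)) if ts < head.0 => {}
            _ => best = Some(*head),
        }
    }
    best
}

fn main() {
    let heads = [Some((3, 1.0)), Some((1, 2.0)), None, Some((2, 0.5))];
    assert_eq!(min_by_timestamp(&heads), Some((1, 2.0)));
}
```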
```diff
@@ -442,7 +447,7 @@ impl RowIterator {
             &option.table_schema.name,
             &option.tags_filter,
         )
-        .context(IndexErrSnafu)?;
+        .context(error::IndexErrSnafu)?;
 
         debug!("series number: {}", series.len());
 
@@ -461,18 +466,18 @@ impl RowIterator {
         })
     }
 
-    pub fn get_tsm_reader(&mut self, file: Arc<ColumnFile>) -> Result<TsmReader, Error> {
+    pub async fn get_tsm_reader(&mut self, file: Arc<ColumnFile>) -> Result<TsmReader, Error> {
         if let Some(val) = self.open_files.get(&file.file_id()) {
             return Ok(val.clone());
         }
 
-        let tsm_reader = TsmReader::open(file.file_path())?;
+        let tsm_reader = TsmReader::open(file.file_path()).await?;
         self.open_files.insert(file.file_id(), tsm_reader.clone());
 
         Ok(tsm_reader)
     }
 
-    fn build_series_columns(&mut self, id: SeriesId) -> Result<(), Error> {
+    async fn build_series_columns(&mut self, id: SeriesId) -> Result<(), Error> {
         let start = Instant::now();
 
         if let Some(key) = self
```
```diff
@@ -502,7 +507,8 @@ impl RowIterator {
                         field_name,
                         vtype,
                         self,
-                    )?;
+                    )
+                    .await?;
                     Box::new(cursor)
                 }
             },
@@ -519,7 +525,7 @@ impl RowIterator {
         Ok(())
     }
 
-    fn next_series(&mut self) -> Result<Option<()>, Error> {
+    async fn next_series(&mut self) -> Result<Option<()>, Error> {
         if self.series_index == usize::MAX {
             self.series_index = 0;
         } else {
```
```diff
@@ -530,19 +536,23 @@ impl RowIterator {
             return Ok(None);
         }
 
-        self.build_series_columns(self.series[self.series_index])?;
+        self.build_series_columns(self.series[self.series_index])
+            .await?;
 
         Ok(Some(()))
     }
 
-    fn collect_row_data(&mut self, builder: &mut [ArrayBuilderPtr]) -> Result<Option<()>, Error> {
+    async fn collect_row_data(
+        &mut self,
+        builder: &mut [ArrayBuilderPtr],
+    ) -> Result<Option<()>, Error> {
         debug!("======collect_row_data=========");
         let timer = self.metrics.elapsed_field_scan().timer();
 
         let mut min_time = i64::MAX;
         let mut values = Vec::with_capacity(self.columns.len());
         for column in self.columns.iter_mut() {
-            let val = column.peek()?;
+            let val = column.peek().await?;
             if let Some(ref data) = val {
                 if column.is_field() {
                     min_time = min_num(min_time, data.timestamp());
```
```diff
@@ -560,7 +570,7 @@ impl RowIterator {
             if let Some(data) = value {
                 let ts = data.timestamp();
                 if ts == min_time {
-                    column.next(ts);
+                    column.next(ts).await;
                 } else {
                     *value = None;
                 }
```
```diff
@@ -657,13 +667,13 @@ impl RowIterator {
         Ok(Some(()))
     }
 
-    fn next_row(&mut self, builder: &mut [ArrayBuilderPtr]) -> Result<Option<()>, Error> {
+    async fn next_row(&mut self, builder: &mut [ArrayBuilderPtr]) -> Result<Option<()>, Error> {
         loop {
-            if self.columns.is_empty() && self.next_series()?.is_none() {
+            if self.columns.is_empty() && self.next_series().await?.is_none() {
                 return Ok(None);
             }
 
-            if self.collect_row_data(builder)?.is_some() {
+            if self.collect_row_data(builder).await?.is_some() {
                 return Ok(Some(()));
             }
         }
```
```diff
@@ -717,10 +727,8 @@ impl RowIterator {
     }
 }
 
-impl Iterator for RowIterator {
-    type Item = Result<RecordBatch, Error>;
-
-    fn next(&mut self) -> Option<Self::Item> {
+impl RowIterator {
+    pub async fn next(&mut self) -> Option<Result<RecordBatch, Error>> {
         if self.is_finish() {
             return None;
         }
@@ -731,7 +739,7 @@ impl Iterator for RowIterator {
 
         for _ in 0..self.batch_size {
             debug!("========next_row");
-            match self.next_row(&mut builder) {
+            match self.next_row(&mut builder).await {
                 Ok(Some(_)) => {}
                 Ok(None) => break,
                 Err(err) => return Some(Err(err)),
```
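
Since RowIterator no longer implements std::iter::Iterator (an `Iterator::next` cannot await), callers move from a `for` loop to an explicit `while let` loop over the inherent async `next`. A self-contained analogue under assumed toy types (tokio taken as the runtime; the hunk itself does not show the real caller):

```rust
// Analogue of the RowIterator change: a pull-style source exposing an
// inherent `async fn next`, drained with `while let`. Toy types throughout.
struct Batches {
    remaining: usize,
}

impl Batches {
    async fn next(&mut self) -> Option<Result<usize, String>> {
        if self.remaining == 0 {
            return None; // finished, mirroring RowIterator::is_finish()
        }
        self.remaining -= 1;
        Some(Ok(self.remaining))
    }
}

#[tokio::main]
async fn main() {
    let mut it = Batches { remaining: 3 };
    // A `for` loop cannot await, hence the explicit loop.
    while let Some(batch) = it.next().await {
        println!("{:?}", batch);
    }
}
```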