Skip to content

Commit

Permalink
feat(read): Add field-id-filter in ColumnFile and some optimizes.
Browse files Browse the repository at this point in the history
To avoid unnecessary fopen, recovers and compactions will generate this filter.
Other changes: String database turned to Arc<String>.
  • Loading branch information
zipper-meng authored and roseboy-liu committed Feb 7, 2023
1 parent 298b1eb commit 9d426be
Show file tree
Hide file tree
Showing 16 changed files with 507 additions and 280 deletions.
6 changes: 6 additions & 0 deletions common/utils/src/bloom_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ impl BloomFilter {
}
}

impl Default for BloomFilter {
fn default() -> Self {
Self::new(512)
}
}

#[cfg(test)]
mod test {
use super::BloomFilter;
Expand Down
1 change: 0 additions & 1 deletion tskv/src/compaction/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use crate::{
database::Database,
error::{self, Error, Result},
schema::schemas::DBschemas,
tseries_family::{ColumnFile, TseriesFamily},
tsm::{DataBlock, TsmReader},
TimeRange, TseriesFamilyId,
};
Expand Down
36 changes: 21 additions & 15 deletions tskv/src/compaction/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use evmap::new;
use models::{FieldId, Timestamp, ValueType};
use snafu::ResultExt;
use trace::{debug, error, info, trace};
use utils::BloomFilter;

use crate::{
compaction::CompactReq,
Expand All @@ -25,7 +26,7 @@ use crate::{
self, BlockMeta, BlockMetaIterator, ColumnReader, DataBlock, Index, IndexIterator,
IndexMeta, IndexReader, TsmReader, TsmWriter,
},
Error, LevelId, TseriesFamilyId,
ColumnFileId, Error, LevelId, TseriesFamilyId,
};

/// Temporary compacting data block meta
Expand Down Expand Up @@ -456,7 +457,7 @@ fn overlaps_tuples(r1: (i64, i64), r2: (i64, i64)) -> bool {
pub async fn run_compaction_job(
request: CompactReq,
kernel: Arc<GlobalContext>,
) -> Result<Option<VersionEdit>> {
) -> Result<Option<(VersionEdit, HashMap<ColumnFileId, Arc<BloomFilter>>)>> {
info!(
"Compaction: Running compaction job on ts_family: {} and files: [ {} ]",
request.ts_family_id,
Expand Down Expand Up @@ -499,6 +500,7 @@ pub async fn run_compaction_job(
let mut tsm_writer = tsm::new_tsm_writer(&tsm_dir, kernel.file_id_next(), false, 0).await?;
info!("Compaction: File {} been created.", tsm_writer.sequence());
let mut version_edit = VersionEdit::new(tsf_id);
let mut file_metas: HashMap<ColumnFileId, Arc<BloomFilter>> = HashMap::new();

loop {
let block = iter.next().await;
Expand Down Expand Up @@ -575,6 +577,10 @@ pub async fn run_compaction_job(
);
let cm = new_compact_meta(&tsm_writer, request.ts_family_id, request.out_level);
version_edit.add_file(cm, version.max_level_ts);
file_metas.insert(
tsm_writer.sequence(),
Arc::new(tsm_writer.bloom_filter_cloned()),
);
for file in request.files {
version_edit.del_file(file.level(), file.file_id(), file.is_delta());
}
Expand All @@ -584,7 +590,7 @@ pub async fn run_compaction_job(
version_edit
);

Ok(Some(version_edit))
Ok(Some((version_edit, file_metas)))
}

fn new_compact_meta(
Expand Down Expand Up @@ -727,14 +733,14 @@ pub mod test {
}

fn prepare_compact_req_and_kernel(
database: String,
database: Arc<String>,
opt: Arc<Options>,
next_file_id: u64,
files: Vec<Arc<ColumnFile>>,
) -> (CompactReq, Arc<GlobalContext>) {
let version = Arc::new(Version::new(
1,
"version_1".to_string(),
database.clone(),
opt.storage.clone(),
1,
LevelInfo::init_levels(database.clone(), 0, opt.storage.clone()),
Expand Down Expand Up @@ -783,15 +789,15 @@ pub mod test {
]);

let dir = "/tmp/test/compaction";
let database = "dba".to_string();
let database = Arc::new("dba".to_string());
let opt = create_options(dir.to_string());
let dir = opt.storage.tsm_dir(&database, 1);

let (next_file_id, files) =
write_data_blocks_to_column_file(&dir, data, 1, opt.clone()).await;
let (compact_req, kernel) =
prepare_compact_req_and_kernel(database, opt, next_file_id, files);
let version_edit = run_compaction_job(compact_req, kernel)
let (version_edit, _) = run_compaction_job(compact_req, kernel)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -826,15 +832,15 @@ pub mod test {
]);

let dir = "/tmp/test/compaction/1";
let database = "dba".to_string();
let database = Arc::new("dba".to_string());
let opt = create_options(dir.to_string());
let dir = opt.storage.tsm_dir(&database, 1);

let (next_file_id, files) =
write_data_blocks_to_column_file(&dir, data, 1, opt.clone()).await;
let (compact_req, kernel) =
prepare_compact_req_and_kernel(database, opt, next_file_id, files);
let version_edit = run_compaction_job(compact_req, kernel)
let (version_edit, _) = run_compaction_job(compact_req, kernel)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -869,15 +875,15 @@ pub mod test {
]);

let dir = "/tmp/test/compaction/2";
let database = "dba".to_string();
let database = Arc::new("dba".to_string());
let opt = create_options(dir.to_string());
let dir = opt.storage.tsm_dir(&database, 1);

let (next_file_id, files) =
write_data_blocks_to_column_file(&dir, data, 1, opt.clone()).await;
let (compact_req, kernel) =
prepare_compact_req_and_kernel(database, opt, next_file_id, files);
let version_edit = run_compaction_job(compact_req, kernel)
let (version_edit, _) = run_compaction_job(compact_req, kernel)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -1064,7 +1070,7 @@ pub mod test {
);

let dir = "/tmp/test/compaction/3";
let database = "dba".to_string();
let database = Arc::new("dba".to_string());
let opt = create_options(dir.to_string());
let dir = opt.storage.tsm_dir(&database, 1);
if !file_manager::try_exists(&dir) {
Expand Down Expand Up @@ -1099,7 +1105,7 @@ pub mod test {
let (compact_req, kernel) =
prepare_compact_req_and_kernel(database, opt, next_file_id, column_files);

let version_edit = run_compaction_job(compact_req, kernel)
let (version_edit, _) = run_compaction_job(compact_req, kernel)
.await
.unwrap()
.unwrap();
Expand Down Expand Up @@ -1211,7 +1217,7 @@ pub mod test {
);

let dir = "/tmp/test/compaction/4";
let database = "dba".to_string();
let database = Arc::new("dba".to_string());
let opt = create_options(dir.to_string());
let dir = opt.storage.tsm_dir(&database, 1);
if !file_manager::try_exists(&dir) {
Expand Down Expand Up @@ -1255,7 +1261,7 @@ pub mod test {
let (compact_req, kernel) =
prepare_compact_req_and_kernel(database, opt, next_file_id, column_files);

let version_edit = run_compaction_job(compact_req, kernel)
let (version_edit, _) = run_compaction_job(compact_req, kernel)
.await
.unwrap()
.unwrap();
Expand Down
73 changes: 45 additions & 28 deletions tskv/src/compaction/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use tokio::{
},
};
use trace::{debug, error, info, log_error, warn};
use utils::BloomFilter;

use crate::{
compaction::FlushReq,
Expand All @@ -40,7 +41,7 @@ use crate::{
tseries_family::{LevelInfo, Version},
tsm::{self, codec::DataBlockEncoding, DataBlock, TsmWriter},
version_set::VersionSet,
TseriesFamilyId,
ColumnFileId, TseriesFamilyId,
};

struct FlushingBlock {
Expand Down Expand Up @@ -78,6 +79,7 @@ impl FlushTask {
self,
version: Arc<Version>,
version_edits: &mut Vec<VersionEdit>,
file_metas: &mut HashMap<ColumnFileId, Arc<BloomFilter>>,
) -> Result<()> {
info!(
"Flush: Running flush job on ts_family: {} with {} MemCaches, collecting informations.",
Expand Down Expand Up @@ -113,25 +115,28 @@ impl FlushTask {
}

let mut max_level_ts = version.max_level_ts;
let mut compact_metas = self
let mut column_file_metas = self
.flush_mem_caches(
flushing_mems_data,
max_level_ts,
tsm::MAX_BLOCK_VALUES as usize,
)
.await?;
let mut edit = VersionEdit::new(self.ts_family_id);
for cm in compact_metas.iter_mut() {
for (cm, _) in column_file_metas.iter_mut() {
cm.low_seq = low_seq;
cm.high_seq = high_seq;
max_level_ts = max_level_ts.max(cm.max_ts);
}
for cm in compact_metas {
for (cm, field_filter) in column_file_metas {
file_metas.insert(cm.file_id, field_filter);
edit.add_file(cm, max_level_ts);
}
version_edits.push(edit);

for mem in self.mem_caches.iter() {
// TODO Cache marked as flushed but there are no column files in current version.
// May it turned to true after the new version inited?
mem.write().flushed = true;
}

Expand All @@ -145,7 +150,7 @@ impl FlushTask {
mut caches_data: HashMap<SeriesId, Vec<Arc<RwLock<SeriesData>>>>,
max_level_ts: Timestamp,
data_block_size: usize,
) -> Result<Vec<CompactMeta>> {
) -> Result<Vec<(CompactMeta, Arc<BloomFilter>)>> {
let mut delta_writer: Option<TsmWriter> = None;
let mut tsm_writer: Option<TsmWriter> = None;

Expand Down Expand Up @@ -302,12 +307,13 @@ impl FlushTask {
tsm::new_tsm_writer(dir, self.global_context.file_id_next(), is_delta, 0).await
}

/// Flush writers (if it exists) and then generate `CompactMeta`s.
/// Flush writers (if they exists) and then generate (`CompactMeta`, `Arc<BloomFilter>`)s
/// using writers (if they exists).
async fn finish_flush_mem_caches(
&self,
mut delta_writer: Option<TsmWriter>,
mut tsm_writer: Option<TsmWriter>,
) -> Result<Vec<CompactMeta>> {
) -> Result<Vec<(CompactMeta, Arc<BloomFilter>)>> {
let compact_meta_builder = CompactMetaBuilder::new(self.ts_family_id);
if let Some(writer) = tsm_writer.as_mut() {
writer.write_index().await.context(error::WriteTsmSnafu)?;
Expand All @@ -328,30 +334,36 @@ impl FlushTask {
);
}

let mut compact_metas = vec![];
let mut column_file_metas = vec![];
if let Some(writer) = tsm_writer {
compact_metas.push(compact_meta_builder.build_tsm(
writer.sequence(),
writer.size(),
1,
writer.min_ts(),
writer.max_ts(),
column_file_metas.push((
compact_meta_builder.build_tsm(
writer.sequence(),
writer.size(),
1,
writer.min_ts(),
writer.max_ts(),
),
Arc::new(writer.bloom_filter_cloned()),
));
}
if let Some(writer) = delta_writer {
compact_metas.push(compact_meta_builder.build_delta(
writer.sequence(),
writer.size(),
1,
writer.min_ts(),
writer.max_ts(),
column_file_metas.push((
compact_meta_builder.build_delta(
writer.sequence(),
writer.size(),
1,
writer.min_ts(),
writer.max_ts(),
),
Arc::new(writer.bloom_filter_cloned()),
));
}

// Sort by File id.
compact_metas.sort_by_key(|c| c.file_id);
column_file_metas.sort_by_key(|c| c.0.file_id);

Ok(compact_metas)
Ok(column_file_metas)
}
}

Expand All @@ -373,7 +385,8 @@ pub async fn run_flush_memtable_job(
}
}

let mut edits: Vec<VersionEdit> = vec![];
let mut version_edits: Vec<VersionEdit> = vec![];
let mut file_metas: HashMap<ColumnFileId, Arc<BloomFilter>> = HashMap::new();
for (tsf_id, caches) in tsf_caches.into_iter() {
if caches.is_empty() {
continue;
Expand All @@ -394,7 +407,7 @@ pub async fn run_flush_memtable_job(
let path_delta = storage_opt.delta_dir(&database, tsf_id);

FlushTask::new(caches, tsf_id, global_context.clone(), path_tsm, path_delta)
.run(version, &mut edits)
.run(version, &mut version_edits, &mut file_metas)
.await?;

if let Err(e) = compact_task_sender.send(tsf_id) {
Expand All @@ -403,10 +416,10 @@ pub async fn run_flush_memtable_job(
}
}

info!("Flush: Flush finished, version edits: {:?}", edits);
info!("Flush: Flush finished, version edits: {:?}", version_edits);

let (task_state_sender, task_state_receiver) = oneshot::channel();
let task = SummaryTask::new_append_task(edits, task_state_sender);
let task = SummaryTask::new_column_file_task(file_metas, version_edits, task_state_sender);

if let Err(e) = summary_task_sender.send(task) {
warn!("failed to send Summary task, {}", e);
Expand Down Expand Up @@ -570,7 +583,7 @@ pub mod flush_tests {
]);

let ts_family_id = 1;
let database = "test_db".to_string();
let database = Arc::new("test_db".to_string());

let caches = caches
.into_iter()
Expand All @@ -587,7 +600,11 @@ pub mod flush_tests {
});
let flush_task = FlushTask::new(caches, 1, global_context, &tsm_dir, &delta_dir);
let mut version_edits = vec![];
flush_task.run(version, &mut version_edits).await.unwrap();
let mut file_metas = HashMap::new();
flush_task
.run(version, &mut version_edits, &mut file_metas)
.await
.unwrap();

assert_eq!(version_edits.len(), 1);
let ve = version_edits.get(0).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion tskv/src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{

pub struct CompactReq {
pub ts_family_id: TseriesFamilyId,
pub database: String,
pub database: Arc<String>,
storage_opt: Arc<StorageOptions>,

files: Vec<Arc<ColumnFile>>,
Expand Down
Loading

0 comments on commit 9d426be

Please sign in to comment.