feat(meta): support commit epoch on subset of tables (risingwavelabs#…
wenym1 authored Aug 15, 2024
1 parent f511bcf commit cbeda4d
Showing 9 changed files with 415 additions and 313 deletions.
25 changes: 12 additions & 13 deletions src/meta/src/barrier/mod.rs
@@ -1284,17 +1284,16 @@ fn collect_commit_epoch_info(
let new_table_fragment_info =
if let Command::CreateStreamingJob { info, .. } = &command_ctx.command {
let table_fragments = &info.table_fragments;
Some(NewTableFragmentInfo {
table_id: table_fragments.table_id(),
NewTableFragmentInfo::Normal {
mv_table_id: table_fragments.mv_table_id().map(TableId::new),
internal_table_ids: table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect(),
})
}
} else {
None
NewTableFragmentInfo::None
};

let table_new_change_log = build_table_change_log_delta(
@@ -1317,9 +1316,9 @@ fn collect_commit_epoch_info(

let epoch = command_ctx.prev_epoch.value().0;

CommitEpochInfo::new(
synced_ssts,
merge_multiple_new_table_watermarks(
CommitEpochInfo {
sstables: synced_ssts,
new_table_watermarks: merge_multiple_new_table_watermarks(
table_watermarks
.into_iter()
.map(|watermarks| {
@@ -1332,11 +1331,11 @@ fn collect_commit_epoch_info(
})
.collect_vec(),
),
sst_to_worker,
sst_to_context: sst_to_worker,
new_table_fragment_info,
table_new_change_log,
BTreeMap::from_iter([(epoch, command_ctx.table_ids_to_commit.clone())]),
epoch,
vec![],
)
change_log_delta: table_new_change_log,
committed_epoch: epoch,
tables_to_commit: command_ctx.table_ids_to_commit.clone(),
is_visible_table_committed_epoch: true,
}
}
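
Aside: because the commit_epoch.rs hunks below interleave removed and added lines, here is a consolidated sketch of the two reshaped types as they stand after this commit. It is reconstructed from the diff rather than copied from the repository; the referenced types (TableId, TableWatermarks, ChangeLogDelta, LocalSstableInfo, HummockSstableObjectId, HummockContextId) come from the imports visible in the hunks.

// Reconstructed from the hunks below; field order follows the diff.
pub enum NewTableFragmentInfo {
    None,
    Normal {
        mv_table_id: Option<TableId>,
        internal_table_ids: Vec<TableId>,
    },
    NewCompactionGroup { table_ids: HashSet<TableId> },
}

pub struct CommitEpochInfo {
    pub sstables: Vec<LocalSstableInfo>,
    pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
    pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
    pub new_table_fragment_info: NewTableFragmentInfo,
    pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
    pub committed_epoch: u64,
    pub tables_to_commit: HashSet<TableId>,
    pub is_visible_table_committed_epoch: bool,
}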
227 changes: 86 additions & 141 deletions src/meta/src/hummock/manager/commit_epoch.rs
@@ -25,12 +25,11 @@ use risingwave_hummock_sdk::table_stats::{
use risingwave_hummock_sdk::table_watermark::TableWatermarks;
use risingwave_hummock_sdk::version::HummockVersionStateTableInfo;
use risingwave_hummock_sdk::{
CompactionGroupId, HummockContextId, HummockEpoch, HummockSstableObjectId, LocalSstableInfo,
CompactionGroupId, HummockContextId, HummockSstableObjectId, LocalSstableInfo,
};
use risingwave_pb::hummock::compact_task::{self};
use risingwave_pb::hummock::HummockSnapshot;
use sea_orm::TransactionTrait;
use tracing::warn;

use crate::hummock::error::{Error, Result};
use crate::hummock::manager::compaction_group_manager::CompactionGroupManager;
@@ -48,60 +47,33 @@ use crate::hummock::{
HummockManager,
};

#[derive(Debug, Clone)]
pub struct NewTableFragmentInfo {
pub table_id: TableId,
pub mv_table_id: Option<TableId>,
pub internal_table_ids: Vec<TableId>,
}

pub struct BatchCommitForNewCg {
pub epoch_to_ssts: BTreeMap<HummockEpoch, Vec<LocalSstableInfo>>,
pub table_ids: Vec<TableId>,
pub enum NewTableFragmentInfo {
None,
Normal {
mv_table_id: Option<TableId>,
internal_table_ids: Vec<TableId>,
},
NewCompactionGroup {
table_ids: HashSet<TableId>,
},
}

pub struct CommitEpochInfo {
pub sstables: Vec<LocalSstableInfo>,
pub new_table_watermarks: HashMap<TableId, TableWatermarks>,
pub sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
pub new_table_fragment_info: Option<NewTableFragmentInfo>,
pub new_table_fragment_info: NewTableFragmentInfo,
pub change_log_delta: HashMap<TableId, ChangeLogDelta>,
pub table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
pub max_committed_epoch: HummockEpoch,

// commit multi Epoch and SSTs for new compaction group
pub batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
}

impl CommitEpochInfo {
pub fn new(
sstables: Vec<LocalSstableInfo>,
new_table_watermarks: HashMap<TableId, TableWatermarks>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
new_table_fragment_info: Option<NewTableFragmentInfo>,
change_log_delta: HashMap<TableId, ChangeLogDelta>,
table_committed_epoch: BTreeMap<HummockEpoch, HashSet<TableId>>,
max_committed_epoch: HummockEpoch,
batch_commit_for_new_cg: Vec<BatchCommitForNewCg>,
) -> Self {
Self {
sstables,
new_table_watermarks,
sst_to_context,
new_table_fragment_info,
change_log_delta,
table_committed_epoch,
max_committed_epoch,
batch_commit_for_new_cg,
}
}
pub committed_epoch: u64,
pub tables_to_commit: HashSet<TableId>,
pub is_visible_table_committed_epoch: bool,
}
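
Usage sketch (hypothetical, not part of this commit): with the fields above, a partial commit that advances the epoch for only a subset of tables, without moving the globally visible committed epoch, would be built roughly as follows. The variables (synced_ssts, sst_to_context, epoch, subset_table_ids, hummock_manager) are placeholders.

// Hypothetical partial commit on a subset of tables.
let info = CommitEpochInfo {
    sstables: synced_ssts,
    new_table_watermarks: HashMap::new(),
    sst_to_context,
    new_table_fragment_info: NewTableFragmentInfo::None,
    change_log_delta: HashMap::new(),
    committed_epoch: epoch,
    // Only these tables advance to `epoch`.
    tables_to_commit: subset_table_ids,
    // `false`: do not advance the global max committed epoch / latest snapshot.
    is_visible_table_committed_epoch: false,
};
hummock_manager.commit_epoch(info).await?;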

impl HummockManager {
#[cfg(any(test, feature = "test"))]
pub async fn commit_epoch_for_test(
&self,
epoch: HummockEpoch,
epoch: u64,
sstables: Vec<impl Into<LocalSstableInfo>>,
sst_to_context: HashMap<HummockSstableObjectId, HummockContextId>,
) -> Result<()> {
@@ -115,16 +87,16 @@ impl HummockManager {
.keys()
.cloned()
.collect();
let info = CommitEpochInfo::new(
sstables.into_iter().map(Into::into).collect(),
HashMap::new(),
let info = CommitEpochInfo {
sstables: sstables.into_iter().map(Into::into).collect(),
new_table_watermarks: HashMap::new(),
sst_to_context,
None,
HashMap::new(),
BTreeMap::from_iter([(epoch, tables)]),
epoch,
vec![],
);
new_table_fragment_info: NewTableFragmentInfo::None,
change_log_delta: HashMap::new(),
committed_epoch: epoch,
tables_to_commit: tables,
is_visible_table_committed_epoch: true,
};
self.commit_epoch(info).await?;
Ok(())
}
@@ -140,9 +112,9 @@ impl HummockManager {
sst_to_context,
new_table_fragment_info,
change_log_delta,
table_committed_epoch,
max_committed_epoch: epoch,
batch_commit_for_new_cg,
committed_epoch,
tables_to_commit,
is_visible_table_committed_epoch,
} = commit_info;
let mut versioning_guard = self.versioning.write().await;
let _timer = start_measure_real_process_timer!(self, "commit_epoch");
@@ -153,7 +125,9 @@

let versioning: &mut Versioning = &mut versioning_guard;
self.commit_epoch_sanity_check(
epoch,
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
&sstables,
&sst_to_context,
&versioning.current_version,
@@ -187,45 +161,41 @@

let mut table_compaction_group_mapping = state_table_info.build_table_compaction_group_id();

let mut new_table_ids = HashMap::new();
// Add new table
if let Some(new_fragment_table_info) = new_table_fragment_info {
if !new_fragment_table_info.internal_table_ids.is_empty() {
on_handle_add_new_table(
state_table_info,
&new_fragment_table_info.internal_table_ids,
StaticCompactionGroupId::StateDefault as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
}

if let Some(mv_table_id) = new_fragment_table_info.mv_table_id {
on_handle_add_new_table(
state_table_info,
&[mv_table_id],
StaticCompactionGroupId::MaterializedView as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
}
}
let (new_table_ids, new_compaction_group, compaction_group_manager_txn) =
match new_table_fragment_info {
NewTableFragmentInfo::Normal {
mv_table_id,
internal_table_ids,
} => {
let mut new_table_ids = HashMap::new();
on_handle_add_new_table(
state_table_info,
&internal_table_ids,
StaticCompactionGroupId::StateDefault as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;

let (batch_commit_for_new_cg, compaction_group_manager_txn) =
if !batch_commit_for_new_cg.is_empty() {
let compaction_group_manager_guard = self.compaction_group_manager.write().await;
let compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
let mut compaction_group_manager =
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
);
let mut batch_commit_info = HashMap::new();
for BatchCommitForNewCg {
epoch_to_ssts,
table_ids,
} in batch_commit_for_new_cg
{
on_handle_add_new_table(
state_table_info,
&mv_table_id,
StaticCompactionGroupId::MaterializedView as u64,
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;
(new_table_ids, None, None)
}
NewTableFragmentInfo::NewCompactionGroup { table_ids } => {
let compaction_group_manager_guard =
self.compaction_group_manager.write().await;
let compaction_group_config =
compaction_group_manager_guard.default_compaction_config();
let mut compaction_group_manager =
CompactionGroupManager::start_owned_compaction_groups_txn(
compaction_group_manager_guard,
);
let mut new_table_ids = HashMap::new();
let new_compaction_group_id = next_compaction_group_id(&self.env).await?;
compaction_group_manager.insert(
new_compaction_group_id,
Expand All @@ -242,15 +212,13 @@ impl HummockManager {
&mut table_compaction_group_mapping,
&mut new_table_ids,
)?;

batch_commit_info.insert(new_compaction_group_id, epoch_to_ssts);
(
new_table_ids,
Some((new_compaction_group_id, (*compaction_group_config).clone())),
Some(compaction_group_manager),
)
}
(
Some((batch_commit_info, (*compaction_group_config).clone())),
Some(compaction_group_manager),
)
} else {
(None, None)
NewTableFragmentInfo::None => (HashMap::new(), None, None),
};

let commit_sstables = self
@@ -260,39 +228,16 @@ impl HummockManager {
let modified_compaction_groups: Vec<_> = commit_sstables.keys().cloned().collect();

let time_travel_delta = version.pre_commit_epoch(
epoch,
committed_epoch,
&tables_to_commit,
is_visible_table_committed_epoch,
new_compaction_group,
commit_sstables,
new_table_ids,
new_table_watermarks,
change_log_delta,
batch_commit_for_new_cg,
);

// TODO: remove the sanity check when supporting partial checkpoint
assert_eq!(1, table_committed_epoch.len());
{
let (table_committed_epoch, committed_table_ids) =
table_committed_epoch.iter().next().expect("non-empty");
assert_eq!(*table_committed_epoch, epoch);
let table_ids: HashSet<_> = version
.latest_version()
.state_table_info
.info()
.keys()
.cloned()
.collect();
assert!(table_ids.is_subset(committed_table_ids), "hummock table ids {table_ids:?} not a subset of table ids to commit{committed_table_ids:?}");
if cfg!(debug_assertions) {
assert_eq!(&table_ids, committed_table_ids);
} else if table_ids != *committed_table_ids {
let extra_table_ids = committed_table_ids - &table_ids;
warn!(
?extra_table_ids,
"ignore extra table ids that are not previously registered"
);
}
}

// Apply stats changes.
let mut version_stats = HummockVersionStatsTransaction::new(
&mut versioning.version_stats,
@@ -373,13 +318,18 @@
)?;
}

let snapshot = HummockSnapshot {
committed_epoch: epoch,
current_epoch: epoch,
let snapshot = if is_visible_table_committed_epoch {
let snapshot = HummockSnapshot {
committed_epoch,
current_epoch: committed_epoch,
};
let prev_snapshot = self.latest_snapshot.swap(snapshot.into());
assert!(prev_snapshot.committed_epoch < committed_epoch);
assert!(prev_snapshot.current_epoch < committed_epoch);
Some(snapshot)
} else {
None
};
let prev_snapshot = self.latest_snapshot.swap(snapshot.into());
assert!(prev_snapshot.committed_epoch < epoch);
assert!(prev_snapshot.current_epoch < epoch);

for compaction_group_id in &modified_compaction_groups {
trigger_sst_stat(
Expand All @@ -391,7 +341,6 @@ impl HummockManager {
}

drop(versioning_guard);
tracing::trace!("new committed epoch {}", epoch);

// Don't trigger compactions if we enable deterministic compaction
if !self.env.opts.compaction_deterministic_test {
@@ -411,7 +360,7 @@
{
self.check_state_consistency().await;
}
Ok(Some(snapshot))
Ok(snapshot)
}
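
Note the return change at the end of the hunk above: the snapshot is now an Option<HummockSnapshot>, and commit_epoch ends with Ok(snapshot), so callers only receive a snapshot when is_visible_table_committed_epoch was true. A hypothetical caller sketch (hummock_manager and info are placeholders, not from this diff):

if let Some(snapshot) = hummock_manager.commit_epoch(info).await? {
    // The committed epoch is visible for all tables; the latest snapshot advanced.
    tracing::trace!("new committed epoch {}", snapshot.committed_epoch);
} else {
    // Subset-only commit: the globally visible snapshot is unchanged.
}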

fn collect_table_write_throughput(&self, table_stats: PbTableStatsMap) {
@@ -479,15 +428,11 @@

fn on_handle_add_new_table(
state_table_info: &HummockVersionStateTableInfo,
table_ids: &[TableId],
table_ids: impl IntoIterator<Item = &TableId>,
compaction_group_id: CompactionGroupId,
table_compaction_group_mapping: &mut HashMap<TableId, CompactionGroupId>,
new_table_ids: &mut HashMap<TableId, CompactionGroupId>,
) -> Result<()> {
if table_ids.is_empty() {
return Err(Error::CompactionGroup("empty table ids".to_string()));
}

for table_id in table_ids {
if let Some(info) = state_table_info.info().get(table_id) {
return Err(Error::CompactionGroup(format!(
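
The parameter change above, from table_ids: &[TableId] to impl IntoIterator<Item = &TableId>, lets the call sites in this commit pass &Vec<TableId>, &Option<TableId>, or &HashSet<TableId> directly. A self-contained illustration of that bound (TableId here is a stand-in, not the real type):

use std::collections::HashSet;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct TableId(u32); // stand-in for risingwave's TableId

// Same bound as the new `on_handle_add_new_table` parameter.
fn count_tables<'a>(table_ids: impl IntoIterator<Item = &'a TableId>) -> usize {
    table_ids.into_iter().count()
}

fn main() {
    let vec_ids = vec![TableId(1), TableId(2)];
    let opt_id = Some(TableId(3));
    let set_ids: HashSet<TableId> = HashSet::from([TableId(4)]);

    // &Vec<_>, &Option<_>, and &HashSet<_> all iterate over `&TableId`.
    assert_eq!(count_tables(&vec_ids), 2);
    assert_eq!(count_tables(&opt_id), 1);
    assert_eq!(count_tables(&set_ids), 1);
}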
