Skip to content

Commit

Permalink
fix: Handling the conflict between background DDL and auto-scaling. (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
shanicky authored Mar 8, 2024
1 parent 45cd2f5 commit 2ef0ff2
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 1 deletion.
11 changes: 10 additions & 1 deletion src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,18 @@ impl GlobalBarrierManager {
.cloned()
.collect_vec();

let background_streaming_jobs = self
.context
.metadata_manager
.list_background_creating_jobs()
.await?;

// Resolve actor info for recovery. If there's no actor to recover, most of the
// following steps will be no-op, while the compute nodes will still be reset.
let mut info = if !self.env.opts.disable_automatic_parallelism_control {
// FIXME: Transactions should be used.
let mut info = if !self.env.opts.disable_automatic_parallelism_control
&& background_streaming_jobs.is_empty()
{
self.context
.scale_actors(all_nodes.clone())
.await
Expand Down
23 changes: 23 additions & 0 deletions src/meta/src/manager/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,29 @@ impl MetadataManager {
}
}

pub async fn list_background_creating_jobs(&self) -> MetaResult<Vec<TableId>> {
match self {
MetadataManager::V1(mgr) => {
let tables = mgr.catalog_manager.list_creating_background_mvs().await;
Ok(tables
.into_iter()
.map(|table| TableId::from(table.id))
.collect())
}
MetadataManager::V2(mgr) => {
let tables = mgr
.catalog_controller
.list_background_creating_mviews()
.await?;

Ok(tables
.into_iter()
.map(|table| TableId::from(table.table_id as u32))
.collect())
}
}
}

pub async fn list_sources(&self) -> MetaResult<Vec<PbSource>> {
match self {
MetadataManager::V1(mgr) => Ok(mgr.catalog_manager.list_sources().await),
Expand Down
10 changes: 10 additions & 0 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,16 @@ impl DdlController {
parallelism: PbTableParallelism,
deferred: bool,
) -> MetaResult<()> {
if !deferred
&& !self
.metadata_manager
.list_background_creating_jobs()
.await?
.is_empty()
{
bail!("There are background creating jobs, please try again later")
}

self.stream_manager
.alter_table_parallelism(table_id, parallelism.into(), deferred)
.await
Expand Down
14 changes: 14 additions & 0 deletions src/meta/src/stream/scale.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2684,6 +2684,20 @@ impl GlobalStreamManager {
}

async fn trigger_parallelism_control(&self) -> MetaResult<bool> {
let background_streaming_jobs = self
.metadata_manager
.list_background_creating_jobs()
.await?;

if !background_streaming_jobs.is_empty() {
tracing::debug!(
"skipping parallelism control due to background jobs {:?}",
background_streaming_jobs
);
// skip if there are background creating jobs
return Ok(true);
}

tracing::info!("trigger parallelism control");

let _reschedule_job_lock = self.reschedule_lock_write_guard().await;
Expand Down

0 comments on commit 2ef0ff2

Please sign in to comment.