Skip to content

Commit

Permalink
feat(snapshot-backfill): only receive mutation from barrier worker fo…
Browse files Browse the repository at this point in the history
…r snapshot backfill (risingwavelabs#18210)
  • Loading branch information
wenym1 authored Sep 4, 2024
1 parent 9923c3a commit 0dd06ff
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 114 deletions.
21 changes: 14 additions & 7 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,16 +497,16 @@ impl CommandContext {
}
}

impl CommandContext {
impl Command {
/// Generate a mutation for the given command.
pub fn to_mutation(&self) -> Option<Mutation> {
pub fn to_mutation(&self, current_paused_reason: Option<&PausedReason>) -> Option<Mutation> {
let mutation =
match &self.command {
match self {
Command::Plain(mutation) => mutation.clone(),

Command::Pause(_) => {
// Only pause when the cluster is not already paused.
if self.current_paused_reason.is_none() {
if current_paused_reason.is_none() {
Some(Mutation::Pause(PauseMutation {}))
} else {
None
Expand All @@ -515,7 +515,7 @@ impl CommandContext {

Command::Resume(reason) => {
// Only resume when the cluster is paused with the same reason.
if self.current_paused_reason == Some(*reason) {
if current_paused_reason == Some(reason) {
Some(Mutation::Resume(ResumeMutation {}))
} else {
None
Expand Down Expand Up @@ -607,7 +607,7 @@ impl CommandContext {
added_actors,
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
pause: current_paused_reason.is_some(),
subscriptions_to_add,
}));

Expand Down Expand Up @@ -846,7 +846,7 @@ impl CommandContext {
}

pub fn actors_to_create(&self) -> Option<HashMap<WorkerId, Vec<StreamActor>>> {
match &self.command {
match self {
Command::CreateStreamingJob { info, job_type } => {
let mut map = match job_type {
CreateStreamingJobType::Normal => HashMap::new(),
Expand Down Expand Up @@ -914,6 +914,13 @@ impl CommandContext {
..Default::default()
}))
}
}

impl CommandContext {
pub fn to_mutation(&self) -> Option<Mutation> {
self.command
.to_mutation(self.current_paused_reason.as_ref())
}

/// Returns the paused reason after executing the current command.
pub fn next_paused_reason(&self) -> Option<PausedReason> {
Expand Down
10 changes: 7 additions & 3 deletions src/meta/src/barrier/creating_job/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use risingwave_common::util::epoch::Epoch;
use risingwave_pb::common::WorkerNode;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::{BarrierCompleteResponse, BuildActorInfo};
use tracing::{debug, info};

Expand Down Expand Up @@ -67,6 +68,7 @@ impl CreatingStreamingJobControl {
backfill_epoch: u64,
version_stat: &HummockVersionStats,
metrics: &MetaMetrics,
initial_mutation: Mutation,
) -> Self {
info!(
table_id = info.table_fragments.table_id().table_id,
Expand Down Expand Up @@ -108,7 +110,7 @@ impl CreatingStreamingJobControl {
backfill_epoch,
pending_non_checkpoint_barriers: vec![],
snapshot_backfill_actors,
actors_to_create: Some(
initial_barrier_info: Some((
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
Expand All @@ -124,7 +126,8 @@ impl CreatingStreamingJobControl {
)
})
.collect(),
),
initial_mutation,
)),
},
upstream_lag: metrics
.snapshot_backfill_lag
Expand Down Expand Up @@ -283,11 +286,12 @@ impl CreatingStreamingJobControl {
prev_epoch,
kind,
new_actors,
mutation,
} in barriers_to_inject
{
let node_to_collect = control_stream_manager.inject_barrier(
Some(table_id),
None,
mutation,
(&curr_epoch, &prev_epoch),
&kind,
graph_info,
Expand Down
21 changes: 17 additions & 4 deletions src/meta/src/barrier/creating_job/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use std::sync::Arc;

use risingwave_common::util::epoch::Epoch;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::BuildActorInfo;

Expand All @@ -40,7 +41,9 @@ pub(super) enum CreatingStreamingJobStatus {
/// The `prev_epoch` of pending non checkpoint barriers
pending_non_checkpoint_barriers: Vec<u64>,
snapshot_backfill_actors: HashMap<WorkerId, HashSet<ActorId>>,
actors_to_create: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
/// Info of the first barrier: (`actors_to_create`, `mutation`)
/// Take the mutation out when injecting the first barrier
initial_barrier_info: Option<(HashMap<WorkerId, Vec<BuildActorInfo>>, Mutation)>,
},
ConsumingLogStore {
graph_info: InflightGraphInfo,
Expand All @@ -60,6 +63,7 @@ pub(super) struct CreatingJobInjectBarrierInfo {
pub prev_epoch: TracedEpoch,
pub kind: BarrierKind,
pub new_actors: Option<HashMap<WorkerId, Vec<BuildActorInfo>>>,
pub mutation: Option<Mutation>,
}

impl CreatingStreamingJobStatus {
Expand Down Expand Up @@ -104,12 +108,12 @@ impl CreatingStreamingJobStatus {
graph_info,
pending_non_checkpoint_barriers,
ref backfill_epoch,
actors_to_create,
initial_barrier_info,
..
} = self
{
if create_mview_tracker.has_pending_finished_jobs() {
assert!(actors_to_create.is_none());
assert!(initial_barrier_info.is_none());
pending_non_checkpoint_barriers.push(*backfill_epoch);

let prev_epoch = Epoch::from_physical_time(*prev_epoch_fake_physical_time);
Expand All @@ -119,6 +123,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: TracedEpoch::new(prev_epoch),
kind: BarrierKind::Checkpoint(take(pending_non_checkpoint_barriers)),
new_actors: None,
mutation: None,
}]
.into_iter()
.chain(pending_commands.drain(..).map(|command_ctx| {
Expand All @@ -127,6 +132,7 @@ impl CreatingStreamingJobStatus {
prev_epoch: command_ctx.prev_epoch.clone(),
kind: command_ctx.kind.clone(),
new_actors: None,
mutation: None,
}
}))
.collect();
Expand All @@ -145,12 +151,19 @@ impl CreatingStreamingJobStatus {
} else {
BarrierKind::Barrier
};
let (new_actors, mutation) =
if let Some((new_actors, mutation)) = initial_barrier_info.take() {
(Some(new_actors), Some(mutation))
} else {
Default::default()
};
Some((
vec![CreatingJobInjectBarrierInfo {
curr_epoch,
prev_epoch,
kind,
new_actors: actors_to_create.take(),
new_actors,
mutation,
}],
None,
))
Expand Down
14 changes: 14 additions & 0 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,19 @@ impl GlobalBarrierManager {
info,
} = &command
{
if self.state.paused_reason().is_some() {
warn!("cannot create streaming job with snapshot backfill when paused");
for notifier in notifiers {
notifier.notify_start_failed(
anyhow!("cannot create streaming job with snapshot backfill when paused",)
.into(),
);
}
return Ok(());
}
let mutation = command
.to_mutation(None)
.expect("should have some mutation in `CreateStreamingJob` command");
self.checkpoint_control
.creating_streaming_job_controls
.insert(
Expand All @@ -975,6 +988,7 @@ impl GlobalBarrierManager {
prev_epoch.value().0,
&self.checkpoint_control.hummock_version_stats,
&self.context.metrics,
mutation,
),
);
}
Expand Down
69 changes: 36 additions & 33 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,39 +263,42 @@ impl ControlStreamManager {
pre_applied_graph_info,
applied_graph_info,
actor_ids_to_pre_sync_mutation,
command_ctx.actors_to_create().map(|actors_to_create| {
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
// TODO: consider subscriber of backfilling mv
related_subscriptions: command_ctx
.subscription_info
.mv_depended_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.collect_vec(),
)
})
.collect()
}),
command_ctx
.command
.actors_to_create()
.map(|actors_to_create| {
actors_to_create
.into_iter()
.map(|(worker_id, actors)| {
(
worker_id,
actors
.into_iter()
.map(|actor| BuildActorInfo {
actor: Some(actor),
// TODO: consider subscriber of backfilling mv
related_subscriptions: command_ctx
.subscription_info
.mv_depended_subscriptions
.iter()
.map(|(table_id, subscriptions)| {
(
table_id.table_id,
SubscriptionIds {
subscription_ids: subscriptions
.keys()
.cloned()
.collect(),
},
)
})
.collect(),
})
.collect_vec(),
)
})
.collect()
}),
)
}

Expand Down
Loading

0 comments on commit 0dd06ff

Please sign in to comment.