Skip to content

Commit

Permalink
refactor: add subscription via add mutation (risingwavelabs#17897)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Aug 5, 2024
1 parent 4987ccb commit c4adffc
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 86 deletions.
14 changes: 6 additions & 8 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ message AddMutation {
// We may embed a pause mutation here.
// TODO: we may allow multiple mutations in a single barrier.
bool pause = 4;
repeated SubscriptionUpstreamInfo subscriptions_to_add = 5;
}

message StopMutation {
Expand Down Expand Up @@ -95,14 +96,13 @@ message CombinedMutation {
repeated BarrierMutation mutations = 1;
}

message CreateSubscriptionMutation {
uint32 subscription_id = 1;
message SubscriptionUpstreamInfo {
uint32 subscriber_id = 1;
uint32 upstream_mv_table_id = 2;
}

message DropSubscriptionMutation {
uint32 subscription_id = 1;
uint32 upstream_mv_table_id = 2;
message DropSubscriptionsMutation {
repeated SubscriptionUpstreamInfo info = 1;
}

message BarrierMutation {
Expand All @@ -122,10 +122,8 @@ message BarrierMutation {
ResumeMutation resume = 8;
// Throttle specific source exec or chain exec.
ThrottleMutation throttle = 10;
// Create subscription on mv
CreateSubscriptionMutation create_subscription = 11;
// Drop subscription on mv
DropSubscriptionMutation drop_subscription = 12;
DropSubscriptionsMutation drop_subscriptions = 12;
// Combined mutation.
CombinedMutation combined = 100;
}
Expand Down
1 change: 1 addition & 0 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ async fn test_cdc_backfill() -> StreamResult<()> {
added_actors: HashSet::new(),
splits,
pause: false,
subscriptions_to_add: vec![],
}));

tx.send_barrier(init_barrier);
Expand Down
41 changes: 31 additions & 10 deletions src/meta/src/barrier/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::fmt::Formatter;
use std::sync::Arc;

use futures::future::try_join_all;
Expand All @@ -33,9 +34,9 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::throttle_mutation::RateLimit;
use risingwave_pb::stream_plan::update_mutation::*;
use risingwave_pb::stream_plan::{
AddMutation, BarrierMutation, CombinedMutation, CreateSubscriptionMutation, Dispatcher,
Dispatchers, DropSubscriptionMutation, PauseMutation, ResumeMutation,
SourceChangeSplitMutation, StopMutation, StreamActor, ThrottleMutation, UpdateMutation,
AddMutation, BarrierMutation, CombinedMutation, Dispatcher, Dispatchers,
DropSubscriptionsMutation, PauseMutation, ResumeMutation, SourceChangeSplitMutation,
StopMutation, StreamActor, SubscriptionUpstreamInfo, ThrottleMutation, UpdateMutation,
};
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
Expand Down Expand Up @@ -278,7 +279,7 @@ pub enum Command {
retention_second: u64,
},

/// `DropSubscription` command generates a `DropSubscriptionMutation` to notify
/// `DropSubscription` command generates a `DropSubscriptionsMutation` to notify
/// materialize executor to stop storing old value when there is no
/// subscription depending on it.
DropSubscription {
Expand Down Expand Up @@ -443,6 +444,17 @@ pub struct CommandContext {
pub _span: tracing::Span,
}

impl std::fmt::Debug for CommandContext {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CommandContext")
.field("prev_epoch", &self.prev_epoch.value().0)
.field("curr_epoch", &self.curr_epoch.value().0)
.field("kind", &self.kind)
.field("command", &self.command)
.finish()
}
}

impl CommandContext {
#[allow(clippy::too_many_arguments)]
pub(super) fn new(
Expand Down Expand Up @@ -554,6 +566,7 @@ impl CommandContext {
actor_splits,
// If the cluster is already paused, the new actors should be paused too.
pause: self.current_paused_reason.is_some(),
subscriptions_to_add: Default::default(),
}));

if let Some(ReplaceTablePlan {
Expand Down Expand Up @@ -748,16 +761,24 @@ impl CommandContext {
upstream_mv_table_id,
subscription_id,
..
} => Some(Mutation::CreateSubscription(CreateSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
} => Some(Mutation::Add(AddMutation {
actor_dispatchers: Default::default(),
added_actors: vec![],
actor_splits: Default::default(),
pause: false,
subscriptions_to_add: vec![SubscriptionUpstreamInfo {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscriber_id: *subscription_id,
}],
})),
Command::DropSubscription {
upstream_mv_table_id,
subscription_id,
} => Some(Mutation::DropSubscription(DropSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
} => Some(Mutation::DropSubscriptions(DropSubscriptionsMutation {
info: vec![SubscriptionUpstreamInfo {
subscriber_id: *subscription_id,
upstream_mv_table_id: upstream_mv_table_id.table_id,
}],
})),
};

Expand Down
1 change: 1 addition & 0 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,7 @@ impl GlobalBarrierManager {
added_actors: Default::default(),
actor_splits: build_actor_connector_splits(&source_split_assignments),
pause: paused_reason.is_some(),
subscriptions_to_add: Default::default(),
})));

// Use a different `curr_epoch` for each recovery attempt.
Expand Down
7 changes: 7 additions & 0 deletions src/stream/src/common/table/state_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,6 +1157,13 @@ where
op_consistency_level: StateTableOpConsistencyLevel,
) -> StreamExecutorResult<()> {
if self.op_consistency_level != op_consistency_level {
info!(
?new_epoch,
prev_op_consistency_level = ?self.op_consistency_level,
?op_consistency_level,
table_id = self.table_id.table_id,
"switch to new op consistency level"
);
self.commit_inner(new_epoch, Some(op_consistency_level))
.await
} else {
Expand Down
1 change: 1 addition & 0 deletions src/stream/src/executor/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ mod test {
added_actors: maplit::hashset! { actor_id },
splits: Default::default(),
pause: false,
subscriptions_to_add: vec![],
}),
)),
Message::Chunk(StreamChunk::from_pretty("I\n + 3")),
Expand Down
106 changes: 73 additions & 33 deletions src/stream/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use risingwave_pb::stream_plan::barrier_mutation::Mutation as PbMutation;
use risingwave_pb::stream_plan::stream_message::StreamMessage;
use risingwave_pb::stream_plan::update_mutation::{DispatcherUpdate, MergeUpdate};
use risingwave_pb::stream_plan::{
BarrierMutation, CombinedMutation, CreateSubscriptionMutation, Dispatchers,
DropSubscriptionMutation, PauseMutation, PbAddMutation, PbBarrier, PbBarrierMutation,
PbDispatcher, PbStreamMessage, PbUpdateMutation, PbWatermark, ResumeMutation,
SourceChangeSplitMutation, StopMutation, ThrottleMutation,
BarrierMutation, CombinedMutation, Dispatchers, DropSubscriptionsMutation, PauseMutation,
PbAddMutation, PbBarrier, PbBarrierMutation, PbDispatcher, PbStreamMessage, PbUpdateMutation,
PbWatermark, ResumeMutation, SourceChangeSplitMutation, StopMutation, SubscriptionUpstreamInfo,
ThrottleMutation,
};
use smallvec::SmallVec;

Expand Down Expand Up @@ -274,6 +274,8 @@ pub struct AddMutation {
// TODO: remove this and use `SourceChangesSplit` after we support multiple mutations.
pub splits: HashMap<ActorId, Vec<SplitImpl>>,
pub pause: bool,
/// (`upstream_mv_table_id`, `subscriber_id`)
pub subscriptions_to_add: Vec<(TableId, u32)>,
}

/// See [`PbMutation`] for the semantics of each mutation.
Expand All @@ -287,13 +289,9 @@ pub enum Mutation {
Resume,
Throttle(HashMap<ActorId, Option<u32>>),
AddAndUpdate(AddMutation, UpdateMutation),
CreateSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
},
DropSubscription {
subscription_id: u32,
upstream_mv_table_id: TableId,
DropSubscriptions {
/// `subscriber` -> `upstream_mv_table_id`
subscriptions_to_drop: Vec<(u32, TableId)>,
},
}

Expand Down Expand Up @@ -446,8 +444,7 @@ impl Barrier {
| Mutation::Resume
| Mutation::SourceChangeSplit(_)
| Mutation::Throttle(_)
| Mutation::CreateSubscription { .. }
| Mutation::DropSubscription { .. } => false,
| Mutation::DropSubscriptions { .. } => false,
}
}

Expand Down Expand Up @@ -512,6 +509,31 @@ impl Barrier {
pub fn tracing_context(&self) -> &TracingContext {
&self.tracing_context
}

pub fn added_subscriber_on_mv_table(
&self,
mv_table_id: TableId,
) -> impl Iterator<Item = u32> + '_ {
if let Some(Mutation::Add(add)) | Some(Mutation::AddAndUpdate(add, _)) =
self.mutation.as_deref()
{
Some(add)
} else {
None
}
.into_iter()
.flat_map(move |add| {
add.subscriptions_to_add.iter().filter_map(
move |(upstream_mv_table_id, subscriber_id)| {
if *upstream_mv_table_id == mv_table_id {
Some(*subscriber_id)
} else {
None
}
},
)
})
}
}

impl<M: PartialEq> PartialEq for BarrierInner<M> {
Expand Down Expand Up @@ -581,6 +603,7 @@ impl Mutation {
added_actors,
splits,
pause,
subscriptions_to_add,
}) => PbMutation::Add(PbAddMutation {
actor_dispatchers: adds
.iter()
Expand All @@ -596,6 +619,13 @@ impl Mutation {
added_actors: added_actors.iter().copied().collect(),
actor_splits: actor_splits_to_protobuf(splits),
pause: *pause,
subscriptions_to_add: subscriptions_to_add
.iter()
.map(|(table_id, subscriber_id)| SubscriptionUpstreamInfo {
subscriber_id: *subscriber_id,
upstream_mv_table_id: table_id.table_id,
})
.collect(),
}),
Mutation::SourceChangeSplit(changes) => PbMutation::Splits(SourceChangeSplitMutation {
actor_splits: changes
Expand Down Expand Up @@ -629,19 +659,18 @@ impl Mutation {
},
],
}),
Mutation::CreateSubscription {
upstream_mv_table_id,
subscription_id,
} => PbMutation::CreateSubscription(CreateSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
}),
Mutation::DropSubscription {
upstream_mv_table_id,
subscription_id,
} => PbMutation::DropSubscription(DropSubscriptionMutation {
upstream_mv_table_id: upstream_mv_table_id.table_id,
subscription_id: *subscription_id,
Mutation::DropSubscriptions {
subscriptions_to_drop,
} => PbMutation::DropSubscriptions(DropSubscriptionsMutation {
info: subscriptions_to_drop
.iter()
.map(
|(subscriber_id, upstream_mv_table_id)| SubscriptionUpstreamInfo {
subscriber_id: *subscriber_id,
upstream_mv_table_id: upstream_mv_table_id.table_id,
},
)
.collect(),
}),
}
}
Expand Down Expand Up @@ -712,6 +741,18 @@ impl Mutation {
})
.collect(),
pause: add.pause,
subscriptions_to_add: add
.subscriptions_to_add
.iter()
.map(
|SubscriptionUpstreamInfo {
subscriber_id,
upstream_mv_table_id,
}| {
(TableId::new(*upstream_mv_table_id), *subscriber_id)
},
)
.collect(),
}),

PbMutation::Splits(s) => {
Expand Down Expand Up @@ -740,13 +781,12 @@ impl Mutation {
.map(|(actor_id, limit)| (*actor_id, limit.rate_limit))
.collect(),
),
PbMutation::CreateSubscription(create) => Mutation::CreateSubscription {
upstream_mv_table_id: TableId::new(create.upstream_mv_table_id),
subscription_id: create.subscription_id,
},
PbMutation::DropSubscription(drop) => Mutation::DropSubscription {
upstream_mv_table_id: TableId::new(drop.upstream_mv_table_id),
subscription_id: drop.subscription_id,
PbMutation::DropSubscriptions(drop) => Mutation::DropSubscriptions {
subscriptions_to_drop: drop
.info
.iter()
.map(|info| (info.subscriber_id, TableId::new(info.upstream_mv_table_id)))
.collect(),
},
PbMutation::Combined(CombinedMutation { mutations }) => match &mutations[..] {
[BarrierMutation {
Expand Down
Loading

0 comments on commit c4adffc

Please sign in to comment.