refactor(meta): only store CreateStreamingJob command in tracker (ris…

wenym1 authored Jul 19, 2024
1 parent bd3b9a1 commit 9335967
Showing 8 changed files with 234 additions and 236 deletions.
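For orientation, a minimal sketch of the shape of this refactor; the struct and the slimmed-down enum variant are taken from the command.rs diff below, while the tracker-side change named in the commit title lives in files not shown on this page.

```rust
// Sketch only: the metadata that `Command::CreateStreamingJob` used to carry
// as loose fields is grouped into a standalone struct, so it can be stored on
// its own (for example by the creation progress tracker) without keeping the
// whole `Command` around.
pub struct CreateStreamingJobCommandInfo {
    pub table_fragments: TableFragments,
    pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
    pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
    pub init_split_assignment: SplitAssignment,
    pub definition: String,
    pub ddl_type: DdlType,
    pub create_type: CreateType,
    pub streaming_job: StreamingJob,
    pub internal_tables: Vec<Table>,
}

pub enum Command {
    // ... other variants unchanged ...
    CreateStreamingJob {
        info: CreateStreamingJobCommandInfo,
        replace_table: Option<ReplaceTablePlan>,
    },
    // ...
}
```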
160 changes: 74 additions & 86 deletions src/meta/src/barrier/command.rs
@@ -41,7 +41,9 @@ use risingwave_pb::stream_service::WaitEpochCommitRequest;
use thiserror_ext::AsReport;
use tracing::warn;

use super::info::{CommandActorChanges, CommandFragmentChanges, InflightActorInfo};
use super::info::{
CommandActorChanges, CommandFragmentChanges, CommandNewFragmentInfo, InflightActorInfo,
};
use super::trace::TracedEpoch;
use crate::barrier::GlobalBarrierManagerContext;
use crate::manager::{DdlType, MetadataManager, StreamingJob, WorkerId};
@@ -107,7 +109,7 @@ impl ReplaceTablePlan {
fn actor_changes(&self) -> CommandActorChanges {
let mut fragment_changes = HashMap::new();
for fragment in self.new_table_fragments.fragments.values() {
let fragment_change = CommandFragmentChanges::NewFragment {
let fragment_change = CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors: fragment
.actors
.iter()
@@ -130,7 +132,7 @@ impl ReplaceTablePlan {
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
};
});
assert!(fragment_changes
.insert(fragment.fragment_id, fragment_change)
.is_none());
@@ -144,6 +146,54 @@ impl ReplaceTablePlan {
}
}

#[derive(Debug, Clone)]
pub struct CreateStreamingJobCommandInfo {
pub table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
pub upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
pub dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
pub init_split_assignment: SplitAssignment,
pub definition: String,
pub ddl_type: DdlType,
pub create_type: CreateType,
pub streaming_job: StreamingJob,
pub internal_tables: Vec<Table>,
}

impl CreateStreamingJobCommandInfo {
fn new_fragment_info(&self) -> impl Iterator<Item = (FragmentId, CommandNewFragmentInfo)> + '_ {
self.table_fragments.fragments.values().map(|fragment| {
(
fragment.fragment_id,
CommandNewFragmentInfo {
new_actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
self.table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.get_parallel_unit()
.expect("should set")
.worker_node_id,
)
})
.collect(),
table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(fragment.fragment_type_mask),
},
)
})
}
}

/// [`Command`] is the input of [`crate::barrier::GlobalBarrierManager`]. For different commands,
/// it will build different barriers to send, and may do different things after the barrier is
/// collected.
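A hedged sketch of how the `new_fragment_info` helper introduced in this hunk can be consumed; it mirrors the call site rewritten further down in this file and assumes an `info: CreateStreamingJobCommandInfo` in scope.

```rust
// Build the per-fragment changes for the barrier from the grouped job info.
// Each item yielded by `new_fragment_info()` pairs a fragment id with the
// actors and state tables that the fragment introduces.
let fragment_changes: HashMap<FragmentId, CommandFragmentChanges> = info
    .new_fragment_info()
    .map(|(fragment_id, new_fragment)| {
        (fragment_id, CommandFragmentChanges::NewFragment(new_fragment))
    })
    .collect();
let changes = CommandActorChanges { fragment_changes };
```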
@@ -187,16 +237,7 @@ pub enum Command {
/// for a while** until the `finish` channel is signaled, then the state of `TableFragments`
/// will be set to `Created`.
CreateStreamingJob {
streaming_job: StreamingJob,
internal_tables: Vec<Table>,
table_fragments: TableFragments,
/// Refer to the doc on [`MetadataManager::get_upstream_root_fragments`] for the meaning of "root".
upstream_root_actors: HashMap<TableId, Vec<ActorId>>,
dispatchers: HashMap<ActorId, Vec<Dispatcher>>,
init_split_assignment: SplitAssignment,
definition: String,
ddl_type: DdlType,
create_type: CreateType,
info: CreateStreamingJobCommandInfo,
/// This is for `CREATE SINK INTO table`.
replace_table: Option<ReplaceTablePlan>,
},
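Call sites that previously passed the individual fields now wrap them in the new struct first. A hypothetical construction, assuming local variables named after the struct fields, might look like this:

```rust
// Hypothetical call site: group the job metadata, then issue the command.
let info = CreateStreamingJobCommandInfo {
    table_fragments,
    upstream_root_actors,
    dispatchers,
    init_split_assignment,
    definition,
    ddl_type,
    create_type,
    streaming_job,
    internal_tables,
};
let command = Command::CreateStreamingJob {
    info,
    // Only `Some(..)` when the job is a sink created into an existing table.
    replace_table: None,
};
```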
@@ -279,43 +320,13 @@ impl Command {
.collect(),
}),
Command::CreateStreamingJob {
table_fragments,
info,
replace_table,
..
} => {
let fragment_changes = table_fragments
.fragments
.values()
.map(|fragment| {
(
fragment.fragment_id,
CommandFragmentChanges::NewFragment {
new_actors: fragment
.actors
.iter()
.map(|actor| {
(
actor.actor_id,
table_fragments
.actor_status
.get(&actor.actor_id)
.expect("should exist")
.get_parallel_unit()
.expect("should set")
.worker_node_id,
)
})
.collect(),
table_ids: fragment
.state_table_ids
.iter()
.map(|table_id| TableId::new(*table_id))
.collect(),
is_injectable: TableFragments::is_injectable(
fragment.fragment_type_mask,
),
},
)
let fragment_changes = info
.new_fragment_info()
.map(|(fragment_id, info)| {
(fragment_id, CommandFragmentChanges::NewFragment(info))
})
.collect();
let mut changes = CommandActorChanges { fragment_changes };
@@ -460,10 +471,6 @@ impl CommandContext {
_span: span,
}
}

pub fn metadata_manager(&self) -> &MetadataManager {
&self.barrier_manager_context.metadata_manager
}
}

impl CommandContext {
@@ -521,11 +528,14 @@ impl CommandContext {
})),

Command::CreateStreamingJob {
table_fragments,
dispatchers,
init_split_assignment: split_assignment,
info:
CreateStreamingJobCommandInfo {
table_fragments,
dispatchers,
init_split_assignment: split_assignment,
..
},
replace_table,
..
} => {
let actor_dispatchers = dispatchers
.iter()
@@ -818,20 +828,6 @@ impl CommandContext {
}
}

/// For `CreateStreamingJob`, returns the actors of the `StreamScan` and `StreamValue` nodes. For other commands,
/// returns an empty set.
pub fn actors_to_track(&self) -> HashSet<ActorId> {
match &self.command {
Command::CreateStreamingJob {
table_fragments, ..
} => table_fragments
.tracking_progress_actor_ids()
.into_iter()
.collect(),
_ => Default::default(),
}
}

/// For `CancelStreamingJob`, returns the table id of the target table.
pub fn table_to_cancel(&self) -> Option<TableId> {
match &self.command {
@@ -840,16 +836,6 @@ impl CommandContext {
}
}

/// For `CreateStreamingJob`, returns the table id of the target table.
pub fn table_to_create(&self) -> Option<TableId> {
match &self.command {
Command::CreateStreamingJob {
table_fragments, ..
} => Some(table_fragments.table_id()),
_ => None,
}
}

/// Clean up actors in CNs if needed, used by drop, cancel and reschedule commands.
async fn clean_up(&self, actors: Vec<ActorId>) -> MetaResult<()> {
self.barrier_manager_context
@@ -992,14 +978,16 @@ impl CommandContext {
}

Command::CreateStreamingJob {
table_fragments,
dispatchers,
upstream_root_actors,
init_split_assignment,
definition: _,
info,
replace_table,
..
} => {
let CreateStreamingJobCommandInfo {
table_fragments,
dispatchers,
upstream_root_actors,
init_split_assignment,
..
} = info;
match &self.barrier_manager_context.metadata_manager {
MetadataManager::V1(mgr) => {
let mut dependent_table_actors =
20 changes: 12 additions & 8 deletions src/meta/src/barrier/info.rs
@@ -22,13 +22,16 @@ use crate::barrier::Command;
use crate::manager::{ActiveStreamingWorkerNodes, ActorInfos, InflightFragmentInfo, WorkerId};
use crate::model::{ActorId, FragmentId};

#[derive(Debug, Clone)]
pub(crate) struct CommandNewFragmentInfo {
pub new_actors: HashMap<ActorId, WorkerId>,
pub table_ids: HashSet<TableId>,
pub is_injectable: bool,
}

#[derive(Debug, Clone)]
pub(crate) enum CommandFragmentChanges {
NewFragment {
new_actors: HashMap<ActorId, WorkerId>,
table_ids: HashSet<TableId>,
is_injectable: bool,
},
NewFragment(CommandNewFragmentInfo),
Reschedule {
new_actors: HashMap<ActorId, WorkerId>,
to_remove: HashSet<ActorId>,
@@ -149,11 +152,12 @@ impl InflightActorInfo {
let mut to_add = HashMap::new();
for (fragment_id, change) in fragment_changes {
match change {
CommandFragmentChanges::NewFragment {
CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
new_actors,
table_ids,
is_injectable,
} => {
..
}) => {
for (actor_id, node_id) in &new_actors {
assert!(to_add
.insert(*actor_id, (*node_id, is_injectable))
@@ -232,7 +236,7 @@ impl InflightActorInfo {
let mut all_to_remove = HashSet::new();
for (fragment_id, changes) in fragment_changes.fragment_changes {
match changes {
CommandFragmentChanges::NewFragment { .. } => {}
CommandFragmentChanges::NewFragment(_) => {}
CommandFragmentChanges::Reschedule { to_remove, .. } => {
let info = self
.fragment_infos
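The info.rs change converts `NewFragment` from a struct variant into a tuple variant wrapping the new `CommandNewFragmentInfo` type, so the per-fragment payload can be built and returned by helpers on its own. A small sketch under that assumption (values are illustrative placeholders):

```rust
// The payload is now a standalone struct, so a helper such as
// `CreateStreamingJobCommandInfo::new_fragment_info` can yield it directly
// and the caller just wraps it in the enum variant.
let payload = CommandNewFragmentInfo {
    new_actors: HashMap::new(), // placeholder
    table_ids: HashSet::new(),  // placeholder
    is_injectable: true,        // placeholder
};
let change = CommandFragmentChanges::NewFragment(payload);

// Matching destructures the inner struct as needed, as in the hunk above.
match change {
    CommandFragmentChanges::NewFragment(CommandNewFragmentInfo {
        new_actors,
        is_injectable,
        ..
    }) => {
        println!("{} new actors, injectable: {is_injectable}", new_actors.len());
    }
    _ => {}
}
```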