Skip to content

Commit

Permalink
feat(session_config): system wide session config by alter system set (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
yuhao-su authored Apr 12, 2024
1 parent f31698d commit 1d25497
Show file tree
Hide file tree
Showing 37 changed files with 947 additions and 94 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

33 changes: 33 additions & 0 deletions e2e_test/ddl/alter_session_params.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to true;

statement error session param query_mode cannot be altered system wide
alter system set query_mode to auto;

connection other1
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

statement ok
set RW_STREAMING_ENABLE_DELTA_JOIN to false;

statement ok
alter system set rw_streaming_enable_delta_join to true;

query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
false

sleep 1s

connection other2
query T
show RW_STREAMING_ENABLE_DELTA_JOIN;
----
true

statement ok
alter system set RW_STREAMING_ENABLE_DELTA_JOIN to default;
12 changes: 0 additions & 12 deletions e2e_test/error_ui/simple/main.slt
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,3 @@ Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `maybe` for `rw_implicit_flush`
3: Invalid bool


statement error
set transaction_isolation to 'read committed';
----
db error: ERROR: Failed to run the query

Caused by these errors (recent errors listed first):
1: Failed to get/set session config
2: Invalid value `read committed` for `transaction_isolation`
3: Feature is not yet implemented: isolation level
Tracking issue: https://github.com/risingwavelabs/risingwave/issues/10736
24 changes: 24 additions & 0 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ message MetaSnapshot {
repeated catalog.Connection connections = 17;
repeated catalog.Subscription subscriptions = 19;
repeated user.UserInfo users = 8;
GetSessionParamsResponse session_params = 20;
// for streaming
repeated FragmentParallelUnitMapping parallel_unit_mappings = 9;
repeated common.WorkerNode nodes = 10;
Expand Down Expand Up @@ -432,6 +433,7 @@ message SubscribeResponse {
catalog.Schema schema = 5;
catalog.Function function = 6;
user.UserInfo user = 11;
SetSessionParamRequest session_param = 26;
// for streaming
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
Expand Down Expand Up @@ -596,6 +598,28 @@ service SystemParamsService {
rpc SetSystemParam(SetSystemParamRequest) returns (SetSystemParamResponse);
}

message GetSessionParamsRequest {}

message GetSessionParamsResponse {
string params = 1;
}

message SetSessionParamRequest {
string param = 1;
// None means set to default value.
optional string value = 2;
}

message SetSessionParamResponse {
string param = 1;
}

// Used for alter system wide default parameters
service SessionParamService {
rpc GetSessionParams(GetSessionParamsRequest) returns (GetSessionParamsResponse);
rpc SetSessionParam(SetSessionParamRequest) returns (SetSessionParamResponse);
}

message GetServingVnodeMappingsRequest {}

message GetServingVnodeMappingsResponse {
Expand Down
2 changes: 1 addition & 1 deletion src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
}
Info::HummockSnapshot(_) => true,
Info::MetaBackupManifestId(_) => true,
Info::SystemParams(_) => true,
Info::SystemParams(_) | Info::SessionParam(_) => true,
Info::ServingParallelUnitMappings(_) => true,
Info::Snapshot(_) | Info::HummockWriteLimits(_) => unreachable!(),
Info::HummockStats(_) => true,
Expand Down
1 change: 1 addition & 0 deletions src/common/proc_macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ pub fn derive_estimate_size(input: TokenStream) -> TokenStream {
/// `flags` options include
/// - `SETTER`: to manually write a `set_your_parameter_name` function, in which you should call `set_your_parameter_name_inner`.
/// - `REPORT`: to report the parameter through `ConfigReporter`
/// - `NO_ALTER_SYS`: disallow the parameter to be set by `alter system set`
#[proc_macro_derive(SessionConfig, attributes(parameter))]
#[proc_macro_error]
pub fn session_config(input: TokenStream) -> TokenStream {
Expand Down
90 changes: 67 additions & 23 deletions src/common/proc_macro/src/session_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
let mut get_match_branches = vec![];
let mut reset_match_branches = vec![];
let mut show_all_list = vec![];
let mut list_all_list = vec![];
let mut alias_to_entry_name_branches = vec![];
let mut entry_name_flags = vec![];

for field in fields {
let field_ident = field.ident.expect_or_abort("Field need to be named");
Expand Down Expand Up @@ -149,7 +151,7 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
&mut self,
val: &str,
reporter: &mut impl ConfigReporter
) -> SessionConfigResult<()> {
) -> SessionConfigResult<String> {
let val_t = #parse(val).map_err(|e| {
SessionConfigError::InvalidValue {
entry: #entry_name,
Expand All @@ -158,21 +160,20 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
}
})?;

self.#set_t_func_name(val_t, reporter)?;
Ok(())
self.#set_t_func_name(val_t, reporter).map(|val| val.to_string())
}

#[doc = #set_t_func_doc]
pub fn #gen_set_func_name(
&mut self,
val: #ty,
reporter: &mut impl ConfigReporter
) -> SessionConfigResult<()> {
) -> SessionConfigResult<#ty> {
#check_hook
#report_hook

self.#field_ident = val;
Ok(())
self.#field_ident = val.clone();
Ok(val)
}

});
Expand All @@ -181,10 +182,11 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
struct_impl_reset.push(quote! {

#[allow(clippy::useless_conversion)]
pub fn #reset_func_name(&mut self, reporter: &mut impl ConfigReporter) {
pub fn #reset_func_name(&mut self, reporter: &mut impl ConfigReporter) -> String {
let val = #default;
#report_hook
self.#field_ident = val.into();
self.#field_ident.to_string()
}
});

Expand Down Expand Up @@ -224,20 +226,42 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#entry_name => Ok(self.#reset_func_name(reporter)),
});

if !flags.contains(&"NO_SHOW_ALL") {
show_all_list.push(quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},
let var_info = quote! {
VariableInfo {
name: #entry_name.to_string(),
setting: self.#field_ident.to_string(),
description : #description.to_string(),
},
};
list_all_list.push(var_info.clone());

});
let no_show_all = flags.contains(&"NO_SHOW_ALL");
let no_show_all_flag: TokenStream = no_show_all.to_string().parse().unwrap();
if !no_show_all {
show_all_list.push(var_info);
}

let no_alter_sys_flag: TokenStream =
flags.contains(&"NO_ALTER_SYS").to_string().parse().unwrap();

entry_name_flags.push(
quote! {
(#entry_name, ParamFlags {no_show_all: #no_show_all_flag, no_alter_sys: #no_alter_sys_flag})
}
);
}

let struct_ident = input.ident;
quote! {
use std::collections::HashMap;
use std::sync::LazyLock;
static PARAM_NAME_FLAGS: LazyLock<HashMap<&'static str, ParamFlags>> = LazyLock::new(|| HashMap::from([#(#entry_name_flags, )*]));

struct ParamFlags {
no_show_all: bool,
no_alter_sys: bool,
}

impl Default for #struct_ident {
#[allow(clippy::useless_conversion)]
fn default() -> Self {
Expand All @@ -252,11 +276,11 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
Default::default()
}

fn alias_to_entry_name(key_name: &str) -> &str {
pub fn alias_to_entry_name(key_name: &str) -> String {
match key_name {
#(#alias_to_entry_name_branches)*
_ => key_name,
}
}.to_ascii_lowercase()
}

#(#struct_impl_get)*
Expand All @@ -266,9 +290,9 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
#(#struct_impl_reset)*

/// Set a parameter given it's name and value string.
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> SessionConfigResult<()> {
pub fn set(&mut self, key_name: &str, value: String, reporter: &mut impl ConfigReporter) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#set_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
Expand All @@ -277,27 +301,47 @@ pub(crate) fn derive_config(input: DeriveInput) -> TokenStream {
/// Get a parameter by it's name.
pub fn get(&self, key_name: &str) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#get_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

/// Reset a parameter by it's name.
pub fn reset(&mut self, key_name: &str, reporter: &mut impl ConfigReporter) -> SessionConfigResult<()> {
pub fn reset(&mut self, key_name: &str, reporter: &mut impl ConfigReporter) -> SessionConfigResult<String> {
let key_name = Self::alias_to_entry_name(key_name);
match key_name.to_ascii_lowercase().as_ref() {
match key_name.as_ref() {
#(#reset_match_branches)*
_ => Err(SessionConfigError::UnrecognizedEntry(key_name.to_string())),
}
}

/// Show all parameters.
/// Show all parameters except those specified `NO_SHOW_ALL`.
pub fn show_all(&self) -> Vec<VariableInfo> {
vec![
#(#show_all_list)*
]
}

/// List all parameters
pub fn list_all(&self) -> Vec<VariableInfo> {
vec![
#(#list_all_list)*
]
}

/// Check if `SessionConfig` has a parameter.
pub fn contains_param(key_name: &str) -> bool {
let key_name = Self::alias_to_entry_name(key_name);
PARAM_NAME_FLAGS.contains_key(key_name.as_str())
}

/// Check if `SessionConfig` has a parameter.
pub fn check_no_alter_sys(key_name: &str) -> SessionConfigResult<bool> {
let key_name = Self::alias_to_entry_name(key_name);
let flags = PARAM_NAME_FLAGS.get(key_name.as_str()).ok_or_else(|| SessionConfigError::UnrecognizedEntry(key_name.to_string()))?;
Ok(flags.no_alter_sys)
}
}
}
}
2 changes: 1 addition & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ pub const RW_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Placeholder for unknown git sha.
pub const UNKNOWN_GIT_SHA: &str = "unknown";

// The single source of truth of the pg parameters, Used in ConfigMap and current_cluster_version.
// The single source of truth of the pg parameters, Used in SessionConfig and current_cluster_version.
// The version of PostgreSQL that Risingwave claims to be.
pub const PG_VERSION: &str = "13.14.0";
/// The version of PostgreSQL that Risingwave claims to be.
Expand Down
Loading

0 comments on commit 1d25497

Please sign in to comment.