Skip to content

Commit

Permalink
feat(config): Add default values for config; Add wal/max_file_size(1Gib)
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng authored and roseboy-liu committed Jan 20, 2023
1 parent a43a27d commit 6e5f07b
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 37 deletions.
1 change: 1 addition & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ strict_write = false
[wal]
enabled = true
path = 'data/wal'
max_file_size = 1073741824 # 1024 * 1024 * 1024
sync = false

[cache]
Expand Down
1 change: 1 addition & 0 deletions config/config_31001.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ strict_write = false
[wal]
enabled = true
path = '/tmp/cnosdb/1001/wal'
max_file_size = 1073741824 # 1024 * 1024 * 1024
sync = false

[cache]
Expand Down
1 change: 1 addition & 0 deletions config/config_32001.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ strict_write = false
[wal]
enabled = true
path = '/tmp/cnosdb/2001/wal'
max_file_size = 1073741824 # 1024 * 1024 * 1024
sync = false

[cache]
Expand Down
215 changes: 184 additions & 31 deletions config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,115 @@ impl Config {
}
}

pub fn get_config(path: &str) -> Config {
let mut file = match File::open(path) {
Ok(file) => file,
Err(err) => panic!("Failed to open configurtion file '{}': {}", path, err),
};
let mut content = String::new();
if let Err(err) = file.read_to_string(&mut content) {
panic!("Failed to read configurtion file '{}': {}", path, err);
}
let config: Config = match toml::from_str(&content) {
Ok(config) => config,
Err(err) => panic!("Failed to parse configurtion file '{}': {}", path, err),
};
info!("Start with configuration: {:#?}", config);
config
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryConfig {
#[serde(default = "QueryConfig::default_max_server_connections")]
pub max_server_connections: u32,
#[serde(default = "QueryConfig::default_query_sql_limit")]
pub query_sql_limit: u64,
#[serde(default = "QueryConfig::default_write_sql_limit")]
pub write_sql_limit: u64,
#[serde(default = "QueryConfig::default_auth_enabled")]
pub auth_enabled: bool,
}

impl QueryConfig {
fn default_max_server_connections() -> u32 {
10240
}

fn default_query_sql_limit() -> u64 {
16 * 1024 * 1024
}

fn default_write_sql_limit() -> u64 {
160 * 1024 * 1024
}

fn default_auth_enabled() -> bool {
false
}

pub fn override_by_env(&mut self) {
if let Ok(size) = std::env::var("MAX_SERVER_CONNECTIONS") {
self.max_server_connections = size.parse::<u32>().unwrap();
}
if let Ok(size) = std::env::var("QUERY_SQL_LIMIT") {
self.query_sql_limit = size.parse::<u64>().unwrap();
}
if let Ok(size) = std::env::var("WRITE_SQL_LIMIT") {
self.write_sql_limit = size.parse::<u64>().unwrap();
}
if let Ok(val) = std::env::var("AUTH_ENABLED") {
self.auth_enabled = val.parse::<bool>().unwrap();
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StorageConfig {
#[serde(default = "StorageConfig::default_path")]
pub path: String,
#[serde(default = "StorageConfig::default_max_summary_size")]
pub max_summary_size: u64,
#[serde(default = "StorageConfig::default_max_level")]
pub max_level: u32,
#[serde(default = "StorageConfig::default_base_file_size")]
pub base_file_size: u64,
#[serde(default = "StorageConfig::default_compact_trigger")]
pub compact_trigger: u32,
#[serde(default = "StorageConfig::default_max_compact_size")]
pub max_compact_size: u64,
#[serde(default = "StorageConfig::default_strict_write")]
pub strict_write: bool,
}

impl StorageConfig {
fn default_path() -> String {
"data/db".to_string()
}

fn default_max_summary_size() -> u64 {
128 * 1024 * 1024
}

fn default_max_level() -> u32 {
4
}

fn default_base_file_size() -> u64 {
16 * 1024 * 1024
}

fn default_compact_trigger() -> u32 {
4
}

fn default_max_compact_size() -> u64 {
2 * 1024 * 1024 * 1024
}

fn default_strict_write() -> bool {
false
}

pub fn override_by_env(&mut self) {
if let Ok(path) = std::env::var("CNOSDB_APPLICATION_PATH") {
self.path = path;
Expand Down Expand Up @@ -73,12 +162,33 @@ impl StorageConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalConfig {
#[serde(default = "WalConfig::default_enabled")]
pub enabled: bool,
#[serde(default = "WalConfig::default_path")]
pub path: String,
#[serde(default = "WalConfig::default_max_file_size")]
pub max_file_size: u64,
#[serde(default = "WalConfig::default_sync")]
pub sync: bool,
}

impl WalConfig {
fn default_enabled() -> bool {
true
}

fn default_path() -> String {
"data/wal".to_string()
}

fn default_max_file_size() -> u64 {
1024 * 1024 * 1024
}

fn default_sync() -> bool {
false
}

pub fn override_by_env(&mut self) {
if let Ok(enabled) = std::env::var("CNOSDB_WAL_ENABLED") {
self.enabled = enabled.as_str() == "true";
Expand All @@ -94,11 +204,21 @@ impl WalConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CacheConfig {
#[serde(default = "CacheConfig::default_max_buffer_size")]
pub max_buffer_size: u64,
#[serde(default = "CacheConfig::default_max_immutable_number")]
pub max_immutable_number: u16,
}

impl CacheConfig {
fn default_max_buffer_size() -> u64 {
128 * 1024 * 1024
}

fn default_max_immutable_number() -> u16 {
4
}

pub fn override_by_env(&mut self) {
if let Ok(size) = std::env::var("CNOSDB_CACHE_MAX_BUFFER_SIZE") {
self.max_buffer_size = size.parse::<u64>().unwrap();
Expand All @@ -109,30 +229,23 @@ impl CacheConfig {
}
}

impl QueryConfig {
pub fn override_by_env(&mut self) {
if let Ok(size) = std::env::var("MAX_SERVER_CONNECTIONS") {
self.max_server_connections = size.parse::<u32>().unwrap();
}
if let Ok(size) = std::env::var("QUERY_SQL_LIMIT") {
self.query_sql_limit = size.parse::<u64>().unwrap();
}
if let Ok(size) = std::env::var("WRITE_SQL_LIMIT") {
self.write_sql_limit = size.parse::<u64>().unwrap();
}
if let Ok(val) = std::env::var("AUTH_ENABLED") {
self.auth_enabled = val.parse::<bool>().unwrap();
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogConfig {
#[serde(default = "LogConfig::default_level")]
pub level: String,
#[serde(default = "LogConfig::default_path")]
pub path: String,
}

impl LogConfig {
fn default_level() -> String {
"info".to_string()
}

fn default_path() -> String {
"data/log".to_string()
}

pub fn override_by_env(&mut self) {
if let Ok(level) = std::env::var("CNOSDB_LOG_LEVEL") {
self.level = level;
Expand All @@ -150,40 +263,70 @@ pub struct SecurityConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TLSConfig {
#[serde(default = "TLSConfig::default_certificate")]
pub certificate: String,
#[serde(default = "TLSConfig::default_private_key")]
pub private_key: String,
}

pub fn get_config(path: &str) -> Config {
let mut file = match File::open(path) {
Ok(file) => file,
Err(err) => panic!("Failed to open configurtion file '{}': {}", path, err),
};
let mut content = String::new();
if let Err(err) = file.read_to_string(&mut content) {
panic!("Failed to read configurtion file '{}': {}", path, err);
impl TLSConfig {
fn default_certificate() -> String {
"./config/tls/server.crt".to_string()
}

fn default_private_key() -> String {
"./config/tls/server.key".to_string()
}
let config: Config = match toml::from_str(&content) {
Ok(config) => config,
Err(err) => panic!("Failed to parse configurtion file '{}': {}", path, err),
};
info!("Start with configuration: {:#?}", config);
config
}

#[derive(Debug, Clone, Serialize, Default, Deserialize)]
pub struct ClusterConfig {
#[serde(default = "ClusterConfig::default_node_id")]
pub node_id: u64,
#[serde(default = "ClusterConfig::default_name")]
pub name: String,
#[serde(default = "ClusterConfig::default_meta")]
pub meta: String,

#[serde(default = "ClusterConfig::default_http_server")]
pub http_server: String,
#[serde(default = "ClusterConfig::default_grpc_server")]
pub grpc_server: String,
#[serde(default = "ClusterConfig::default_tcp_server")]
pub tcp_server: String,
#[serde(default = "ClusterConfig::default_flight_rpc_server")]
pub flight_rpc_server: String,
}

impl ClusterConfig {
fn default_node_id() -> u64 {
100
}

fn default_name() -> String {
"cluster_xxx".to_string()
}

fn default_meta() -> String {
"127.0.0.1:21001".to_string()
}

fn default_http_server() -> String {
"127.0.0.1:31007".to_string()
}

fn default_grpc_server() -> String {
"127.0.0.1:31008".to_string()
}

fn default_tcp_server() -> String {
"127.0.0.1:31009".to_string()
}

fn default_flight_rpc_server() -> String {
"127.0.0.1:31006".to_string()
}

pub fn override_by_env(&mut self) {
if let Ok(name) = std::env::var("CNOSDB_CLUSTER_NAME") {
self.name = name;
Expand Down Expand Up @@ -215,11 +358,21 @@ impl ClusterConfig {

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HintedOffConfig {
#[serde(default = "HintedOffConfig::default_enable")]
pub enable: bool,
#[serde(default = "HintedOffConfig::default_path")]
pub path: String,
}

impl HintedOffConfig {
fn default_enable() -> bool {
true
}

fn default_path() -> String {
"/tmp/cnosdb/hh".to_string()
}

pub fn override_by_env(&mut self) {
if let Ok(enable) = std::env::var("CNOSDB_HINTEDOFF_ENABLE") {
self.enable = enable.parse::<bool>().unwrap();
Expand Down
2 changes: 2 additions & 0 deletions tskv/src/kv_option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ impl From<&Config> for QueryOptions {
pub struct WalOptions {
pub enabled: bool,
pub path: PathBuf,
pub max_file_size: u64,
pub sync: bool,
}

Expand All @@ -127,6 +128,7 @@ impl From<&Config> for WalOptions {
Self {
enabled: config.wal.enabled,
path: PathBuf::from(config.wal.path.clone()),
max_file_size: config.wal.max_file_size,
sync: config.wal.sync,
}
}
Expand Down
9 changes: 3 additions & 6 deletions tskv/src/wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ const FOOTER_MAGIC_NUMBER: u32 = u32::from_be_bytes([b'w', b'a', b'l', b'o']);
const FOOTER_MAGIC_NUMBER_LEN: usize = 4;

const SEGMENT_MAGIC: [u8; 4] = [0x57, 0x47, 0x4c, 0x00];
const SEGMENT_SIZE: u64 = 1073741824; // 1 GiB

const BLOCK_HEADER_SIZE: usize = 25;

Expand Down Expand Up @@ -370,7 +369,7 @@ impl WalManager {
id: TseriesFamilyId,
tenant: Arc<Vec<u8>>,
) -> Result<(u64, usize)> {
self.roll_wal_file(SEGMENT_SIZE).await?;
self.roll_wal_file(self.config.max_file_size).await?;
self.current_file.write(typ, data, id, tenant).await
}

Expand Down Expand Up @@ -686,6 +685,8 @@ mod test {
let _ = std::fs::remove_dir_all(dir.clone()); // Ignore errors
let mut global_config = get_config("../config/config.toml");
global_config.wal.path = dir.clone();
// Argument max_file_size is so small that there must a new wal file created.
global_config.wal.max_file_size = 1;
global_config.wal.sync = false;
let options = Options::from(&global_config);
let wal_config = WalOptions::from(&global_config);
Expand All @@ -710,10 +711,6 @@ mod test {
mgr.write(WalEntryType::Write, data.clone(), 0, tenant.clone())
.await
.unwrap();
if seq < 10 {
// Argument max_file_size is so small that there must a new wal file created.
mgr.roll_wal_file(1).await.unwrap();
}
}
mgr.close().await.unwrap();

Expand Down

0 comments on commit 6e5f07b

Please sign in to comment.