Skip to content

Commit

Permalink
feature:add config API for rocksdb(OpenAtomFoundation#967)
Browse files Browse the repository at this point in the history
* Add area_block_size and max_write_buffer_number configs

* Config related commands can modify mutable options for storage engine
  • Loading branch information
LIBA-S authored Oct 15, 2020
1 parent 2929eaf commit 5a46e08
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 2 deletions.
10 changes: 10 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ log-path : ./log/
db-path : ./db/
# Pika write-buffer-size
write-buffer-size : 268435456
# size of one block in arena memory allocation.
# If <= 0, a proper value is automatically calculated
# (usually 1/8 of writer-buffer-size, rounded up to a multiple of 4KB)
arena-block-size :
# Pika timeout
timeout : 60
# Requirepass
Expand Down Expand Up @@ -115,6 +119,12 @@ small-compaction-threshold : 5000
# the limit, a flush will be triggered in the next DB to which the next write
# is issued.
max-write-buffer-size : 10737418240
# The maximum number of write buffers that are built up in memory for one ColumnFamily in DB.
# The default and the minimum number is 2, so that when 1 write buffer
# is being flushed to storage, new writes can continue to the other write buffer.
# If max-write-buffer-number > 3, writing will be slowed down
# if we are writing to the last write buffer allowed.
max-write-buffer-number : 2
# Limit some command response size, like Scan, Keys*
max-client-response-size : 1073741824
# Compression type supported [snappy, zlib, lz4, zstd]
Expand Down
29 changes: 29 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ class PikaConf : public slash::BaseConf {
std::string compact_cron() { RWLock l(&rwlock_, false); return compact_cron_; }
std::string compact_interval() { RWLock l(&rwlock_, false); return compact_interval_; }
int64_t write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; }
int64_t arena_block_size() { RWLock l(&rwlock_, false); return arena_block_size_; }
int64_t max_write_buffer_size() { RWLock l(&rwlock_, false); return max_write_buffer_size_; }
int max_write_buffer_number() { RWLock l(&rwlock_, false); return max_write_buffer_num_; }
int64_t max_client_response_size() { RWLock L(&rwlock_, false); return max_client_response_size_;}
int timeout() { RWLock l(&rwlock_, false); return timeout_; }
std::string server_id() { RWLock l(&rwlock_, false); return server_id_; }
Expand Down Expand Up @@ -242,6 +244,31 @@ class PikaConf : public slash::BaseConf {
TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value));
max_conn_rbuf_size_.store(value);
}
void SetMaxCacheFiles(const int& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("max-cache-files", std::to_string(value));
max_cache_files_ = value;
}
void SetMaxBackgroudCompactions(const int& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("max-background-compactions", std::to_string(value));
max_background_compactions_ = value;
}
void SetWriteBufferSize(const int& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("write-buffer-size", std::to_string(value));
write_buffer_size_ = value;
}
void SetMaxWriteBufferNumber(const int& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("max-write-buffer-number", std::to_string(value));
max_write_buffer_num_ = value;
}
void SetArenaBlockSize(const int& value) {
RWLock l(&rwlock_, true);
TryPushDiffCommands("arena-block-size", std::to_string(value));
arena_block_size_ = value;
}

Status TablePartitionsSanityCheck(const std::string& table_name,
const std::set<uint32_t>& partition_ids,
Expand Down Expand Up @@ -276,7 +303,9 @@ class PikaConf : public slash::BaseConf {
std::string compact_cron_;
std::string compact_interval_;
int64_t write_buffer_size_;
int64_t arena_block_size_;
int64_t max_write_buffer_size_;
int max_write_buffer_num_;
int64_t max_client_response_size_;
bool daemonize_;
int timeout_;
Expand Down
7 changes: 7 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,12 @@ class PikaServer {
// info debug use
void ServerStatus(std::string* info);

/*
* BlackwidowOptions used
*/
blackwidow::Status RewriteBlackwidowOptions(const blackwidow::OptionType& option_type,
const std::unordered_map<std::string, std::string>& options);

friend class Cmd;
friend class InfoCmd;
friend class PkClusterAddSlotsCmd;
Expand All @@ -322,6 +328,7 @@ class PikaServer {
int port_;
time_t start_time_s_;

pthread_rwlock_t bw_options_rw_;
blackwidow::BlackwidowOptions bw_options_;
void InitBlackwidowOptions();

Expand Down
87 changes: 86 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1245,6 +1245,18 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt64(&config_body, g_pika_conf->write_buffer_size());
}

if (slash::stringmatch(pattern.data(), "arena-block-size", 1)) {
elements += 2;
EncodeString(&config_body, "arena-block-size");
EncodeInt64(&config_body, g_pika_conf->arena_block_size());
}

if (slash::stringmatch(pattern.data(), "max-write-buffer-number", 1)) {
elements += 2;
EncodeString(&config_body, "max-write-buffer-number");
EncodeInt32(&config_body, g_pika_conf->max_write_buffer_number());
}

if (slash::stringmatch(pattern.data(), "timeout", 1)) {
elements += 2;
EncodeString(&config_body, "timeout");
Expand Down Expand Up @@ -1561,7 +1573,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*23\r\n";
ret = "*28\r\n";
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "masterauth");
Expand All @@ -1585,6 +1597,14 @@ void ConfigCmd::ConfigSet(std::string& ret) {
EncodeString(&ret, "compact-interval");
EncodeString(&ret, "slave-priority");
EncodeString(&ret, "sync-window-size");
// Options for storage engine
// MutableDBOptions
EncodeString(&ret, "max-cache-files");
EncodeString(&ret, "max-background-compactions");
// MutableColumnFamilyOptions
EncodeString(&ret, "write-buffer-size");
EncodeString(&ret, "max-write-buffer-number");
EncodeString(&ret, "arena-block-size");
return;
}
long int ival;
Expand Down Expand Up @@ -1798,6 +1818,71 @@ void ConfigCmd::ConfigSet(std::string& ret) {
}
g_pika_conf->SetSyncWindowSize(ival);
ret = "+OK\r\n";
} else if (set_item == "max-cache-files") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-cache-files'\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_open_files", value}};
blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map);
if (!s.ok()) {
ret = "-ERR Set max-cache-files wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetMaxCacheFiles(ival);
ret = "+OK\r\n";
} else if (set_item == "max-background-compactions") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-background-compactions'\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_background_compactions", value}};
blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kDB, options_map);
if (!s.ok()) {
ret = "-ERR Set max-background-compactions wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetMaxBackgroudCompactions(ival);
ret = "+OK\r\n";
} else if (set_item == "write-buffer-size") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'write-buffer-size'\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"write_buffer_size", value}};
blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
ret = "-ERR Set write-buffer-size wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetWriteBufferSize(ival);
ret = "+OK\r\n";
} else if (set_item == "max-write-buffer-number") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'max-write-buffer-number'\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"max_write_buffer_number", value}};
blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
ret = "-ERR Set max-write-buffer-number wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetMaxWriteBufferNumber(ival);
ret = "+OK\r\n";
} else if (set_item == "arena-block-size") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument \'" + value + "\' for CONFIG SET 'arena-block-size'\r\n";
return;
}
std::unordered_map<std::string, std::string> options_map{{"arena_block_size", value}};
blackwidow::Status s = g_pika_server->RewriteBlackwidowOptions(blackwidow::OptionType::kColumnFamily, options_map);
if (!s.ok()) {
ret = "-ERR Set arena-block-size wrong: " + s.ToString() + "\r\n";
return;
}
g_pika_conf->SetArenaBlockSize(ival);
ret = "+OK\r\n";
} else {
ret = "-ERR Unsupported CONFIG parameter: " + set_item + "\r\n";
}
Expand Down
19 changes: 19 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,12 +363,25 @@ int PikaConf::Load()
write_buffer_size_ = 268435456; // 256Mb
}

// arena_block_size
GetConfInt64("arena-block-size", &arena_block_size_);
if (arena_block_size_ <= 0) {
arena_block_size_ = write_buffer_size_ >> 3; // 1/8 of the write_buffer_size_
}

// max_write_buffer_size
GetConfInt64("max-write-buffer-size", &max_write_buffer_size_);
if (max_write_buffer_size_ <= 0) {
max_write_buffer_size_ = 10737418240; // 10Gb
}

// max_write_buffer_num
max_write_buffer_num_ = 2;
GetConfInt("max-write-buffer-num", &max_write_buffer_num_);
if (max_write_buffer_num_ <= 0) {
max_write_buffer_num_ = 2; // 1 for immutable memtable, 1 for mutable memtable
}

// max_client_response_size
GetConfInt64("max-client-response-size", &max_client_response_size_);
if (max_client_response_size_ <= 0) {
Expand Down Expand Up @@ -545,6 +558,12 @@ int PikaConf::ConfigRewrite() {
SetConfInt("sync-window-size", sync_window_size_.load());
SetConfInt("consensus-level", consensus_level_.load());
SetConfInt("replication-num", replication_num_.load());
// options for storage engine
SetConfInt("max-cache-files", max_cache_files_);
SetConfInt("max-background-compactions", max_background_compactions_);
SetConfInt("max-write-buffer-number", max_write_buffer_num_);
SetConfInt64("write-buffer-size", write_buffer_size_);
SetConfInt64("arena-block-size", arena_block_size_);
// slaveof config item is special
SetConfStr("slaveof", slaveof_);

Expand Down
29 changes: 29 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ PikaServer::PikaServer() :
LOG(FATAL) << "ServerInit iotcl error";
}

pthread_rwlockattr_t bw_options_rw_attr;
pthread_rwlockattr_init(&bw_options_rw_attr);
pthread_rwlockattr_setkind_np(&bw_options_rw_attr,
PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
pthread_rwlock_init(&bw_options_rw_, &bw_options_rw_attr);

InitBlackwidowOptions();

pthread_rwlockattr_t tables_rw_attr;
Expand Down Expand Up @@ -408,6 +414,7 @@ void PikaServer::SetDispatchQueueLimit(int queue_limit) {
}

blackwidow::BlackwidowOptions PikaServer::bw_options() {
slash::RWLock rwl(&bw_options_rw_, false);
return bw_options_;
}

Expand Down Expand Up @@ -1587,6 +1594,7 @@ void PikaServer::AutoKeepAliveRSync() {
}

void PikaServer::InitBlackwidowOptions() {
slash::RWLock rwl(&bw_options_rw_, true);

// For rocksdb::Options
bw_options_.options.create_if_missing = true;
Expand All @@ -1596,8 +1604,12 @@ void PikaServer::InitBlackwidowOptions() {

bw_options_.options.write_buffer_size =
g_pika_conf->write_buffer_size();
bw_options_.options.arena_block_size =
g_pika_conf->arena_block_size();
bw_options_.options.write_buffer_manager.reset(
new rocksdb::WriteBufferManager(g_pika_conf->max_write_buffer_size()));
bw_options_.options.max_write_buffer_number =
g_pika_conf->max_write_buffer_number();
bw_options_.options.target_file_size_base =
g_pika_conf->target_file_size_base();
bw_options_.options.max_background_flushes =
Expand Down Expand Up @@ -1651,6 +1663,23 @@ void PikaServer::InitBlackwidowOptions() {
g_pika_conf->small_compaction_threshold();
}

blackwidow::Status PikaServer::RewriteBlackwidowOptions(const blackwidow::OptionType& option_type,
const std::unordered_map<std::string, std::string>& options_map) {
blackwidow::Status s;
for (const auto& table_item : tables_) {
slash::RWLock partition_rwl(&table_item.second->partitions_rw_, true);
for (const auto& partition_item: table_item.second->partitions_) {
partition_item.second->DbRWLockWriter();
s = partition_item.second->db()->SetOptions(option_type, blackwidow::ALL_DB, options_map);
partition_item.second->DbRWUnLock();
if (!s.ok()) return s;
}
}
slash::RWLock rwl(&bw_options_rw_, true);
s = bw_options_.ResetOptions(option_type, options_map);
return s;
}

void PikaServer::ServerStatus(std::string* info) {
std::stringstream tmp_stream;
size_t q_size = ClientProcessorThreadPoolCurQueueSize();
Expand Down

0 comments on commit 5a46e08

Please sign in to comment.