Skip to content

Commit

Permalink
Add max-conn-rbuf-size configuration (OpenAtomFoundation#822)
Browse files Browse the repository at this point in the history
this configuration defines the maximum size of connection read buffer
update pink
  • Loading branch information
whoiami authored Dec 10, 2019
1 parent 74ffb14 commit 9a2676e
Show file tree
Hide file tree
Showing 13 changed files with 53 additions and 14 deletions.
5 changes: 5 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ slave-priority : 100
server-id : 1
# the size of flow control window while sync binlog between master and slave.Default is 9000 and the maximum is 90000.
sync-window-size : 9000
# max value of connection read buffer size: configurable value 67108864(64MB) or 268435456(256MB) or 536870912(512MB)
# default value is 268435456(256MB)
# NOTICE: master and slave should share exactly the same value
max-conn-rbuf-size : 268435456


###################
## Critical Settings
Expand Down
3 changes: 2 additions & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class PikaClientConn: public pink::RedisConn {
PikaClientConn(int fd, std::string ip_port,
pink::Thread *server_thread,
pink::PinkEpoll* pink_epoll,
const pink::HandleType& handle_type);
const pink::HandleType& handle_type,
int max_conn_rubf_size);
virtual ~PikaClientConn() {}

void AsynProcessRedisCmds(const std::vector<pink::RedisCmdArgsType>& argvs, std::string* response) override;
Expand Down
6 changes: 6 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class PikaConf : public slash::BaseConf {
int slowlog_max_len() { RWLock L(&rwlock_, false); return slowlog_max_len_; }
std::string network_interface() { RWLock l(&rwlock_, false); return network_interface_; }
int sync_window_size() { return sync_window_size_.load(); }
int max_conn_rbuf_size() { return max_conn_rbuf_size_.load(); }

// Immutable config items, we don't use lock.
bool daemonize() { return daemonize_; }
Expand Down Expand Up @@ -234,6 +235,10 @@ class PikaConf : public slash::BaseConf {
TryPushDiffCommands("sync-window-size", std::to_string(value));
sync_window_size_.store(value);
}
void SetMaxConnRbufSize(const int& value) {
TryPushDiffCommands("max-conn-rbuf-size", std::to_string(value));
max_conn_rbuf_size_.store(value);
}

Status TablePartitionsSanityCheck(const std::string& table_name,
const std::set<uint32_t>& partition_ids,
Expand Down Expand Up @@ -305,6 +310,7 @@ class PikaConf : public slash::BaseConf {
bool optimize_filters_for_hits_;
bool level_compaction_dynamic_level_bytes_;
std::atomic<int> sync_window_size_;
std::atomic<int> max_conn_rbuf_size_;

std::string network_interface_;

Expand Down
4 changes: 3 additions & 1 deletion include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#define PIKA_REPL_SERVER_TP_SIZE 3
#define PIKA_META_SYNC_MAX_WAIT_TIME 10
#define PIKA_SCAN_STEP_LENGTH 1000
#define PIKA_PB_MAX_MESSAGE (1 << 28) // 256MB
#define PIKA_MAX_CONN_RBUF (1 << 28) // 256MB
#define PIKA_MAX_CONN_RBUF_LB (1 << 26) // 64MB
#define PIKA_MAX_CONN_RBUF_HB (1 << 29) // 512MB

class PikaServer;

Expand Down
13 changes: 9 additions & 4 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
class PikaDispatchThread {
public:
PikaDispatchThread(std::set<std::string> &ips, int port, int work_num,
int cron_interval, int queue_limit);
int cron_interval, int queue_limit, int max_conn_rbuf_size);
~PikaDispatchThread();
int StartThread();

Expand All @@ -27,14 +27,19 @@ class PikaDispatchThread {
private:
class ClientConnFactory : public pink::ConnFactory {
public:
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
explicit ClientConnFactory(int max_conn_rbuf_size)
: max_conn_rbuf_size_(max_conn_rbuf_size) {
}
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
pink::Thread* server_thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const {
return std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, pink_epoll, pink::HandleType::kAsynchronous);
}
return std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, pink_epoll, pink::HandleType::kAsynchronous, max_conn_rbuf_size_);
}
private:
int max_conn_rbuf_size_;
};

class Handles : public pink::ServerHandle {
Expand Down
6 changes: 6 additions & 0 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&config_body, g_pika_conf->sync_window_size());
}

if (slash::stringmatch(pattern.data(), "max-conn-rbuf-size", 1)) {
elements += 2;
EncodeString(&config_body, "max-conn-rbuf-size");
EncodeInt32(&config_body, g_pika_conf->max_conn_rbuf_size());
}

std::stringstream resp;
resp << "*" << std::to_string(elements) << "\r\n" << config_body;
ret = resp.str();
Expand Down
5 changes: 3 additions & 2 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ extern PikaCmdTableManager* g_pika_cmd_table_manager;
PikaClientConn::PikaClientConn(int fd, std::string ip_port,
pink::Thread* thread,
pink::PinkEpoll* pink_epoll,
const pink::HandleType& handle_type)
: RedisConn(fd, ip_port, thread, pink_epoll, handle_type),
const pink::HandleType& handle_type,
int max_conn_rbuf_size)
: RedisConn(fd, ip_port, thread, pink_epoll, handle_type, max_conn_rbuf_size),
server_thread_(reinterpret_cast<pink::ServerThread*>(thread)),
current_table_(g_pika_conf->default_table()),
is_pubsub_(false) {
Expand Down
11 changes: 11 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,17 @@ int PikaConf::Load()
} else {
sync_window_size_.store(tmp_sync_window_size);
}

// max conn rbuf size
int tmp_max_conn_rbuf_size = PIKA_MAX_CONN_RBUF;
GetConfInt("max-conn-rbuf-size", &tmp_max_conn_rbuf_size);
if (tmp_max_conn_rbuf_size == PIKA_MAX_CONN_RBUF_LB
|| tmp_max_conn_rbuf_size == PIKA_MAX_CONN_RBUF_HB) {
max_conn_rbuf_size_.store(tmp_max_conn_rbuf_size);
} else {
max_conn_rbuf_size_.store(PIKA_MAX_CONN_RBUF);
}

return ret;
}

Expand Down
6 changes: 4 additions & 2 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;

PikaDispatchThread::PikaDispatchThread(std::set<std::string> &ips, int port, int work_num,
int cron_interval, int queue_limit)
: handles_(this) {
int cron_interval, int queue_limit, int max_conn_rbuf_size)
: conn_factory_(max_conn_rbuf_size),
handles_(this) {
LOG(INFO) << "max conn rbuf size: " << max_conn_rbuf_size;
thread_rep_ = pink::NewDispatchThread(ips, port, work_num, &conn_factory_,
cron_interval, queue_limit, &handles_);
thread_rep_->set_thread_name("Dispatcher");
Expand Down
2 changes: 1 addition & 1 deletion src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ int PikaReplClientConn::DealMessage() {
std::shared_ptr<InnerMessage::InnerResponse> response = std::make_shared<InnerMessage::InnerResponse>();
::google::protobuf::io::ArrayInputStream input(rbuf_ + cur_pos_ - header_len_, header_len_);
::google::protobuf::io::CodedInputStream decoder(&input);
decoder.SetTotalBytesLimit(PIKA_PB_MAX_MESSAGE, PIKA_PB_MAX_MESSAGE);
decoder.SetTotalBytesLimit(g_pika_conf->max_conn_rbuf_size(), g_pika_conf->max_conn_rbuf_size());
bool success = response->ParseFromCodedStream(&decoder);
if (!success) {
LOG(WARNING) << "ParseFromArray FAILED! rbuf_len: " << rbuf_len_ << " header_len: " << header_len_;
Expand Down
2 changes: 1 addition & 1 deletion src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ slash::Status PikaReplServer::SendSlaveBinlogChips(const std::string& ip,
return Status::Corruption("Serialized Failed");
}

if (binlog_chip_pb.size() > PIKA_PB_MAX_MESSAGE) {
if (binlog_chip_pb.size() > static_cast<size_t>(g_pika_conf->max_conn_rbuf_size())) {
for (const auto& task : tasks) {
InnerMessage::InnerResponse response;
std::vector<WriteTask> tmp_tasks;
Expand Down
2 changes: 1 addition & 1 deletion src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ PikaServer::PikaServer() :
int worker_queue_limit = g_pika_conf->maxclients() / worker_num_ + 100;
LOG(INFO) << "Worker queue limit is " << worker_queue_limit;
pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, 3000,
worker_queue_limit);
worker_queue_limit, g_pika_conf->max_conn_rbuf_size());
pika_monitor_thread_ = new PikaMonitorThread();
pika_rsync_service_ = new PikaRsyncService(g_pika_conf->db_sync_path(),
g_pika_conf->port() + kPortShiftRSync);
Expand Down
2 changes: 1 addition & 1 deletion third/pink

0 comments on commit 9a2676e

Please sign in to comment.