Skip to content

Commit

Permalink
1) add slaveof item in config file; 2) include binding loop network i…
Browse files Browse the repository at this point in the history
…nterface when use a single network interface in config file
  • Loading branch information
JacketWoo committed Sep 21, 2016
1 parent 22451be commit a412f6b
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 7 deletions.
4 changes: 3 additions & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ db-sync-path : ./dbsync/
# db sync speed(MB) max is set to 125MB, min is set to 0, and if below 0 or above 125, the value will be adjust to 125
db-sync-speed : -1
# network interface
#network-interface : eth1
# network-interface : eth1
# replication
# slaveof : master-ip:master-port

###################
## Critical Settings
Expand Down
2 changes: 2 additions & 0 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define PIKA_BINLOG_RECEIVER_THREAD_H_

#include <queue>
#include <set>

#include "holy_thread.h"
#include "slash_mutex.h"
Expand All @@ -14,6 +15,7 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
{
public:
PikaBinlogReceiverThread(std::string &ip, int port, int cron_interval = 0);
PikaBinlogReceiverThread(std::set<std::string> &ips, int port, int cron_interval = 0);
virtual ~PikaBinlogReceiverThread();
virtual void CronHandle();
virtual bool AccessHandle(std::string& ip);
Expand Down
6 changes: 6 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class PikaConf : public slash::BaseConf {

// Getter
int port() { RWLock l(&rwlock_, false); return port_; }
std::string slaveof() {RWLock l(&rwlock_, false); return slaveof_;}
int thread_num() { RWLock l(&rwlock_, false); return thread_num_; }
int sync_thread_num() { RWLock l(&rwlock_, false); return sync_thread_num_; }
int sync_buffer_size() { RWLock l(&rwlock_, false); return sync_buffer_size_; }
Expand Down Expand Up @@ -66,6 +67,10 @@ class PikaConf : public slash::BaseConf {
void SetThreadNum(const int value) { RWLock l(&rwlock_, true); thread_num_ = value; }
void SetLogLevel(const int value) { RWLock l(&rwlock_, true); log_level_ = value; }
void SetTimeout(const int value) { RWLock l(&rwlock_, true); timeout_ = value; }
void SetSlaveof(const std::string value) {
RWLock l(&rwlock_, true);
slaveof_ = value;
}
void SetBgsavePath(const std::string &value) {
RWLock l(&rwlock_, true);
bgsave_path_ = value;
Expand Down Expand Up @@ -122,6 +127,7 @@ class PikaConf : public slash::BaseConf {

private:
int port_;
std::string slaveof_;
int thread_num_;
int sync_thread_num_;
int sync_buffer_size_;
Expand Down
1 change: 1 addition & 0 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ class PikaDispatchThread : public pink::DispatchThread<PikaClientConn>
public:
PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
PikaDispatchThread(std::string &ip, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
PikaDispatchThread(std::set<std::string> &ips, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval);
virtual ~PikaDispatchThread();
virtual void CronHandle();
virtual bool AccessHandle(std::string& ip);
Expand Down
1 change: 1 addition & 0 deletions include/pika_heartbeat_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class PikaHeartbeatThread : public pink::HolyThread<PikaHeartbeatConn>
{
public:
PikaHeartbeatThread(std::string &ip, int port, int cron_interval = 0);
PikaHeartbeatThread(std::set<std::string> &ip, int port, int cron_interval = 0);
virtual ~PikaHeartbeatThread();
virtual void CronHandle();
virtual bool AccessHandle(std::string& ip_port);
Expand Down
8 changes: 7 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -840,8 +840,12 @@ void ConfigCmd::ConfigGet(std::string &ret) {
} else {
EncodeString(&ret, "no");
}
} else if (get_item == "slaveof") {
ret = "*2\r\n";
EncodeString(&ret, "slaveof");
EncodeString(&ret, g_pika_conf->slaveof());
} else if (get_item == "*") {
ret = "*64\r\n";
ret = "*66\r\n";
EncodeString(&ret, "port");
EncodeInt32(&ret, g_pika_conf->port());
EncodeString(&ret, "thread-num");
Expand Down Expand Up @@ -906,6 +910,8 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeInt32(&ret, g_pika_conf->db_sync_speed());
EncodeString(&ret, "network-interface");
EncodeString(&ret, g_pika_conf->network_interface());
EncodeString(&ret, "slaveof");
EncodeString(&ret, g_pika_conf->slaveof());
} else {
ret = "*0\r\n";
}
Expand Down
11 changes: 11 additions & 0 deletions src/pika_binlog_receiver_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,17 @@ PikaBinlogReceiverThread::PikaBinlogReceiverThread(std::string &ip, int port, in
InitCmdTable(&cmds_);
}

PikaBinlogReceiverThread::PikaBinlogReceiverThread(std::set<std::string> &ips, int port, int cron_interval) :
HolyThread::HolyThread(ips, port, cron_interval),
thread_querynum_(0),
last_thread_querynum_(0),
last_time_us_(slash::NowMicros()),
last_sec_thread_querynum_(0),
serial_(0) {
cmds_.reserve(300);
InitCmdTable(&cmds_);
}

PikaBinlogReceiverThread::~PikaBinlogReceiverThread() {
DestoryCmdTable(cmds_);
LOG(INFO) << "BinlogReceiver thread " << thread_id() << " exit!!!";
Expand Down
4 changes: 4 additions & 0 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ int PikaConf::Load()
network_interface_ = "";
GetConfStr("network-interface", &network_interface_);

// slaveof
slaveof_ = "";
GetConfStr("slaveof", &slaveof_);
return ret;
}

Expand Down Expand Up @@ -188,6 +191,7 @@ int PikaConf::ConfigRewrite() {
SetConfStr("db-sync-path", db_sync_path_);
SetConfInt("db-sync-speed", db_sync_speed_);
SetConfStr("network-interface", network_interface_);
SetConfStr("slaveof", slaveof_);

SetConfInt("binlog-file-size", binlog_file_size_);
SetConfStr("compression", compression_);
Expand Down
4 changes: 4 additions & 0 deletions src/pika_dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ PikaDispatchThread::PikaDispatchThread(std::string &ip, int port, int work_num,
DispatchThread::DispatchThread(ip, port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
}

PikaDispatchThread::PikaDispatchThread(std::set<std::string> &ips, int port, int work_num, PikaWorkerThread** pika_worker_thread, int cron_interval) :
DispatchThread::DispatchThread(ips, port, work_num, reinterpret_cast<pink::WorkerThread<PikaClientConn>**>(pika_worker_thread), cron_interval) {
}

PikaDispatchThread::~PikaDispatchThread() {
LOG(INFO) << "dispatch thread " << thread_id() << " exit!!!";
}
Expand Down
4 changes: 4 additions & 0 deletions src/pika_heartbeat_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ PikaHeartbeatThread::PikaHeartbeatThread(std::string& ip, int port, int cron_int
HolyThread::HolyThread(ip, port, cron_interval) {
}

PikaHeartbeatThread::PikaHeartbeatThread(std::set<std::string>& ips, int port, int cron_interval) :
HolyThread::HolyThread(ips, port, cron_interval) {
}

PikaHeartbeatThread::~PikaHeartbeatThread() {
LOG(INFO) << "PikaHeartbeat thread " << thread_id() << " exit!!!";
}
Expand Down
24 changes: 21 additions & 3 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,16 @@ PikaServer::PikaServer() :
pika_worker_thread_[i] = new PikaWorkerThread(1000);
}

pika_dispatch_thread_ = new PikaDispatchThread(host_, port_, worker_num_, pika_worker_thread_, 3000);
pika_binlog_receiver_thread_ = new PikaBinlogReceiverThread(host_, port_ + 1000, 1000);
pika_heartbeat_thread_ = new PikaHeartbeatThread(host_, port_ + 2000, 1000);
std::set<std::string> ips;
if (g_pika_conf->network_interface().empty()) {
ips.insert("0.0.0.0");
} else {
ips.insert("127.0.0.1");
ips.insert(host_);
}
pika_dispatch_thread_ = new PikaDispatchThread(ips, port_, worker_num_, pika_worker_thread_, 3000);
pika_binlog_receiver_thread_ = new PikaBinlogReceiverThread(ips, port_ + 1000, 1000);
pika_heartbeat_thread_ = new PikaHeartbeatThread(ips, port_ + 2000, 1000);
pika_trysync_thread_ = new PikaTrysyncThread();
monitor_thread_ = new PikaMonitorThread();

Expand Down Expand Up @@ -229,6 +236,17 @@ void PikaServer::Start() {
time(&start_time_s_);

//SetMaster("127.0.0.1", 9221);
std::string slaveof = g_pika_conf->slaveof();
if (!slaveof.empty()) {
int32_t sep = slaveof.find(":");
std::string master_ip = slaveof.substr(0, sep);
int32_t master_port = std::stoi(slaveof.substr(sep+1));
if ((master_ip == "127.0.0.1" || master_ip == host_) && master_port == port_) {
LOG(FATAL) << "you will slaveof yourself as the config file, please check";
} else {
SetMaster(master_ip, master_port);
}
}

LOG(INFO) << "Pika Server going to start";
while (!exit_) {
Expand Down
2 changes: 1 addition & 1 deletion third/pink
Submodule pink updated from cc85f1 to ea6709
2 changes: 1 addition & 1 deletion third/slash
Submodule slash updated from 306a91 to d2b6fa

0 comments on commit a412f6b

Please sign in to comment.