Skip to content

Commit

Permalink
add dbsync when no trysync point found
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Apr 14, 2016
1 parent 8045251 commit 6e32516
Show file tree
Hide file tree
Showing 13 changed files with 396 additions and 108 deletions.
4 changes: 4 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ root_connection_num : 2
slowlog_log_slower_than : 10000
# slave-read-only(yes/no, 1/0)
slave_read_only : 0
# Pika db sync path
db_sync_path : ./dbsync/
# db sync speed(MB) max is set to 125MB, min is set to 0, and if below 0 or above 125, the value will be adjust to 125
db_sync_speed : -1

###################
## Critical Settings
Expand Down
11 changes: 8 additions & 3 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class PikaConf : public slash::BaseConf {
std::string log_path() { RWLock l(&rwlock_, false); return log_path_; }
int log_level() { RWLock l(&rwlock_, false); return log_level_; }
std::string db_path() { RWLock l(&rwlock_, false); return db_path_; }
std::string db_sync_path() { RWLock l(&rwlock_, false); return db_sync_path_; }
int db_sync_speed() { RWLock l(&rwlock_, false); return db_sync_speed_; }
int write_buffer_size() { RWLock l(&rwlock_, false); return write_buffer_size_; }
int timeout() { RWLock l(&rwlock_, false); return timeout_; }

Expand Down Expand Up @@ -99,6 +101,10 @@ class PikaConf : public slash::BaseConf {
RWLock l(&rwlock_, true);
slowlog_log_slower_than_ = value;
}
void SetDbSyncSpeed(const int value) {
RWLock l(&rwlock_, true);
db_sync_speed_ = value;
}

int Load();
int ConfigRewrite();
Expand All @@ -108,8 +114,8 @@ class PikaConf : public slash::BaseConf {
int thread_num_;
std::string log_path_;
std::string db_path_;
//char master_db_sync_path_[PIKA_WORD_SIZE];
//char slave_db_sync_path_[PIKA_WORD_SIZE];
std::string db_sync_path_;
int db_sync_speed_;
int write_buffer_size_;
int log_level_;
bool daemonize_;
Expand All @@ -131,7 +137,6 @@ class PikaConf : public slash::BaseConf {
std::string conf_path_;
//char username_[30];
//char password_[30];
//int db_sync_speed_;

//
// Critical configure items
Expand Down
17 changes: 17 additions & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,21 @@ const std::string kManifest = "manifest";
*/
#define COMMA ','

/*
* define reply between master and slave
*
*/
const std::string kInnerReplOk = "ok";
const std::string kInnerReplWait = "wait";

/*
* db sync
*/
const uint32_t kDBSyncMaxGap = 50;
const std::string kDBSyncModule = "document";

// Use when try to change db
const std::string kTmpDbPath = "./tmpdb/";

const std::string kBgsaveInfoFile = "info";
#endif
27 changes: 27 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <vector>
#include <map>
#include <unordered_set>
#include <nemo.h>
#include <time.h>
#include "pika_binlog.h"
Expand Down Expand Up @@ -172,6 +173,9 @@ class PikaServer
void ClearBgsave() {
bgsave_info_.Clear();
}
void FinishBgsave() {
bgsave_info_.bgsaving = false;
}

/*
* PurgeLog used
Expand All @@ -188,6 +192,20 @@ class PikaServer
purging_ = false;
}

/*
* DBSync used
*/
struct DBSyncArg {
PikaServer *p;
std::string ip;
int port;
DBSyncArg(PikaServer *_p, const std::string& _ip, int &_port)
: p(_p), ip(_ip), port(_port) {}
};
void DBSyncSendFile(const std::string& ip, int port);
bool ChangeDb(const std::string& new_path);


//flushall
bool FlushAll();
void PurgeDir(std::string& path);
Expand Down Expand Up @@ -278,6 +296,7 @@ class PikaServer
bool InitBgsaveEnv(const std::string& bgsave_path);
bool InitBgsaveEngine();


/*
* Purgelogs use
*/
Expand All @@ -289,6 +308,14 @@ class PikaServer
void AutoPurge();
bool CouldPurge(uint32_t index);

/*
* DBSync use
*/
std::unordered_set<std::string> db_sync_slaves;
void TryDBSync(const std::string& ip, int port);
void DBSync(const std::string& ip, int port);
static void DoDBSync(void* arg);

/*
* Flushall use
*/
Expand Down
9 changes: 3 additions & 6 deletions include/pika_trysync_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,7 @@ class PikaTrysyncThread : public pink::Thread {
cli_ = new pink::RedisCli();
cli_->set_connect_timeout(1500);
};
virtual ~PikaTrysyncThread() {
should_exit_ = true;
pthread_join(thread_id(), NULL);
delete cli_;
DLOG(INFO) << " Trysync thread " << pthread_self() << " exit!!!";
};
virtual ~PikaTrysyncThread();

private:
int sockfd_;
Expand All @@ -24,6 +19,8 @@ class PikaTrysyncThread : public pink::Thread {

bool Send();
bool RecvProc();
bool PrepareRsync();
bool TryUpdateMasterOffset();

virtual void* ThreadMain();

Expand Down
71 changes: 32 additions & 39 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
} else if (cur_size == 2) {
have_offset_ = true;
std::string str_filenum = *it++;
if (!slash::string2l(str_filenum.data(), str_filenum.size(), &filenum_) && filenum_ < 0) {
if (!slash::string2l(str_filenum.data(), str_filenum.size(), &filenum_) || filenum_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
std::string str_pro_offset = *it++;
if (!slash::string2l(str_pro_offset.data(), str_pro_offset.size(), &pro_offset_) && pro_offset_ < 0) {
if (!slash::string2l(str_pro_offset.data(), str_pro_offset.size(), &pro_offset_) || pro_offset_ < 0) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
Expand All @@ -61,6 +61,10 @@ void SlaveofCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)

void SlaveofCmd::Do() {
if (is_noone_) {
// Stop rsync
LOG(ERROR) << "stop rsync";
slash::StopRsync(g_pika_conf->db_sync_path());

g_pika_server->RemoveMaster();
res_.SetRes(CmdRes::kOk);
return;
Expand Down Expand Up @@ -127,8 +131,10 @@ void TrysyncCmd::Do() {
res_.AppendInteger(s.sid);
DLOG(INFO) << "Send Sid to Slave: " << s.sid;
g_pika_server->BecomeMaster();
} else if (status.IsIncomplete()) {
res_.AppendString(kInnerReplWait);
} else {
res_.SetRes(CmdRes::kErrOther, "Error in AddBinlogSender");
res_.SetRes(CmdRes::kErrOther, status.ToString());
}
} else {
res_.SetRes(CmdRes::kErrOther, "Already Exist");
Expand Down Expand Up @@ -630,6 +636,14 @@ void ConfigCmd::ConfigGet(std::string &ret) {
ret = "*2\r\n";
EncodeString(&ret, "db_path");
EncodeString(&ret, g_pika_conf->db_path());
} else if (get_item == "db_sync_path") {
ret = "*2\r\n";
EncodeString(&ret, "db_sync_path");
EncodeString(&ret, g_pika_conf->db_sync_path());
} else if (get_item == "db_sync_speed") {
ret = "*2\r\n";
EncodeString(&ret, "db_sync_speed");
EncodeInt32(&ret, g_pika_conf->db_sync_speed());
} else if (get_item == "maxmemory") {
ret = "*2\r\n";
EncodeString(&ret, "maxmemory");
Expand Down Expand Up @@ -710,20 +724,8 @@ void ConfigCmd::ConfigGet(std::string &ret) {
} else {
EncodeString(&ret, "no");
}
// } else if (get_item == "master_db_sync_path") {
// ret = "*2\r\n";
// EncodeString(&ret, "master_db_sync_path");
// EncodeString(&ret, g_pika_conf->master_db_sync_path());
// } else if (get_item == "slave_db_sync_path") {
// ret = "*2\r\n";
// EncodeString(&ret, "slave_db_sync_path");
// EncodeString(&ret, g_pika_conf->slave_db_sync_path());
// } else if (get_item == "db_sync_speed") {
// ret = "*2\r\n";
// EncodeString(&ret, "db_sync_speed");
// EncodeInt32(&ret, g_pika_conf->db_sync_speed());
} else if (get_item == "*") {
ret = "*27\r\n";
ret = "*25\r\n";
EncodeString(&ret, "port");
EncodeString(&ret, "thread_num");
EncodeString(&ret, "log_path");
Expand All @@ -735,7 +737,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, "requirepass");
EncodeString(&ret, "userpass");
EncodeString(&ret, "userblacklist");
EncodeString(&ret, "dump_prefix");
//EncodeString(&ret, "dump_prefix");
EncodeString(&ret, "daemonize");
EncodeString(&ret, "dump_path");
EncodeString(&ret, "pidfile");
Expand All @@ -748,8 +750,7 @@ void ConfigCmd::ConfigGet(std::string &ret) {
EncodeString(&ret, "slave-read-only");
EncodeString(&ret, "binlog_file_size");
EncodeString(&ret, "compression");
EncodeString(&ret, "master_db_sync_path");
EncodeString(&ret, "slave_db_sync_path");
EncodeString(&ret, "db_sync_path");
EncodeString(&ret, "db_sync_speed");
} else {
ret = "*0\r\n";
Expand All @@ -759,21 +760,19 @@ void ConfigCmd::ConfigGet(std::string &ret) {
void ConfigCmd::ConfigSet(std::string& ret) {
std::string set_item = config_args_v_[1];
if (set_item == "*") {
ret = "*15\r\n";
ret = "*12\r\n";
EncodeString(&ret, "log_level");
EncodeString(&ret, "timeout");
EncodeString(&ret, "requirepass");
EncodeString(&ret, "userpass");
EncodeString(&ret, "userblacklist");
EncodeString(&ret, "dump_prefix");
//EncodeString(&ret, "dump_prefix");
EncodeString(&ret, "maxconnection");
EncodeString(&ret, "expire_logs_days");
EncodeString(&ret, "expire_logs_nums");
EncodeString(&ret, "root_connection_num");
EncodeString(&ret, "slowlog_log_slower_than");
EncodeString(&ret, "slave-read-only");
EncodeString(&ret, "master_db_sync_path");
EncodeString(&ret, "slave_db_sync_path");
EncodeString(&ret, "db_sync_speed");
return;
}
Expand Down Expand Up @@ -855,22 +854,16 @@ void ConfigCmd::ConfigSet(std::string& ret) {
g_pika_conf->SetReadonly(is_readonly);
pthread_rwlock_rdlock(g_pika_server->rwlock());
ret = "+OK\r\n";
// } else if (set_item == "master_db_sync_path") {
// g_pika_conf->SetMasterDbSyncPath(value);
// ret = "+OK\r\n";
// } else if (set_item == "slave_db_sync_path") {
// g_pika_conf->SetSlaveDbSyncPath(value);
// ret = "+OK\r\n";
// } else if (set_item == "db_sync_speed") {
// if (!slash::string2l(value.data(), value.size(), &ival)) {
// ret = "-ERR Invalid argument " + value + " for CONFIG SET 'db_sync_speed(MB)'\r\n";
// return;
// }
// if (ival < 0 || ival > 125) {
// ival = 125;
// }
// g_pika_conf->SetDbSyncSpeed(ival);
// ret = "+OK\r\n";
} else if (set_item == "db_sync_speed") {
if (!slash::string2l(value.data(), value.size(), &ival)) {
ret = "-ERR Invalid argument " + value + " for CONFIG SET 'db_sync_speed(MB)'\r\n";
return;
}
if (ival < 0 || ival > 125) {
ival = 125;
}
g_pika_conf->SetDbSyncSpeed(ival);
ret = "+OK\r\n";
} else {
ret = "-ERR No such configure item\r\n";
}
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ Binlog::Binlog(const std::string& Binlog_path, const int file_size) :
pool_(NULL),
exit_all_consume_(false),
binlog_path_(Binlog_path),
file_size_(file_size_) {
file_size_(file_size) {

//slash::SetMmapBoundSize(file_size);
//slash::kMmapBoundSize = 1024 * 1024 * 100;
Expand Down
6 changes: 3 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ void InitCmdInfoTable() {
CmdInfo* slaveofptr = new CmdInfo(kCmdNameSlaveof, -3, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSlaveof, slaveofptr));
////Trysync
CmdInfo* trysyncptr = new CmdInfo(kCmdNameTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsAdminRequire);
CmdInfo* trysyncptr = new CmdInfo(kCmdNameTrysync, 5, kCmdFlagsRead | kCmdFlagsAdmin | kCmdFlagsSuspend | kCmdFlagsAdminRequire);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameTrysync, trysyncptr));
CmdInfo* authptr = new CmdInfo(kCmdNameAuth, 2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameAuth, authptr));
Expand All @@ -30,9 +30,9 @@ void InitCmdInfoTable() {
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePing, pingptr));
CmdInfo* selectptr = new CmdInfo(kCmdNameSelect, 2, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSelect, selectptr));
CmdInfo* flushallptr = new CmdInfo(kCmdNameFlushall, 1, kCmdFlagsRead | kCmdFlagsMaskSuspend | kCmdFlagsAdmin);
CmdInfo* flushallptr = new CmdInfo(kCmdNameFlushall, 1, kCmdFlagsRead | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameFlushall, flushallptr));
CmdInfo* readonlyptr = new CmdInfo(kCmdNameReadonly, 2, kCmdFlagsRead | kCmdFlagsMaskSuspend | kCmdFlagsAdmin);
CmdInfo* readonlyptr = new CmdInfo(kCmdNameReadonly, 2, kCmdFlagsRead | kCmdFlagsSuspend | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameReadonly, readonlyptr));
CmdInfo* clientptr = new CmdInfo(kCmdNameClient, -2, kCmdFlagsRead | kCmdFlagsAdmin);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameClient, clientptr));
Expand Down
Loading

0 comments on commit 6e32516

Please sign in to comment.