Skip to content

Commit

Permalink
1, add connect wait state\n2, clear target rsync dir after rsync send…
Browse files Browse the repository at this point in the history
…\n3, add master ipport into module name
  • Loading branch information
CatKang committed Apr 15, 2016
1 parent 44c0919 commit 09dd9f4
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 27 deletions.
1 change: 1 addition & 0 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ struct SlaveItem {
#define PIKA_REPL_CONNECT 1
#define PIKA_REPL_CONNECTING 2
#define PIKA_REPL_CONNECTED 3
#define PIKA_REPL_WAIT_DBSYNC 4

//role
#define PIKA_ROLE_SINGLE 0
Expand Down
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ class PikaServer
void PlusMasterConnection();
bool ShouldAccessConnAsMaster(const std::string& ip);
void RemoveMaster();
bool WaitingDBSync();
void NeedWaitDBSync();
void WaitDBSyncFinish();

void Start();
void Exit() {
Expand Down
6 changes: 1 addition & 5 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,7 @@ void TrysyncCmd::DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info)
}

void TrysyncCmd::Do() {
std::string ip_port = slave_ip_;
char buf[10];
slash::ll2string(buf, sizeof(buf), slave_port_);
ip_port.append(":");
ip_port.append(buf);
std::string ip_port = slash::IpPortString(slave_ip_, slave_port_);
DLOG(INFO) << "Trysync, Slave ip_port: " << ip_port << " filenum: " << filenum_ << " pro_offset: " << pro_offset_;
slash::MutexLock l(&(g_pika_server->slave_mutex_));
if (!g_pika_server->FindSlave(ip_port)) {
Expand Down
45 changes: 35 additions & 10 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,27 @@ bool PikaServer::SetMaster(std::string& master_ip, int master_port) {
return false;
}

bool PikaServer::WaitingDBSync() {
slash::RWLock l(&state_protector_, false);
DLOG(INFO) << "repl_state: " << repl_state_ << " role: " << role_ << " master_connection: " << master_connection_;
if (repl_state_ == PIKA_REPL_WAIT_DBSYNC) {
return true;
}
return false;
}

void PikaServer::NeedWaitDBSync() {
slash::RWLock l(&state_protector_, true);
repl_state_ = PIKA_REPL_WAIT_DBSYNC;
}

void PikaServer::WaitDBSyncFinish() {
slash::RWLock l(&state_protector_, true);
if (repl_state_ == PIKA_REPL_WAIT_DBSYNC) {
repl_state_ = PIKA_REPL_NO_CONNECT;
}
}

bool PikaServer::ShouldConnectMaster() {
slash::RWLock l(&state_protector_, false);
DLOG(INFO) << "repl_state: " << repl_state_ << " role: " << role_ << " master_connection: " << master_connection_;
Expand Down Expand Up @@ -342,7 +363,7 @@ void PikaServer::PlusMasterConnection() {
bool PikaServer::ShouldAccessConnAsMaster(const std::string& ip) {
slash::RWLock l(&state_protector_, false);
DLOG(INFO) << "ShouldAccessConnAsMaster, repl_state_: " << repl_state_ << " ip: " << ip << " master_ip: " << master_ip_;
if (repl_state_ != PIKA_REPL_NO_CONNECT && ip == master_ip_) {
if (repl_state_ != PIKA_REPL_NO_CONNECT && repl_state_ != PIKA_REPL_WAIT_DBSYNC && ip == master_ip_) {
return true;
}
return false;
Expand Down Expand Up @@ -388,9 +409,7 @@ void PikaServer::TryDBSync(const std::string& ip, int port) {

void PikaServer::DBSync(const std::string& ip, int port) {
// Only one DBSync task for every ip_port
char buf[10];
slash::ll2string(buf, sizeof(buf), port);
std::string ip_port(ip + ":" + buf);
std::string ip_port = slash::IpPortString(ip, port);
if (db_sync_slaves.find(ip_port) != db_sync_slaves.end()) {
return;
}
Expand All @@ -408,7 +427,7 @@ void PikaServer::DBSync(const std::string& ip, int port) {
void PikaServer::DoDBSync(void* arg) {
DBSyncArg *ppurge = static_cast<DBSyncArg*>(arg);
PikaServer* ps = ppurge->p;
LOG(ERROR) << "begin bg do dbsycn";
LOG(INFO) << "begin bg do dbsycn";

ps->DBSyncSendFile(ppurge->ip, ppurge->port);

Expand All @@ -425,14 +444,15 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {
// Iterate to send files
int ret = 0;
std::string target_path;
std::string module = kDBSyncModule + "_" + slash::IpPortString(host_, port_);
std::vector<std::string>::iterator it = descendant.begin();
slash::RsyncRemote remote(ip, port, module, g_pika_conf->db_sync_speed() * 1024);
for (; it != descendant.end(); ++it) {
target_path = (*it).substr(bgsave_info_.path.size() + 1);
if (target_path == kBgsaveInfoFile) {
continue;
}
// We need specify the speed limit for every single file
slash::RsyncRemote remote(ip, port, kDBSyncModule, g_pika_conf->db_sync_speed() * 1024);
ret = slash::RsyncSendFile(*it, target_path, remote);
if (0 != ret) {
LOG(ERROR) << "rsync send file failed! From: " << *it
Expand All @@ -442,18 +462,23 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {
break;
}
}

// Clear target path
slash::RsyncSendClearTarget(bgsave_info_.path + "/kv", "kv", remote);
slash::RsyncSendClearTarget(bgsave_info_.path + "/hash", "hash", remote);
slash::RsyncSendClearTarget(bgsave_info_.path + "/list", "list", remote);
slash::RsyncSendClearTarget(bgsave_info_.path + "/set", "set", remote);
slash::RsyncSendClearTarget(bgsave_info_.path + "/zset", "zset", remote);

// Send info file at last
if (0 == ret) {
slash::RsyncRemote remote(ip, port, kDBSyncModule, g_pika_conf->db_sync_speed() * 1024);
if (0 != (ret = slash::RsyncSendFile(bgsave_info_.path + "/" + kBgsaveInfoFile, kBgsaveInfoFile, remote))) {
LOG(ERROR) << "send info file failed";
}
}

// remove slave
char buf[10];
slash::ll2string(buf, sizeof(buf), port);
std::string ip_port(ip + ":" + buf);
std::string ip_port = slash::IpPortString(ip, port);
db_sync_slaves.erase(ip_port);
if (0 == ret) {
LOG(INFO) << "rsync send files success";
Expand Down
31 changes: 20 additions & 11 deletions src/pika_trysync_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ bool PikaTrysyncThread::RecvProc() {
break;
}
// Failed
g_pika_server->RemoveMaster();

if (kInnerReplWait == reply) {
// You can't sync this time, but may be different next time,
// This may happened when
// 1, Master do bgsave first.
// 2, Master waiting for an existing bgsaving process
// 3, Master do dbsyncing
DLOG(INFO) << "Need wait to sync";
g_pika_server->NeedWaitDBSync();
}
g_pika_server->RemoveMaster();
return false;
}
}
Expand Down Expand Up @@ -155,6 +157,7 @@ bool PikaTrysyncThread::TryUpdateMasterOffset() {

// Update master offset
g_pika_server->logger_->SetProducerStatus(filenum, offset);
g_pika_server->WaitDBSyncFinish();
if (!g_pika_server->SetMaster(master_ip, static_cast<int>(master_port))) {
LOG(ERROR) << "Server is not in correct state for slaveof";
return false;
Expand All @@ -176,28 +179,34 @@ void PikaTrysyncThread::PrepareRsync() {
void* PikaTrysyncThread::ThreadMain() {
while (!should_exit_) {
sleep(1);
if (!g_pika_server->ShouldConnectMaster()) {
if (g_pika_server->WaitingDBSync()) {
//Try to update offset by db sync
if (!TryUpdateMasterOffset()) {
LOG(ERROR) << "No Update Master Offset";
continue;
if (TryUpdateMasterOffset()) {
LOG(INFO) << "Success Update Master Offset";
}
LOG(ERROR) << "Success Update Master Offset";
}

if (!g_pika_server->ShouldConnectMaster()) {
continue;
}
sleep(2);
DLOG(INFO) << "Should connect master";

//Start rsync
std::string master_ip = g_pika_server->master_ip();
int master_port = g_pika_server->master_port();

// Start rsync
std::string dbsync_path = g_pika_conf->db_sync_path();
PrepareRsync();
int ret = slash::StartRsync(dbsync_path, kDBSyncModule, g_pika_conf->port() + 300);
std::string ip_port = slash::IpPortString(master_ip, master_port);
// We append the master ip port after module name
// To make sure only data from current master is received
int ret = slash::StartRsync(dbsync_path, kDBSyncModule + "_" + ip_port, g_pika_conf->port() + 300);
if (0 != ret) {
LOG(ERROR) << "Failed to start rsync, path:" << dbsync_path << " error : " << ret;
}
LOG(ERROR) << "Finish to start rsync, path:" << dbsync_path << " error : " << ret;
DLOG(INFO) << "Finish to start rsync, path:" << dbsync_path;

std::string master_ip = g_pika_server->master_ip();
int master_port = g_pika_server->master_port();

if ((cli_->Connect(master_ip, master_port)).ok()) {
cli_->set_send_timeout(5000);
Expand Down
2 changes: 1 addition & 1 deletion third/slash
Submodule slash updated from 7c9025 to b29fb8

0 comments on commit 09dd9f4

Please sign in to comment.