Skip to content

Commit

Permalink
Support disconnect for double master (OpenAtomFoundation#381)
Browse files Browse the repository at this point in the history
  • Loading branch information
Leviathan1995 authored and Axlgrep committed Oct 28, 2018
1 parent e569bb5 commit a1d1c60
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 15 deletions.
18 changes: 17 additions & 1 deletion src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void SlaveofCmd::Do() {
}

// Stop rsync
LOG(INFO) << "start slaveof, stop rsync first";
LOG(INFO) << "Start slaveof, stop rsync first";
slash::StopRsync(g_pika_conf->db_sync_path());
g_pika_server->RemoveMaster();

Expand All @@ -105,6 +105,22 @@ void SlaveofCmd::Do() {
}
g_pika_server->logger_->SetProducerStatus(filenum_, pro_offset_);
}
} else {
if (is_noone_) {
// Stop rsync
LOG(INFO) << "Slaveof no one in double-master mode";
slash::StopRsync(g_pika_conf->db_sync_path());

g_pika_server->RemoveMaster();

std::string double_master_ip = g_pika_conf->double_master_ip();
if (double_master_ip == "127.0.0.1") {
double_master_ip = g_pika_server->host();
}
g_pika_server->DeleteSlave(double_master_ip, g_pika_conf->double_master_port());
res_.SetRes(CmdRes::kOk);
return;
}
}

// The conf file already configured double-master item, but now this
Expand Down
28 changes: 15 additions & 13 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ void PikaServer::Start() {
int32_t double_master_port = g_pika_conf->double_master_port();
double_master_sid_ = std::stoi(g_pika_conf->double_master_sid());
if ((double_master_ip == "127.0.0.1" || double_master_ip == host_) && double_master_port == port_) {
LOG(FATAL) << "set yourself as the peer-master, please check";
LOG(FATAL) << "Set yourself as the peer-master, please check";
} else {
double_master_mode_ = true;
SetMaster(double_master_ip, double_master_port);
Expand Down Expand Up @@ -355,23 +355,26 @@ void PikaServer::DeleteSlave(const std::string& ip, int64_t port) {
}
iter++;
}
if (iter == slaves_.end()) {
return;
}
if (iter->sender != NULL) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
}
slaves_.erase(iter);
slave_num = slaves_.size();
}

slave_num = slaves_.size();
{
slash::RWLock l(&state_protector_, true);
if (slave_num == 0) {
role_ &= ~PIKA_ROLE_MASTER;
if (DoubleMasterMode()) {
role_ |= PIKA_ROLE_DOUBLE_MASTER;
}
}
}

if (iter == slaves_.end()) {
return;
}
if (iter->sender != NULL) {
delete static_cast<PikaBinlogSenderThread*>(iter->sender);
}
slaves_.erase(iter);
}
}

void PikaServer::DeleteSlave(int fd) {
Expand Down Expand Up @@ -706,7 +709,6 @@ void PikaServer::SyncError() {
}

void PikaServer::RemoveMaster() {

{
slash::RWLock l(&state_protector_, true);
repl_state_ = PIKA_REPL_NO_CONNECT;
Expand All @@ -722,7 +724,7 @@ void PikaServer::RemoveMaster() {
if (ping_thread_ != NULL) {
int err = ping_thread_->StopThread();
if (err != 0) {
std::string msg = "can't join thread " + std::string(strerror(err));
std::string msg = "Can't join thread " + std::string(strerror(err));
LOG(WARNING) << msg;
}
delete ping_thread_;
Expand All @@ -732,7 +734,7 @@ void PikaServer::RemoveMaster() {
slash::RWLock l(&state_protector_, true);
master_connection_ = 0;
}
LOG(INFO) << "close read-only mode";
LOG(INFO) << "Close read-only mode";
g_pika_conf->SetReadonly(false);
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_trysync_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void* PikaTrysyncThread::ThreadMain() {
PrepareRsync();

if ((cli_->Connect(master_ip, master_port, "")).ok()) {
LOG(INFO) << "Connect to master ip:" << master_ip << "port: " << master_port;
LOG(INFO) << "Connect to master ip:" << master_ip << " port: " << master_port;
cli_->set_send_timeout(30000);
cli_->set_recv_timeout(30000);
std::string ip_port = slash::IpPortString(master_ip, master_port);
Expand Down

0 comments on commit a1d1c60

Please sign in to comment.