Skip to content

Commit

Permalink
allow master-slave connection to be established with any reachable IP…
Browse files Browse the repository at this point in the history
… without binding NIC (OpenAtomFoundation#323)
  • Loading branch information
fancy-rabbit authored and Axlgrep committed Aug 16, 2018
1 parent cd3d82e commit 150fbc8
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 18 deletions.
2 changes: 1 addition & 1 deletion include/pika_trysync_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class PikaTrysyncThread : public pink::Thread {
int64_t sid_;
pink::PinkCli *cli_;

bool Send();
bool Send(std::string lip);
bool RecvProc();
void PrepareRsync();
bool TryUpdateMasterOffset();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_binlog_sender_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ void* PikaBinlogSenderThread::ThreadMain() {
while (!should_stop()) {
sleep(2);
// 1. Connect to slave
result = cli_->Connect(ip_, port_, g_pika_server->host());
result = cli_->Connect(ip_, port_, "");
LOG(INFO) << "BinlogSender Connect slave(" << ip_ << ":" << port_ << ") " << result.ToString();

// 2. Auth
Expand Down
29 changes: 26 additions & 3 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,16 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {
// Iterate to send files
ret = 0;
std::string local_path, target_path;
std::string module = kDBSyncModule + "_" + slash::IpPortString(host_, port_);
pink::PinkCli *cli = pink::NewRedisCli();
std::string lip(host_);
if (cli->Connect(ip, port, "").ok()) {
struct sockaddr_in laddr;
socklen_t llen = sizeof(laddr);
getsockname(cli->fd(), (struct sockaddr*) &laddr, &llen);
lip = inet_ntoa(laddr.sin_addr);
cli->Close();
}
std::string module = kDBSyncModule + "_" + slash::IpPortString(lip, port_);
std::vector<std::string>::iterator it = descendant.begin();
slash::RsyncRemote remote(ip, port, module, g_pika_conf->db_sync_speed() * 1024);
for (; it != descendant.end(); ++it) {
Expand Down Expand Up @@ -861,7 +870,21 @@ void PikaServer::DBSyncSendFile(const std::string& ip, int port) {

// Send info file at last
if (0 == ret) {
if (0 != (ret = slash::RsyncSendFile(bg_path + "/" + kBgsaveInfoFile, kBgsaveInfoFile, remote))) {
// need to modify the IP addr in the info file
if (lip.compare(host_) != 0) {
std::ofstream fix;
std::string fn = bg_path + "/" + kBgsaveInfoFile + "." + std::to_string(time(NULL));
fix.open(fn, std::ios::in | std::ios::trunc);
if (fix.is_open()) {
fix << "0s\n" << lip << "\n" << port_ << "\n" << binlog_filenum << "\n" << binlog_offset << "\n";
fix.close();
}
ret = slash::RsyncSendFile(fn, kBgsaveInfoFile, remote);
slash::DeleteFile(fn);
if (ret != 0) {
LOG(WARNING) << "send modified info file failed";
}
} else if (0 != (ret = slash::RsyncSendFile(bg_path + "/" + kBgsaveInfoFile, kBgsaveInfoFile, remote))) {
LOG(WARNING) << "send info file failed";
}
}
Expand Down Expand Up @@ -1035,7 +1058,7 @@ void PikaServer::DoBgsave(void* arg) {
// Some output
BGSaveInfo info = p->bgsave_info();
std::ofstream out;
out.open(info.path + "/info", std::ios::in | std::ios::trunc);
out.open(info.path + "/" + kBgsaveInfoFile, std::ios::in | std::ios::trunc);
if (out.is_open()) {
out << (time(NULL) - info.start_time) << "s\n"
<< p->host() << "\n"
Expand Down
2 changes: 1 addition & 1 deletion src/pika_slaveping_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void* PikaSlavepingThread::ThreadMain() {
while (!should_stop() && g_pika_server->ShouldStartPingMaster()) {
if (!should_stop() && (cli_->Connect(g_pika_server->master_ip(),
g_pika_server->master_port() + 2000,
g_pika_server->host())).ok()) {
"")).ok()) {
cli_->set_send_timeout(1000);
cli_->set_recv_timeout(1000);
connect_retry_times = 0;
Expand Down
49 changes: 37 additions & 12 deletions src/pika_trysync_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
#include <fstream>
#include <glog/logging.h>
#include <poll.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include "slash/include/env.h"
#include "slash/include/rsync.h"
Expand All @@ -25,7 +28,7 @@ PikaTrysyncThread::~PikaTrysyncThread() {
LOG(INFO) << " Trysync thread " << thread_id() << " exit!!!";
}

bool PikaTrysyncThread::Send() {
bool PikaTrysyncThread::Send(std::string lip) {
pink::RedisCmdArgsType argv;
std::string wbuf_str;
std::string masterauth = g_pika_conf->masterauth();
Expand All @@ -38,7 +41,7 @@ bool PikaTrysyncThread::Send() {
argv.clear();
std::string tbuf_str;
argv.push_back("trysync");
argv.push_back(g_pika_server->host());
argv.push_back(lip);
argv.push_back(std::to_string(g_pika_server->port()));
uint32_t filenum;
uint64_t pro_offset;
Expand Down Expand Up @@ -239,20 +242,42 @@ void* PikaTrysyncThread::ThreadMain() {

// Start rsync service
PrepareRsync();
std::string ip_port = slash::IpPortString(master_ip, master_port);
// We append the master ip port after module name
// To make sure only data from current master is received
int ret = slash::StartRsync(dbsync_path, kDBSyncModule + "_" + ip_port, g_pika_server->host(), g_pika_conf->port() + 3000);
if (0 != ret) {
LOG(WARNING) << "Failed to start rsync, path:" << dbsync_path << " error : " << ret;
}
LOG(INFO) << "Finish to start rsync, path:" << dbsync_path;

if ((cli_->Connect(master_ip, master_port, g_pika_server->host())).ok()) {
if ((cli_->Connect(master_ip, master_port, "")).ok()) {
LOG(INFO) << "Connect to master ip:" << master_ip << "port: " << master_port;
cli_->set_send_timeout(30000);
cli_->set_recv_timeout(30000);
if (Send() && RecvProc()) {
std::string ip_port = slash::IpPortString(master_ip, master_port);
struct sockaddr_in laddr;
socklen_t llen = sizeof(laddr);
getsockname(cli_->fd(), (struct sockaddr*) &laddr, &llen);
std::string lip(inet_ntoa(laddr.sin_addr));
// We append the master ip port after module name
// To make sure only data from current master is received
int ret = slash::StartRsync(dbsync_path, kDBSyncModule + "_" + ip_port, lip, g_pika_conf->port() + 3000);
if (0 != ret) {
LOG(WARNING) << "Failed to start rsync, path:" << dbsync_path << " error : " << ret;
}
LOG(INFO) << "Finish to start rsync, path:" << dbsync_path;

// Make sure the listening addr of rsyncd is accessible, avoid the corner case
// that rsync --daemon process is started but not finished listening on the socket
pink::PinkCli *rsync = pink::NewRedisCli();
int retry_times;
for (retry_times = 0; retry_times < 5; retry_times++) {
if (rsync->Connect(lip, g_pika_conf->port() + 3000, "").ok()) {
LOG(INFO) << "rsync successfully started, address:" << lip << ":" << g_pika_conf->port() + 3000;
rsync->Close();
break;
} else {
sleep(1);
}
}
if (retry_times >= 5) {
LOG(WARNING) << "connecting to rsync failed, address:" << lip << ":" << g_pika_conf->port() + 3000;
}

if (Send(lip) && RecvProc()) {
g_pika_server->ConnectMasterDone();
// Stop rsync, binlog sync with master is begin
slash::StopRsync(dbsync_path);
Expand Down

0 comments on commit 150fbc8

Please sign in to comment.