Skip to content

Commit

Permalink
when master outage, slave periodically try to reconnect (OpenAtomFoun…
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 49fac6e commit 7aea09d
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 72 deletions.
3 changes: 1 addition & 2 deletions include/pika_define.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@

#define PIKA_SYNC_BUFFER_SIZE 1000
#define PIKA_MAX_WORKER_THREAD_NUM 24
#define PIKA_META_SYNC_MAX_WAIT_TIME 3
#define PIKA_REPL_SERVER_TP_SIZE 3
#define PIKA_META_SYNC_MAX_WAIT_TIME 5

class PikaServer;

Expand All @@ -35,7 +35,6 @@ typedef WorkerCronTask MonitorCronTask;

//slave item
struct SlaveItem {
int64_t sid;
std::string ip_port;
std::string ip;
int port;
Expand Down
17 changes: 2 additions & 15 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ class PikaServer {
time_t start_time_s();
std::string master_ip();
int master_port();
int64_t sid();
void SetSid(int64_t sid);
int role();
bool readonly();
int repl_state();
Expand Down Expand Up @@ -142,18 +140,12 @@ class PikaServer {
/*
* Master use
*/
int64_t GenSid();
void BecomeMaster();
void DeleteSlave(int fd); //conn fd
int32_t CountSyncSlaves();
int32_t GetSlaveListString(std::string& slave_list_str);
int64_t TryAddSlave(const std::string& ip, int64_t port, int fd,
const std::vector<TableStruct>& table_structs);
//Status AddBinlogSender(const std::string& table_name,
// uint32_t partition_id,
// const std::string& ip, int64_t port,
// int64_t sid,
// uint32_t filenum, uint64_t con_offset);
bool TryAddSlave(const std::string& ip, int64_t port, int fd,
const std::vector<TableStruct>& table_structs);
slash::Mutex slave_mutex_; // protect slaves_;
std::vector<SlaveItem> slaves_;

Expand Down Expand Up @@ -327,11 +319,6 @@ class PikaServer {
PikaDispatchThread* pika_dispatch_thread_;


/*
* Master used
*/
int64_t sid_;

/*
* Slave used
*/
Expand Down
1 change: 0 additions & 1 deletion src/pika_inner_message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ message InnerResponse {
}
required bool classic_mode = 1;
repeated TableInfo tables_info = 2;
required int64 sid = 3;
}

// master to slave
Expand Down
7 changes: 5 additions & 2 deletions src/pika_repl_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,11 @@ Status PikaReplClient::SendMetaSync() {
delete cli;
} else {
LOG(WARNING) << "Failed to connect master, Master ("
<< g_pika_server->master_ip() << ":" << g_pika_server->master_port() << ")";
g_pika_server->SyncError();
<< g_pika_server->master_ip() << ":" << g_pika_server->master_port() << "), try reconnect";
// Sleep three seconds to avoid frequent try Meta Sync
// when the connection fails
sleep(3);
g_pika_server->ResetMetaSyncStatus();
delete cli;
return Status::Corruption("Connect master error");
}
Expand Down
4 changes: 1 addition & 3 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
std::string partition_name = partition->GetPartitionName();
if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) {
BinlogOffset boffset;
uint32_t session_id = try_sync_response.session_id();
int32_t session_id = try_sync_response.session_id();
partition->SetReplState(ReplState::kConnected);
partition->logger()->GetProducerStatus(&boffset.filenum, &boffset.offset);
g_pika_rm->AddSyncSlavePartition(RmNode(g_pika_server->master_ip(), g_pika_server->master_port(), table_name, partition_id, session_id));
Expand All @@ -214,11 +214,9 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kSyncPointLarger) {
partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error, Because the invalid filenum and offset";
conn->NotifyClose();
} else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kError) {
partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error";
conn->NotifyClose();
}
delete task_arg;
}
Expand Down
12 changes: 11 additions & 1 deletion src/pika_repl_client_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,15 @@ void PikaReplClientThread::ReplClientHandle::FdClosedHandle(int fd, const std::s
};

void PikaReplClientThread::ReplClientHandle::FdTimeoutHandle(int fd, const std::string& ip_port) const {
LOG(INFO) << "slave " << ip_port << " " << fd << " timeout";
std::string ip;
int port = 0;
if (!slash::ParseIpPortString(ip_port, ip, port)) {
LOG(WARNING) << "Parse ip_port error " << ip_port;
return;
}
if (ip == g_pika_server->master_ip()
&& port == g_pika_server->master_port() + kPortShiftReplServer) {
LOG(WARNING) << "Master conn timeout : " << ip_port << " try reconnect";
g_pika_server->ResetMetaSyncStatus();
}
};
5 changes: 2 additions & 3 deletions src/pika_repl_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,17 +140,16 @@ void PikaReplServer::HandleMetaSyncRequest(void* arg) {
response.set_reply("Auth with master error, Invalid masterauth");
} else {
std::vector<TableStruct> table_structs = g_pika_conf->table_structs();
int64_t sid = g_pika_server->TryAddSlave(node.ip(), node.port(), conn->fd(), table_structs);
bool success = g_pika_server->TryAddSlave(node.ip(), node.port(), conn->fd(), table_structs);
const std::string ip_port = slash::IpPortString(node.ip(), node.port());
g_pika_rm->ReplServerUpdateClientConnMap(ip_port, conn->fd());
if (sid < 0) {
if (!success) {
response.set_code(InnerMessage::kError);
response.set_reply("Slave AlreadyExist");
} else {
g_pika_server->BecomeMaster();
response.set_code(InnerMessage::kOk);
InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync();
meta_sync->set_sid(sid);
meta_sync->set_classic_mode(g_pika_conf->classic_mode());
for (const auto& table_struct : table_structs) {
InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info();
Expand Down
2 changes: 1 addition & 1 deletion src/pika_repl_server_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ PikaReplServerThread::PikaReplServerThread(const std::set<std::string>& ips,
}

void PikaReplServerThread::ReplServerHandle::FdClosedHandle(int fd, const std::string& ip_port) const {
LOG(INFO) << "ServerThread close fd:" << fd << ", ip_port:" << ip_port;
LOG(INFO) << "ServerThread Close Slave Conn, fd: " << fd << ", ip_port: " << ip_port;
g_pika_server->DeleteSlave(fd);
g_pika_rm->ReplServerRemoveClientConn(fd);
}
19 changes: 10 additions & 9 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Status SyncMasterPartition::AddSlaveNode(const std::string& ip, int port, int se
slave_ptr->SetLastSendTime(slash::NowMicros());
slave_ptr->SetLastRecvTime(slash::NowMicros());
slaves_.push_back(slave_ptr);
LOG(INFO) << "Add Slave Node Partition " << SyncPartitionInfo().ToString() << ", Master AddSlaveNode "<< ip << ":" << port;
LOG(INFO) << "Add Slave Node, partition: " << SyncPartitionInfo().ToString() << ", ip_port: "<< ip << ":" << port;
return Status::OK();
}

Expand All @@ -133,7 +133,8 @@ Status SyncMasterPartition::RemoveSlaveNode(const std::string& ip, int port) {
std::shared_ptr<SlaveNode> slave = slaves_[i];
if (ip == slave->Ip() && port == slave->Port()) {
slaves_.erase(slaves_.begin() + i);
LOG(INFO) << "Remove Slave Node Partiiton " << SyncPartitionInfo().ToString() << ", Master RemoveSlaveNode "<< ip << ":" << port;
LOG(INFO) << "Remove Slave Node, Partition: " << SyncPartitionInfo().ToString()
<< ", ip_port: "<< ip << ":" << port;
return Status::OK();
}
}
Expand Down Expand Up @@ -761,13 +762,13 @@ Status PikaReplicaManager::LostConnection(const std::string& ip, int port) {
for (auto& iter : sync_slave_partitions_) {
std::shared_ptr<SyncSlavePartition> partition = iter.second;
if (partition->MasterIp() == ip && partition->MasterPort() == port) {
LOG(INFO) << "Slave Delete Slave Partition " << partition->SyncPartitionInfo().ToString()
<< " master " << partition->MasterIp() << ":" << partition->MasterPort();
to_del.push_back(partition->SyncPartitionInfo());
}
}
for (auto& partition_info : to_del) {
sync_slave_partitions_.erase(partition_info);
for (auto& p_info : to_del) {
LOG(INFO) << "Remove Master Node, Partition: " << p_info.ToString()
<< ", ip_port:" << ip << ":" << port;
sync_slave_partitions_.erase(p_info);
}
return Status::OK();
}
Expand Down Expand Up @@ -857,8 +858,8 @@ Status PikaReplicaManager::AddSyncSlavePartition(const RmNode& node) {
}
sync_slave_partitions_[partition_info] =
std::make_shared<SyncSlavePartition>(node.TableName(), node.PartitionId(), node);
LOG(INFO) << "Slave Add salve partition " << partition_info.ToString() <<
", Master " << node.Ip() << ":" << node.Port();
LOG(INFO) << "Add Master Node, partition: " << partition_info.ToString() <<
", ip_port: " << node.Ip() << ":" << node.Port();
return Status::OK();
}

Expand Down Expand Up @@ -969,7 +970,7 @@ Status PikaReplicaManager::CheckSyncTimeout(uint64_t now) {
}
for (auto& partition_info : to_del) {
sync_slave_partitions_.erase(partition_info);
LOG(INFO) <<" Slave del slave partition success " << partition_info.ToString();
LOG(INFO) << "SyncTimeout Delete Master Node Success, partition: " << partition_info.ToString();
}
return Status::OK();
}
Expand Down
51 changes: 16 additions & 35 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ PikaServer::PikaServer() :
exit_(false),
have_scheduled_crontask_(false),
last_check_compact_time_({0, 0}),
sid_(0),
master_ip_(""),
master_port_(0),
repl_state_(PIKA_REPL_NO_CONNECT),
Expand Down Expand Up @@ -289,14 +288,6 @@ int PikaServer::master_port() {
return master_port_;
}

int64_t PikaServer::sid() {
return sid_;
}

void PikaServer::SetSid(int64_t sid) {
sid_ = sid;
}

int PikaServer::role() {
slash::RWLock(&state_protector_, false);
return role_;
Expand Down Expand Up @@ -603,14 +594,6 @@ Status PikaServer::DoSameThingEveryPartition(const TaskType& type) {
return Status::OK();
}


int64_t PikaServer::GenSid() {
// slave_mutex has been locked from exterior
int64_t sid = sid_;
sid_++;
return sid;
}

void PikaServer::BecomeMaster() {
slash::RWLock l(&state_protector_, true);
role_ |= PIKA_ROLE_MASTER;
Expand All @@ -624,7 +607,7 @@ void PikaServer::DeleteSlave(int fd) {
if (iter->conn_fd == fd) {
g_pika_rm->LostConnection(iter->ip, iter->port);
g_pika_rm->DropItemInWriteQueue(iter->ip, iter->port);
LOG(INFO) << "Delete Slave Success, " << iter->ip << ":" << iter->port;
LOG(INFO) << "Delete Slave Success, ip_port: " << iter->ip << ":" << iter->port;
slaves_.erase(iter);
break;
}
Expand Down Expand Up @@ -656,7 +639,7 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
for (const auto& slave : slaves_) {
std::stringstream sync_status_stream;
sync_status_stream << " sync points:" << "\r\n";
tmp_stream << "slave" << index++ << ":ip=" << slave.ip << ",port=" << slave.port << ",sid=" << slave.sid << ",lag=";
tmp_stream << "slave" << index++ << ":ip=" << slave.ip << ",port=" << slave.port << ",lag=";
for (const auto& ts : slave.table_structs) {
for (size_t idx = 0; idx < ts.partition_num; ++idx) {
std::shared_ptr<Partition> partition = GetTablePartitionById(ts.table_name, idx);
Expand Down Expand Up @@ -685,26 +668,25 @@ int32_t PikaServer::GetSlaveListString(std::string& slave_list_str) {
return index;
}

// Try add Slave, return slave sid if success,
// return -1 when slave already exist
int64_t PikaServer::TryAddSlave(const std::string& ip, int64_t port, int fd,
const std::vector<TableStruct>& table_structs) {
// Try add Slave, return true if success,
// return false when slave already exist
bool PikaServer::TryAddSlave(const std::string& ip, int64_t port, int fd,
const std::vector<TableStruct>& table_structs) {
std::string ip_port = slash::IpPortString(ip, port);

slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
if (iter->ip_port == ip_port) {
LOG(INFO) << "Slave already exist, " << ip << ":" << port;
return -1;
LOG(WARNING) << "Slave Already Exist, ip_port: " << ip << ":" << port;
return false;
}
iter++;
}

// Not exist, so add new
LOG(INFO) << "Add new slave, " << ip << ":" << port;
LOG(INFO) << "Add New Slave, " << ip << ":" << port;
SlaveItem s;
s.sid = GenSid();
s.ip_port = ip_port;
s.ip = ip;
s.port = port;
Expand All @@ -713,7 +695,7 @@ int64_t PikaServer::TryAddSlave(const std::string& ip, int64_t port, int fd,
s.table_structs = table_structs;
gettimeofday(&s.create_time, NULL);
slaves_.push_back(s);
return s.sid;
return true;
}

void PikaServer::SyncError() {
Expand All @@ -732,7 +714,7 @@ void PikaServer::RemoveMaster() {
g_pika_rm->GetPikaReplClient()->Close(master_ip_, master_port_ + kPortShiftReplServer);
g_pika_rm->GetPikaReplClient()->DropWriteBinlogTask();
g_pika_rm->LostConnection(master_ip_, master_port_);
LOG(INFO) << "Remove Master " << master_ip_ << ":" << master_port_;
LOG(INFO) << "Remove Master Success, ip_port: " << master_ip_ << ":" << master_port_;
}

master_ip_ = "";
Expand Down Expand Up @@ -1176,13 +1158,12 @@ Status PikaServer::TriggerSendBinlogSync() {
}

Status PikaServer::SendMetaSyncRequest() {
// Sleep one second to avoid frequent try Meta Sync
// when the connection is closed
sleep(1);
Status status = g_pika_rm->GetPikaReplClient()->SendMetaSync();
last_meta_sync_timestamp_ = time(NULL);
slash::RWLock l(&state_protector_, true);
repl_state_ = PIKA_REPL_WAIT_META_SYNC_RESPONSE;
if (status.ok()) {
last_meta_sync_timestamp_ = time(NULL);
slash::RWLock l(&state_protector_, true);
repl_state_ = PIKA_REPL_WAIT_META_SYNC_RESPONSE;
}
return status;
}

Expand Down

0 comments on commit 7aea09d

Please sign in to comment.