Skip to content

Commit

Permalink
Slaveof Force command will rebuild slave sync partitions (OpenAtomFou…
Browse files Browse the repository at this point in the history
…ndation#586)

* Signal the auxiliary thread when the master node receives a binlog sync response

* Slaveof Force command will rebuild slave sync partitions
  • Loading branch information
whoiami authored and Axlgrep committed May 15, 2019
1 parent fa9d686 commit 06e123e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 73 deletions.
10 changes: 2 additions & 8 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ class PikaReplicaManager {
Status SetMasterLastRecvTime(const RmNode& slave, uint64_t time);
Status SetSlaveLastRecvTime(const RmNode& slave, uint64_t time);

Status RebuildPartition();

Status CheckSyncTimeout(uint64_t now);

// The following functions are invoked by the master partition only
Expand Down Expand Up @@ -227,14 +229,6 @@ class PikaReplicaManager {

private:
void InitPartition();
Status AddSlave(const RmNode& slave);
Status RecordNodePartition(const RmNode& slave);
Status RemoveSlave(const RmNode& slave);
Status EraseNodePartition(const RmNode& slave);

slash::Mutex node_partitions_mu_;
// used to manage peer slave node to partition map
std::unordered_map<std::string, std::vector<RmNode>> node_partitions_;

pthread_rwlock_t partitions_rw_;
std::unordered_map<PartitionInfo, std::shared_ptr<SyncMasterPartition>, hash_partition_info> sync_master_partitions_;
Expand Down
83 changes: 18 additions & 65 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,18 @@ void PikaReplicaManager::InitPartition() {
}
}

// Discards every entry in sync_master_partitions_ and calls InitPartition()
// again, all while holding partitions_rw_.
//
// Returns Status::Corruption without touching anything if
// sync_slave_partitions_ is not empty; otherwise returns Status::OK() after
// logging the resulting number of master partitions.
Status PikaReplicaManager::RebuildPartition() {
  // NOTE(review): `true` presumably selects the write (exclusive) side of the
  // rwlock -- confirm against slash::RWLock.
  slash::RWLock guard(&partitions_rw_, true);
  if (sync_slave_partitions_.empty()) {
    sync_master_partitions_.clear();
    // Called with partitions_rw_ already held by `guard`.
    InitPartition();
    LOG(INFO) << "Rebuild Sync Partition Success! "
              << "Rebuilded partition size " << sync_master_partitions_.size();
    return Status::OK();
  }
  return Status::Corruption("Slave partition is NOT empty");
}

void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, const std::vector<WriteTask>& tasks) {
slash::MutexLock l(&write_queue_mu_);
std::string index = ip + ":" + std::to_string(port);
Expand Down Expand Up @@ -644,18 +656,6 @@ Status PikaReplicaManager::GetSyncBinlogStatus(const RmNode& slave, BinlogOffset
}

// Registers `slave` in two steps: AddSlave() first, then
// RecordNodePartition(). The second step runs only if the first succeeded.
//
// Returns the first non-ok Status encountered, or Status::OK() when both
// steps succeed.
Status PikaReplicaManager::AddPartitionSlave(const RmNode& slave) {
  Status result = AddSlave(slave);
  if (result.ok()) {
    result = RecordNodePartition(slave);
  }
  if (!result.ok()) {
    return result;
  }
  return Status::OK();
}

Status PikaReplicaManager::AddSlave(const RmNode& slave) {
slash::RWLock l(&partitions_rw_, false);
if (sync_master_partitions_.find(slave.NodePartitionInfo()) == sync_master_partitions_.end()) {
return Status::NotFound(slave.ToString() + " not found");
Expand All @@ -672,32 +672,7 @@ Status PikaReplicaManager::AddSlave(const RmNode& slave) {
return Status::OK();
}

// Remembers `slave` in node_partitions_ under the key "<ip>:<port>".
//
// The entry for the key is created on demand. If an equal RmNode is already
// recorded the call is a no-op. Always returns Status::OK().
// node_partitions_mu_ is held for the whole lookup and insert.
Status PikaReplicaManager::RecordNodePartition(const RmNode& slave) {
  const std::string node_key = slave.Ip() + ":" + std::to_string(slave.Port());
  slash::MutexLock guard(&node_partitions_mu_);
  // operator[] default-constructs an empty vector for a node seen for the
  // first time.
  std::vector<RmNode>& recorded = node_partitions_[node_key];
  for (const RmNode& existing : recorded) {
    if (slave == existing) {
      return Status::OK();
    }
  }
  recorded.push_back(slave);
  return Status::OK();
}

// Unregisters `slave` in two steps: RemoveSlave() first, then
// EraseNodePartition(). The second step is skipped if the first fails.
//
// Returns the first non-ok Status encountered, or Status::OK() when both
// steps succeed.
Status PikaReplicaManager::RemovePartitionSlave(const RmNode& slave) {
  const Status remove_status = RemoveSlave(slave);
  if (!remove_status.ok()) {
    return remove_status;
  }
  const Status erase_status = EraseNodePartition(slave);
  if (!erase_status.ok()) {
    return erase_status;
  }
  return Status::OK();
}

Status PikaReplicaManager::RemoveSlave(const RmNode& slave) {
slash::RWLock l(&partitions_rw_, false);
if (sync_master_partitions_.find(slave.NodePartitionInfo()) == sync_master_partitions_.end()) {
return Status::NotFound(slave.ToString() + " not found");
Expand All @@ -710,39 +685,17 @@ Status PikaReplicaManager::RemoveSlave(const RmNode& slave) {
return Status::OK();
}

// Forgets `slave` from the node_partitions_ entry keyed by "<ip>:<port>".
//
// Returns Status::NotFound if the key has no entry, or if the entry holds no
// RmNode equal to `slave`. On success only the first equal element is
// erased; the map entry itself is kept even if its vector becomes empty.
// node_partitions_mu_ is held for the whole operation.
Status PikaReplicaManager::EraseNodePartition(const RmNode& slave) {
  const std::string node_key = slave.Ip() + ":" + std::to_string(slave.Port());
  slash::MutexLock guard(&node_partitions_mu_);
  auto entry = node_partitions_.find(node_key);
  if (entry == node_partitions_.end()) {
    return Status::NotFound(slave.ToString());
  }
  std::vector<RmNode>& recorded = entry->second;
  for (auto it = recorded.begin(); it != recorded.end(); ++it) {
    if (*it == slave) {
      recorded.erase(it);
      return Status::OK();
    }
  }
  return Status::NotFound("slave " + slave.ToString());
}

Status PikaReplicaManager::LostConnection(const std::string& ip, int port) {
std::string index = ip + ":" + std::to_string(port);
slash::MutexLock l(&node_partitions_mu_);
// if masterpartition owns ip+port slave
if (node_partitions_.find(index) != node_partitions_.end()) {
std::vector<RmNode>& rm_nodes = node_partitions_[index];
for (auto& rm_node : rm_nodes) {
Status s = RemoveSlave(rm_node);
if (!s.ok()) {
return s;
}
slash::RWLock l_part(&partitions_rw_, true);
for (auto& iter : sync_master_partitions_) {
std::shared_ptr<SyncMasterPartition> partition = iter.second;
Status s = partition->RemoveSlaveNode(ip, port);
if (!s.ok() && !s.IsNotFound()) {
LOG(WARNING) << "Lost Connection failed " << s.ToString();
}
node_partitions_.erase(index);
}

std::vector<PartitionInfo> to_del;
slash::RWLock l_part(&partitions_rw_, true);
for (auto& iter : sync_slave_partitions_) {
std::shared_ptr<SyncSlavePartition> partition = iter.second;
if (partition->MasterIp() == ip && partition->MasterPort() == port) {
Expand Down
5 changes: 5 additions & 0 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,11 @@ bool PikaServer::RebuildTableStruct(const std::vector<TableStruct>& table_struct
// to the pika.conf file
g_pika_conf->SetTableStructs(table_structs);
InitTableStruct();
Status s = g_pika_rm->RebuildPartition();
if (!s.ok()) {
LOG(WARNING) << s.ToString();
return false;
}
return true;
}

Expand Down

0 comments on commit 06e123e

Please sign in to comment.