Skip to content

Commit

Permalink
Shrink critical section (OpenAtomFoundation#747)
Browse files Browse the repository at this point in the history
Update pink to prevent dead lock
  • Loading branch information
whoiami authored Sep 10, 2019
1 parent 285703e commit 35f5d5e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 31 deletions.
67 changes: 42 additions & 25 deletions src/pika_rm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,6 @@ Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) {
}
for (auto& node : to_del) {
for (size_t i = 0; i < slaves_.size(); ++i) {
LOG(INFO)<< SyncPartitionInfo().ToString() << " slave " << slaves_[i]->ToString();
if (node.Ip() == slaves_[i]->Ip() && node.Port() == slaves_[i]->Port()) {
slaves_.erase(slaves_.begin() + i);
LOG(WARNING) << SyncPartitionInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString();
Expand Down Expand Up @@ -706,7 +705,9 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it
}
}
if (start_pos == kBinlogReadWinSize || end_pos == kBinlogReadWinSize) {
LOG(WARNING) << " ack offset not found in binlog controller window";
LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString() <<
" not found in binlog controller window." <<
std::endl << "window status "<< std::endl << ToStringStatus();
return false;
}
for (size_t i = start_pos; i <= end_pos; ++i) {
Expand Down Expand Up @@ -786,38 +787,54 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, cons
}

int PikaReplicaManager::ConsumeWriteQueue() {
int counter = 0;
slash::MutexLock l(&write_queue_mu_);
std::vector<std::string> to_delete;
for (auto& iter : write_queues_) {
std::queue<WriteTask>& queue = iter.second;
for (int i = 0; i < kBinlogSendPacketNum; ++i) {
if (queue.empty()) {
break;
}
size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size();
std::string ip;
int port = 0;
if (!slash::ParseIpPortString(iter.first, ip, port)) {
LOG(WARNING) << "Parse ip_port error " << iter.first;
continue;
}
std::vector<WriteTask> to_send;
for (size_t i = 0; i < batch_index; ++i) {
to_send.push_back(queue.front());
queue.pop();
counter++;
std::unordered_map<std::string, std::vector<std::vector<WriteTask>>> to_send_map;
int counter = 0;
{
slash::MutexLock l(&write_queue_mu_);
std::vector<std::string> to_delete;
for (auto& iter : write_queues_) {
std::queue<WriteTask>& queue = iter.second;
for (int i = 0; i < kBinlogSendPacketNum; ++i) {
if (queue.empty()) {
break;
}
size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size();
std::vector<WriteTask> to_send;
for (size_t i = 0; i < batch_index; ++i) {
to_send.push_back(queue.front());
queue.pop();
counter++;
}
to_send_map[iter.first].push_back(std::move(to_send));
}
}
}

for (auto& iter : to_send_map) {
std::string ip;
int port = 0;
if (!slash::ParseIpPortString(iter.first, ip, port)) {
LOG(WARNING) << "Parse ip_port error " << iter.first;
continue;
}
for (auto& to_send : iter.second) {
Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send);
if (!s.ok()) {
LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString();
to_delete.push_back(iter.first);
break;
continue;
}
}
}
for (auto& del_queue : to_delete) {
write_queues_.erase(del_queue);

if (!to_delete.empty()) {
{
slash::MutexLock l(&write_queue_mu_);
for (auto& del_queue : to_delete) {
write_queues_.erase(del_queue);
}
}
}
return counter;
}
Expand Down
20 changes: 15 additions & 5 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,11 +629,18 @@ void PikaServer::BecomeMaster() {
}

void PikaServer::DeleteSlave(int fd) {
std::string ip;
int port = -1;
bool is_find = false;
int slave_num = -1;
{
slash::MutexLock l(&slave_mutex_);
std::vector<SlaveItem>::iterator iter = slaves_.begin();
while (iter != slaves_.end()) {
if (iter->conn_fd == fd) {
ip = iter->ip;
port = iter->port;
is_find = true;
g_pika_rm->LostConnection(iter->ip, iter->port);
g_pika_rm->DropItemInWriteQueue(iter->ip, iter->port);
LOG(INFO) << "Delete Slave Success, ip_port: " << iter->ip << ":" << iter->port;
Expand All @@ -642,14 +649,17 @@ void PikaServer::DeleteSlave(int fd) {
}
iter++;
}
slave_num = slaves_.size();
}

int slave_num = slaves_.size();
{
if (is_find) {
g_pika_rm->LostConnection(ip, port);
g_pika_rm->DropItemInWriteQueue(ip, port);
}

if (slave_num == 0) {
slash::RWLock l(&state_protector_, true);
if (slave_num == 0) {
role_ &= ~PIKA_ROLE_MASTER;
}
role_ &= ~PIKA_ROLE_MASTER;
}
}

Expand Down
2 changes: 1 addition & 1 deletion third/pink

0 comments on commit 35f5d5e

Please sign in to comment.