From 35f5d5ec8437fd0160a4b609255d6b31e32781a0 Mon Sep 17 00:00:00 2001 From: Zhao Minghuan Date: Tue, 10 Sep 2019 17:44:51 +0800 Subject: [PATCH] Shrink critical section (#747) Update pink to prevent dead lock --- src/pika_rm.cc | 67 +++++++++++++++++++++++++++++----------------- src/pika_server.cc | 20 ++++++++++---- third/pink | 2 +- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 2299858815..120ba4276b 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -525,7 +525,6 @@ Status SyncMasterPartition::CheckSyncTimeout(uint64_t now) { } for (auto& node : to_del) { for (size_t i = 0; i < slaves_.size(); ++i) { - LOG(INFO)<< SyncPartitionInfo().ToString() << " slave " << slaves_[i]->ToString(); if (node.Ip() == slaves_[i]->Ip() && node.Port() == slaves_[i]->Port()) { slaves_.erase(slaves_.begin() + i); LOG(WARNING) << SyncPartitionInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString(); @@ -706,7 +705,9 @@ bool SyncWindow::Update(const SyncWinItem& start_item, const SyncWinItem& end_it } } if (start_pos == kBinlogReadWinSize || end_pos == kBinlogReadWinSize) { - LOG(WARNING) << " ack offset not found in binlog controller window"; + LOG(WARNING) << "Ack offset Start: " << start_item.ToString() << "End: " << end_item.ToString() << + " not found in binlog controller window." << + std::endl << "window status "<< std::endl << ToStringStatus(); return false; } for (size_t i = start_pos; i <= end_pos; ++i) { @@ -786,38 +787,54 @@ void PikaReplicaManager::ProduceWriteQueue(const std::string& ip, int port, cons } int PikaReplicaManager::ConsumeWriteQueue() { - int counter = 0; - slash::MutexLock l(&write_queue_mu_); std::vector to_delete; - for (auto& iter : write_queues_) { - std::queue& queue = iter.second; - for (int i = 0; i < kBinlogSendPacketNum; ++i) { - if (queue.empty()) { - break; - } - size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size(); - std::string ip; - int port = 0; - if (!slash::ParseIpPortString(iter.first, ip, port)) { - LOG(WARNING) << "Parse ip_port error " << iter.first; - continue; - } - std::vector to_send; - for (size_t i = 0; i < batch_index; ++i) { - to_send.push_back(queue.front()); - queue.pop(); - counter++; + std::unordered_map>> to_send_map; + int counter = 0; + { + slash::MutexLock l(&write_queue_mu_); + std::vector to_delete; + for (auto& iter : write_queues_) { + std::queue& queue = iter.second; + for (int i = 0; i < kBinlogSendPacketNum; ++i) { + if (queue.empty()) { + break; + } + size_t batch_index = queue.size() > kBinlogSendBatchNum ? kBinlogSendBatchNum : queue.size(); + std::vector to_send; + for (size_t i = 0; i < batch_index; ++i) { + to_send.push_back(queue.front()); + queue.pop(); + counter++; + } + to_send_map[iter.first].push_back(std::move(to_send)); } + } + } + + for (auto& iter : to_send_map) { + std::string ip; + int port = 0; + if (!slash::ParseIpPortString(iter.first, ip, port)) { + LOG(WARNING) << "Parse ip_port error " << iter.first; + continue; + } + for (auto& to_send : iter.second) { Status s = pika_repl_server_->SendSlaveBinlogChips(ip, port, to_send); if (!s.ok()) { LOG(WARNING) << "send binlog to " << ip << ":" << port << " failed, " << s.ToString(); to_delete.push_back(iter.first); - break; + continue; } } } - for (auto& del_queue : to_delete) { - write_queues_.erase(del_queue); + + if (!to_delete.empty()) { + { + slash::MutexLock l(&write_queue_mu_); + for (auto& del_queue : to_delete) { + write_queues_.erase(del_queue); + } + } } return counter; } diff --git a/src/pika_server.cc b/src/pika_server.cc index b92e701b02..6cde0b3d68 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -629,11 +629,18 @@ void PikaServer::BecomeMaster() { } void PikaServer::DeleteSlave(int fd) { + std::string ip; + int port = -1; + bool is_find = false; + int slave_num = -1; { slash::MutexLock l(&slave_mutex_); std::vector::iterator iter = slaves_.begin(); while (iter != slaves_.end()) { if (iter->conn_fd == fd) { + ip = iter->ip; + port = iter->port; + is_find = true; g_pika_rm->LostConnection(iter->ip, iter->port); g_pika_rm->DropItemInWriteQueue(iter->ip, iter->port); LOG(INFO) << "Delete Slave Success, ip_port: " << iter->ip << ":" << iter->port; @@ -642,14 +649,17 @@ void PikaServer::DeleteSlave(int fd) { } iter++; } + slave_num = slaves_.size(); } - int slave_num = slaves_.size(); - { + if (is_find) { + g_pika_rm->LostConnection(ip, port); + g_pika_rm->DropItemInWriteQueue(ip, port); + } + + if (slave_num == 0) { slash::RWLock l(&state_protector_, true); - if (slave_num == 0) { - role_ &= ~PIKA_ROLE_MASTER; - } + role_ &= ~PIKA_ROLE_MASTER; } } diff --git a/third/pink b/third/pink index 3a828a2c22..8c4e09c23f 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit 3a828a2c22a7efa1544fb6d7393052a84ac64424 +Subproject commit 8c4e09c23f2e820a1a9341fff14eee5e102ab2bd