From 263ef9505fa2f77d22cf7d72a216d9d17b894ee0 Mon Sep 17 00:00:00 2001 From: whoiami Date: Fri, 8 Mar 2019 09:52:10 +0800 Subject: [PATCH] Multi parts (#526) * Update pink support DestConnectFailedHandle func to handle connect failed situation * Update Pink * Add Async Repl Server Binlog Process 1. Add Thread array for writing binlog and writing DB 2. Writing same partition binlog by a preassigned thread in the thread array which is assigned by hashing its table+partition 3. Writing same key to DB by a preassigned thread in the thread array which is assigned by hashing its key 4. Add write queue in repl client, all binlog chips will be forwarded into that queue first and scheduled to send from the queue later. * Add Repl Client Thread Pool 1. Build a thread pool to process send response. Pb response triggers reading binlog and handling binlog and that will block a single thread. Using thread pool instead of single thread will ease that situation. 2. Add rw lock for binlog ctl array in repl client 3. Add mutex lock for every binlog ctl * Change TrySync MetaSync into async * Debug Binlog Syncing Process 1. Periodically consume write queue to sync binlog to slave using auxiliary thread 2. 
Trigger sending binlog process using auxiliary thread * Fix Rebase --- include/pika_auxiliary_thread.h | 10 +- include/pika_repl_bgworker.h | 72 +++++++ include/pika_repl_client.h | 92 ++++++++- include/pika_repl_client_conn.h | 19 +- include/pika_repl_client_thread.h | 4 + include/pika_repl_server.h | 43 +++- include/pika_repl_server_conn.h | 19 +- include/pika_server.h | 55 ++++- src/pika_auxiliary_thread.cc | 16 +- src/pika_inner_message.proto | 4 +- src/pika_repl_bgworker.cc | 330 ++++++++++++++++++++++++++++++ src/pika_repl_client.cc | 234 ++++++++++++++------- src/pika_repl_client_conn.cc | 111 +++++++--- src/pika_repl_server.cc | 43 +++- src/pika_repl_server_conn.cc | 186 +++-------------- src/pika_server.cc | 65 +++++- third/pink | 2 +- 17 files changed, 1000 insertions(+), 305 deletions(-) create mode 100644 include/pika_repl_bgworker.h create mode 100644 src/pika_repl_bgworker.cc diff --git a/include/pika_auxiliary_thread.h b/include/pika_auxiliary_thread.h index 49f5862198..94a312aef0 100644 --- a/include/pika_auxiliary_thread.h +++ b/include/pika_auxiliary_thread.h @@ -8,11 +8,17 @@ #include "pink/include/pink_thread.h" +#include "slash/include/slash_mutex.h" + class PikaAuxiliaryThread : public pink::Thread { public: - PikaAuxiliaryThread() {} + PikaAuxiliaryThread() : + mu_(), + cv_(&mu_) { + } virtual ~PikaAuxiliaryThread(); - + slash::Mutex mu_; + slash::CondVar cv_; private: virtual void* ThreadMain(); void RunEveryPartitionStateMachine(); diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h new file mode 100644 index 0000000000..dc18545c30 --- /dev/null +++ b/include/pika_repl_bgworker.h @@ -0,0 +1,72 @@ +// Copyright (c) 2019-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+ +#ifndef PIKA_REPL_BGWROKER_H_ +#define PIKA_REPL_BGWROKER_H_ + +#include +#include + +#include "pink/include/bg_thread.h" +#include "pink/include/pb_conn.h" + +#include "src/pika_inner_message.pb.h" + +#include "include/pika_command.h" +#include "include/pika_binlog_transverter.h" + +class PikaReplBgWorker { + public: + explicit PikaReplBgWorker(int queue_size); + void ScheduleRequest(const std::shared_ptr req, + std::shared_ptr conn, void* req_private_data); + void ScheduleWriteDb(PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string table_name, uint32_t partition_id); + int StartThread(); + void QueueSize(int* size_a, int* size_b) { + bg_thread_.QueueSize(size_a, size_b); + } + + static void HandleMetaSyncRequest(void* arg); + static void HandleBinlogSyncRequest(void* arg); + static void HandleTrySyncRequest(void* arg); + static void HandleWriteDb(void* arg); + + BinlogItem binlog_item_; + pink::RedisParser redis_parser_; + std::string ip_port_; + std::string table_name_; + uint32_t partition_id_; + + private: + pink::BGThread bg_thread_; + + static int HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv); + + struct ReplBgWorkerArg { + const std::shared_ptr req; + std::shared_ptr conn; + void* req_private_data; + PikaReplBgWorker* worker; + ReplBgWorkerArg(const std::shared_ptr _req, std::shared_ptr _conn, void* _req_private_data, PikaReplBgWorker* _worker) : req(_req), conn(_conn), req_private_data(_req_private_data), worker(_worker) { + } + }; + + struct WriteDbBgArg { + PikaCmdArgsType *argv; + BinlogItem* binlog_item; + std::string table_name; + uint32_t partition_id; + WriteDbBgArg(PikaCmdArgsType* _argv, BinlogItem* _binlog_item, const std::string _table_name, uint32_t _partition_id) + : argv(_argv), binlog_item(_binlog_item), table_name(_table_name), partition_id(_partition_id) { + } + ~WriteDbBgArg() { + delete argv; + delete binlog_item; + } + }; +}; + +#endif // PIKA_REPL_BGWROKER_H_ diff --git 
a/include/pika_repl_client.h b/include/pika_repl_client.h index bbd4888415..a6a8cfba7d 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -18,6 +18,9 @@ #include "include/pika_binlog_reader.h" #include "include/pika_repl_client_thread.h" +#include "pink/include/thread_pool.h" +#include "src/pika_inner_message.pb.h" + #define kBinlogSyncBatchNum 10 using slash::Status; @@ -29,6 +32,47 @@ struct RmNode { int port_; RmNode(const std::string& table, int partition, const std::string& ip, int port) : table_(table), partition_(partition), ip_(ip), port_(port) { } + RmNode(const RmNode& node) { + table_ = node.table_; + partition_ = node.partition_; + ip_ = node.ip_; + port_ = node.port_; + } + bool operator <(const RmNode& other) const { + if (table_ < other.table_) { + return true; + } else if (partition_ < other.partition_) { + return true; + } else if (ip_ < other.ip_) { + return true; + } else if (port_ < other.port_) { + return true; + } + return false; + } + std::string ToString() const { + return table_ + "_" + std::to_string(partition_) + "_" + ip_ + ":" + std::to_string(port_); + } +}; + +struct BinlogChip { + uint32_t file_num_; + uint64_t offset_; + std::string binlog_; + BinlogChip(uint32_t file_num, uint64_t offset, std::string binlog) :file_num_(file_num), offset_(offset), binlog_(binlog) { + } + BinlogChip(const BinlogChip& binlog_chip) { + file_num_ = binlog_chip.file_num_; + offset_ = binlog_chip.offset_; + binlog_ = binlog_chip.binlog_; + } +}; + +struct WriteTask { + struct RmNode rm_node_; + struct BinlogChip binlog_chip_; + WriteTask(RmNode rm_node, BinlogChip binlog_chip) : rm_node_(rm_node), binlog_chip_(binlog_chip) { + } }; class PikaReplClient { @@ -40,25 +84,61 @@ class PikaReplClient { int Start(); Status AddBinlogReader(const RmNode& slave, std::shared_ptr logger, uint32_t filenum, uint64_t offset); Status RemoveBinlogReader(const RmNode& slave); - void RunStateMachine(const RmNode& slave); + bool 
NeedToSendBinlog(const RmNode& slave); Status SendMetaSync(); Status SendPartitionTrySync(const std::string& table_name, uint32_t partition_id, const BinlogOffset& boffset); + Status SendBinlogSync(const RmNode& slave); + + Status TriggerSendBinlogSync(); + + int ConsumeWriteQueue(); + + void Schedule(pink::TaskFunc func, void* arg){ + client_tp_->Schedule(func, arg); + } + + bool SetAckInfo(const RmNode& slave, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time); + bool GetAckInfo(const RmNode& slave, uint32_t* act_file_num, uint64_t* ack_offset, uint64_t* active_time); private: PikaBinlogReader* NewPikaBinlogReader(std::shared_ptr logger, uint32_t filenum, uint64_t offset); - Status TrySendSyncBinlog(const RmNode& slave); - void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request); + void ProduceWriteQueue(WriteTask& task); - Status BuildBinlogMsgFromFile(const RmNode& slave, std::string* scratch, uint32_t* filenum, uint64_t* offset); + void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request); PikaReplClientThread* client_thread_; - // keys of this map: table_partition_slaveip:port - std::map slave_binlog_readers_; + + + struct BinlogSyncCtl { + slash::Mutex ctl_mu_; + PikaBinlogReader* reader_; + uint32_t ack_file_num_; + uint64_t ack_offset_; + uint64_t active_time_; + + BinlogSyncCtl(PikaBinlogReader* reader, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) + : reader_(reader), ack_file_num_(ack_file_num), ack_offset_(ack_offset), active_time_(active_time) { + } + ~BinlogSyncCtl() { + if (reader_) { + delete reader_; + } + } + }; + + pthread_rwlock_t binlog_ctl_rw_; + std::map binlog_ctl_; + + slash::Mutex write_queue_mu_; + // every host owns a queue + std::unordered_map> write_queues_; // ip+port, queue + + pink::ThreadPool* client_tp_; }; #endif diff --git 
a/include/pika_repl_client_conn.h b/include/pika_repl_client_conn.h index 4a4d4401d5..a3bcdd7294 100644 --- a/include/pika_repl_client_conn.h +++ b/include/pika_repl_client_conn.h @@ -8,6 +8,8 @@ #include "pink/include/pb_conn.h" +#include + #include "include/pika_conf.h" #include "src/pika_inner_message.pb.h" @@ -15,14 +17,19 @@ class PikaReplClientConn: public pink::PbConn { public: PikaReplClientConn(int fd, const std::string& ip_port, pink::Thread *thread, void* worker_specific_data, pink::PinkEpoll* epoll); virtual ~PikaReplClientConn() = default; - - static void DoReplClientTask(void* arg); + static void HandleBinlogSyncResponse(void* arg); + static void HandleMetaSyncResponse(void* arg); + static void HandleTrySyncResponse(void* arg); + static bool IsTableStructConsistent(const std::vector& current_tables, + const std::vector& expect_tables); int DealMessage() override; private: - bool IsTableStructConsistent(const std::vector& current_tables, - const std::vector& expect_tables); - int HandleMetaSyncResponse(const InnerMessage::InnerResponse& response); - int HandleTrySyncResponse(const InnerMessage::InnerResponse& response); + struct ReplRespArg { + std::shared_ptr resp; + std::shared_ptr conn; + ReplRespArg(std::shared_ptr _resp, std::shared_ptr _conn) : resp(_resp), conn(_conn) { + } + }; }; #endif diff --git a/include/pika_repl_client_thread.h b/include/pika_repl_client_thread.h index 49796c9d61..4523d85fd8 100644 --- a/include/pika_repl_client_thread.h +++ b/include/pika_repl_client_thread.h @@ -39,6 +39,10 @@ class ReplClientHandle : public pink::ClientHandle { // int DeleteWorkerSpecificData(void* data) const override { // return 0; // } + // void DestConnectFailedHandle(std::string ip_port, std::string reason) const override { + // std::cout << "ip " << ip_port << " " << reason << std::endl; + // } + }; class PikaReplClientThread : public pink::ClientThread { diff --git a/include/pika_repl_server.h b/include/pika_repl_server.h index 
97491f84d0..cebd15030a 100644 --- a/include/pika_repl_server.h +++ b/include/pika_repl_server.h @@ -8,15 +8,46 @@ #include "include/pika_repl_server_thread.h" +#include + +#include "include/pika_repl_bgworker.h" +#include "include/pika_command.h" +#include "pika_binlog_transverter.h" + class PikaReplServer { - public: - PikaReplServer(const std::set& ips, int port, int cron_interval); - ~PikaReplServer(); + public: + PikaReplServer(const std::set& ips, int port, int cron_interval); + ~PikaReplServer(); + int Start(); + + void ScheduleBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); - int Start(); + void ScheduleDbTask(const std::string& key, + PikaCmdArgsType* argv, + BinlogItem* binlog_item, + const std::string& table_name, + uint32_t partition_id); + private: + size_t GetHashIndex(std::string key, bool upper_half); + void UpdateNextAvail() { + next_avail_ = (next_avail_ + 1) % bg_workers_.size(); + } - private: - PikaReplServerThread* pika_repl_server_thread_; + PikaReplServerThread* pika_repl_server_thread_; + std::vector bg_workers_; + int next_avail_; + std::hash str_hash; }; #endif diff --git a/include/pika_repl_server_conn.h b/include/pika_repl_server_conn.h index cfe6e690d2..b23d9b5a2f 100644 --- a/include/pika_repl_server_conn.h +++ b/include/pika_repl_server_conn.h @@ -9,34 +9,19 @@ #include #include "pink/include/pb_conn.h" -#include "pink/include/redis_parser.h" #include "pink/include/pink_thread.h" #include "src/pika_inner_message.pb.h" -#include "include/pika_binlog_transverter.h" - -class PikaReplServerThread; - class PikaReplServerConn: public pink::PbConn { public: PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, 
pink::PinkEpoll* epoll); virtual ~PikaReplServerConn(); int DealMessage(); - bool ProcessAuth(const pink::RedisCmdArgsType& argv); - bool ProcessBinlogData(const pink::RedisCmdArgsType& argv, const BinlogItem& binlog_item); - - BinlogItem binlog_item_; private: - static int ParserDealMessage(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv); - int HandleMetaSyncRequest(const InnerMessage::InnerRequest& req); - int HandleTrySync(const InnerMessage::InnerRequest& req); - int HandleBinlogSync(const InnerMessage::InnerRequest& req); - - pink::RedisParser redis_parser_; - - PikaReplServerThread* binlog_receiver_; + // dispatch binlog by its table_name + partition + void DispatchBinlogReq(const std::shared_ptr req); }; #endif // INCLUDE_PIKA_REPL_SERVER_CONN_H_ diff --git a/include/pika_server.h b/include/pika_server.h index b4d27f8c3f..2472e2a419 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -426,12 +426,6 @@ class PikaServer { int PubSubNumPat(); - /* - * Communication use - */ - Status SendMetaSyncRequest(); - Status SendPartitionTrySyncRequest(std::shared_ptr partition); - /* * Monitor used */ @@ -478,6 +472,55 @@ class PikaServer { void SetDispatchQueueLimit(int queue_limit); + /* + * for RM {repl client, repl server} use + */ + void ScheduleReplBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplDbTask(const std::string &key, + PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string& table_name, uint32_t partition_id); + + bool SetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time); + + bool 
GetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time); + + /* + * Communication use + */ + Status SendMetaSyncRequest(); + Status SendPartitionTrySyncRequest(std::shared_ptr partition); + + Status SendBinlogSyncRequest(const std::string& table, uint32_t partition, const std::string& ip, int port); + + // schedule repl_client thread pool + void ScheduleReplCliTask(pink::TaskFunc func, void*arg); + + /* + * For repl client write queue consumer + * return value: consumed binlog task + */ + int SendToPeer(); + + /* + * Used to trigger binglog sync process + */ + Status TriggerSendBinlogSync(); + + void SignalAuxiliary(); private: std::atomic exit_; std::atomic binlog_io_error_; diff --git a/src/pika_auxiliary_thread.cc b/src/pika_auxiliary_thread.cc index b94bb0756f..d45c36d4c3 100644 --- a/src/pika_auxiliary_thread.cc +++ b/src/pika_auxiliary_thread.cc @@ -17,7 +17,6 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() { void* PikaAuxiliaryThread::ThreadMain() { while (!should_stop()) { - sleep(1); if (g_pika_server->ShouldMetaSync()) { g_pika_server->SendMetaSyncRequest(); LOG(INFO) << "Send meta sync request finish"; @@ -32,6 +31,21 @@ void* PikaAuxiliaryThread::ThreadMain() { if (g_pika_server->ShouldTrySyncPartition()) { RunEveryPartitionStateMachine(); } + // TODO(whoiami) timeout + Status s = g_pika_server->TriggerSendBinlogSync(); + if (!s.ok()) { + LOG(WARNING) << s.ToString(); + } + // send to peer + int res = g_pika_server->SendToPeer(); + if (!res) { + // sleep 100 ms + mu_.Lock(); + cv_.TimedWait(100); + mu_.Unlock(); + } else { + //LOG_EVERY_N(INFO, 100) << "Consume binlog number " << res; + } } return NULL; } diff --git a/src/pika_inner_message.proto b/src/pika_inner_message.proto index 80228400e7..be9bd9aa5c 100644 --- a/src/pika_inner_message.proto +++ b/src/pika_inner_message.proto @@ -95,7 +95,9 @@ message InnerResponse { // slave to master 
message BinlogSync { - required BinlogOffset binlog_offset = 1; + required string table_name = 1; + required uint32 partition_id = 2; + required BinlogOffset binlog_offset = 3; } // slave to master diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc new file mode 100644 index 0000000000..d33d9dc6e4 --- /dev/null +++ b/src/pika_repl_bgworker.cc @@ -0,0 +1,330 @@ +// Copyright (c) 2019-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_repl_bgworker.h" + +#include + +#include "include/pika_conf.h" +#include "include/pika_cmd_table_manager.h" +#include "include/pika_server.h" + +extern PikaServer* g_pika_server; +extern PikaConf* g_pika_conf; +extern PikaCmdTableManager* g_pika_cmd_table_manager; + +PikaReplBgWorker::PikaReplBgWorker(int queue_size) + : bg_thread_(queue_size) { + bg_thread_.set_thread_name("ReplBgWorker"); + pink::RedisParserSettings settings; + settings.DealMessage = &(PikaReplBgWorker::HandleWriteBinlog); + redis_parser_.RedisParserInit(REDIS_PARSER_REQUEST, settings); + redis_parser_.data = this; + table_name_ = g_pika_conf->default_table(); + partition_id_ = 0; +} + +int PikaReplBgWorker::StartThread() { + return bg_thread_.StartThread(); +} + +void PikaReplBgWorker::ScheduleRequest(const std::shared_ptr req, + std::shared_ptr conn, void* req_private_data) { + ReplBgWorkerArg* arg = new ReplBgWorkerArg(req, conn, req_private_data, this); + switch (req->type()) { + case InnerMessage::kMetaSync: + bg_thread_.Schedule(&PikaReplBgWorker::HandleMetaSyncRequest, static_cast(arg)); + break; + case InnerMessage::kTrySync: + bg_thread_.Schedule(&PikaReplBgWorker::HandleTrySyncRequest, static_cast(arg)); + break; + case InnerMessage::kBinlogSync: + 
bg_thread_.Schedule(&PikaReplBgWorker::HandleBinlogSyncRequest, static_cast(arg)); + break; + default: + break; + } +} + +void PikaReplBgWorker::ScheduleWriteDb(PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string table_name, uint32_t partition_id) { + WriteDbBgArg* arg = new WriteDbBgArg(argv, binlog_item, table_name, partition_id); + bg_thread_.Schedule(&PikaReplBgWorker::HandleWriteDb, static_cast(arg)); +} + +void PikaReplBgWorker::HandleMetaSyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + + std::vector table_structs = g_pika_conf->table_structs(); + InnerMessage::InnerResponse response; + response.set_code(InnerMessage::kOk); + response.set_type(InnerMessage::kMetaSync); + InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync(); + meta_sync->set_classic_mode(g_pika_conf->classic_mode()); + for (const auto& table_struct : table_structs) { + InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info(); + table_info->set_table_name(table_struct.table_name); + table_info->set_partition_num(table_struct.partition_num); + } + + std::string reply_str; + if (!response.SerializeToString(&reply_str)) { + LOG(WARNING) << "Process MetaSync request serialization failed"; + delete bg_worker_arg; + return; + } + int res = conn->WriteResp(reply_str); + if (res) { + LOG(WARNING) << "Process MetaSync request write resp failed"; + delete bg_worker_arg; + return; + } + conn->NotifyWrite(); + delete bg_worker_arg; +} + +void PikaReplBgWorker::HandleBinlogSyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + std::vector* index = static_cast* >(bg_worker_arg->req_private_data); + PikaReplBgWorker* worker = bg_worker_arg->worker; + worker->ip_port_ = conn->ip_port(); + + 
const InnerMessage::InnerRequest::BinlogSync& binlog_req = + req->binlog_sync((*index)[(*index).size() - 1]); + std::string table_name = binlog_req.table_name(); + uint32_t partition_id = binlog_req.partition_id(); + + worker->table_name_ = table_name; + worker->partition_id_ = partition_id; + + for (size_t i = 0; i < index->size(); ++i) { + const InnerMessage::InnerRequest::BinlogSync& binlog_req = req->binlog_sync((*index)[i]); + if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_req.binlog(), &worker->binlog_item_)) { + LOG(WARNING) << "Binlog item decode failed"; + delete index; + delete bg_worker_arg; + return; + } + const char* redis_parser_start = binlog_req.binlog().data() + BINLOG_ENCODE_LEN; + int redis_parser_len = static_cast(binlog_req.binlog().size()) - BINLOG_ENCODE_LEN; + int processed_len = 0; + pink::RedisParserStatus ret = worker->redis_parser_.ProcessInputBuffer( + redis_parser_start, redis_parser_len, &processed_len); + if (ret != pink::kRedisParserDone) { + LOG(WARNING) << "Redis parser failed"; + delete index; + delete bg_worker_arg; + return; + } + } + + // build response + std::shared_ptr partition = g_pika_server->GetTablePartitionById(table_name, partition_id); + std::shared_ptr logger = partition->logger(); + uint32_t file_num; + uint64_t offset; + logger->GetProducerStatus(&file_num, &offset); + + InnerMessage::InnerResponse response; + response.set_code(InnerMessage::kOk); + response.set_type(InnerMessage::kBinlogSync); + InnerMessage::InnerResponse_BinlogSync* binlog_sync_resp = response.mutable_binlog_sync(); + binlog_sync_resp->set_table_name(table_name); + binlog_sync_resp->set_partition_id(partition_id); + InnerMessage::BinlogOffset* binlog_offset = binlog_sync_resp->mutable_binlog_offset(); + binlog_offset->set_filenum(file_num); + binlog_offset->set_offset(offset); + + std::string reply_str; + if (!response.SerializeToString(&reply_str)) { + LOG(WARNING) << "Process MetaSync request serialization 
failed"; + delete index; + delete bg_worker_arg; + return; + } + + int res = conn->WriteResp(reply_str); + if (res) { + LOG(WARNING) << "Process BinlogSync request write resp failed"; + delete index; + delete bg_worker_arg; + return; + } + conn->NotifyWrite(); + delete index; + delete bg_worker_arg; +} + +int PikaReplBgWorker::HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) { + PikaReplBgWorker* worker = static_cast(parser->data); + const BinlogItem& binlog_item = worker->binlog_item_; + g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]); + + // Monitor related + std::string monitor_message; + if (g_pika_server->HasMonitorClients()) { + std::string monitor_message = std::to_string(1.0 * slash::NowMicros() / 1000000) + + " [" + worker->ip_port_ + "]"; + for (const auto& item : argv) { + monitor_message += " " + slash::ToRead(item); + } + g_pika_server->AddMonitorMessage(monitor_message); + } + + std::string opt = argv[0]; + Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); + // Initial + c_ptr->Initial(argv, worker->table_name_); + if (!c_ptr->res().ok()) { + LOG(WARNING) << "Fail to initial command from binlog: " << opt; + return -1; + } + + std::shared_ptr partition = g_pika_server->GetTablePartitionById(worker->table_name_, worker->partition_id_); + std::shared_ptr logger = partition->logger(); + + logger->Lock(); + logger->Put(c_ptr->ToBinlog(binlog_item.exec_time(), + std::to_string(binlog_item.server_id()), + binlog_item.logic_id(), + binlog_item.filenum(), + binlog_item.offset())); + uint32_t filenum; + uint64_t offset; + logger->GetProducerStatus(&filenum, &offset); + logger->Unlock(); + + PikaCmdArgsType *v = new PikaCmdArgsType(argv); + BinlogItem *b = new BinlogItem(binlog_item); + std::string dispatch_key = argv.size() >= 2 ? 
argv[1] : argv[0]; + g_pika_server->ScheduleReplDbTask(dispatch_key, v, b, worker->table_name_, worker->partition_id_); + return 0; +} + +void PikaReplBgWorker::HandleWriteDb(void* arg) { + WriteDbBgArg *bg_worker_arg = static_cast(arg); + PikaCmdArgsType* argv = bg_worker_arg->argv; + BinlogItem binlog_item = *(bg_worker_arg->binlog_item); + std::string table_name = bg_worker_arg->table_name; + uint32_t partition_id = bg_worker_arg->partition_id; + std::string opt = (*argv)[0]; + slash::StringToLower(opt); + + // Get command + Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); + if (!c_ptr) { + LOG(WARNING) << "Error operation from binlog: " << opt; + delete bg_worker_arg; + return; + } + c_ptr->res().clear(); + + // Initial + c_ptr->Initial(*argv, table_name); + if (!c_ptr->res().ok()) { + LOG(WARNING) << "Fail to initial command from binlog: " << opt; + delete bg_worker_arg; + return; + } + + uint64_t start_us = 0; + if (g_pika_conf->slowlog_slower_than() >= 0) { + start_us = slash::NowMicros(); + } + std::shared_ptr partition = g_pika_server->GetTablePartitionById(table_name, partition_id); + // Add read lock for no suspend command + if (!c_ptr->is_suspend()) { + g_pika_server->RWLockReader(); + } + + c_ptr->Do(partition); + + if (!c_ptr->is_suspend()) { + g_pika_server->RWUnlock(); + } + + if (g_pika_conf->slowlog_slower_than() >= 0) { + int64_t duration = slash::NowMicros() - start_us; + if (duration > g_pika_conf->slowlog_slower_than()) { + LOG(ERROR) << "command: " << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration; + } + } + + delete bg_worker_arg; +} + +void PikaReplBgWorker::HandleTrySyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + + InnerMessage::InnerRequest::TrySync try_sync_request = req->try_sync(); + InnerMessage::Partition partition_request = 
try_sync_request.partition(); + std::string table_name = partition_request.table_name(); + uint32_t partition_id = partition_request.partition_id(); + bool force = try_sync_request.force(); + std::string partition_name = table_name + "_" + std::to_string(partition_id); + InnerMessage::BinlogOffset slave_boffset = try_sync_request.binlog_offset(); + InnerMessage::Node node = try_sync_request.node(); + LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:" + << node.port() << ", Partition: " << partition_name << ", filenum: " + << slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset(); + + InnerMessage::InnerResponse response; + response.set_type(InnerMessage::Type::kTrySync); + response.set_code(InnerMessage::StatusCode::kOk); + InnerMessage::InnerResponse::TrySync* try_sync_response = response.mutable_try_sync(); + InnerMessage::Partition* partition_response = try_sync_response->mutable_partition(); + partition_response->set_table_name(table_name); + partition_response->set_partition_id(partition_id); + if (force) { + } else { + BinlogOffset boffset; + if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); + LOG(WARNING) << "Handle TrySync, Partition: " + << partition_name << " not found, TrySync failed"; + } else { + if (boffset.filenum < slave_boffset.filenum() + || (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kInvalidOffset); + LOG(WARNING) << "Slave offset is larger than mine, Slave ip: " + << node.ip() << ", Slave port: " << node.port() << ", Partition: " + << partition_name << ", filenum: " << slave_boffset.filenum() + << ", pro_offset_: " << slave_boffset.offset(); + } else { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk); + try_sync_response->set_sid(0); + LOG(INFO) 
<< "Try Incrental SYNC " << " master filenum: " << boffset.filenum << " offset: " << boffset.offset + << " slave filenum: " << slave_boffset.filenum() << " offset: " << slave_boffset.offset(); + // incremental sync + Status s = g_pika_server->AddBinlogSender( + table_name, + partition_id, + node.ip(), + node.port(), + 0, + slave_boffset.filenum(), + slave_boffset.offset()); + if (!s.ok()) { + LOG(WARNING) << s.ToString(); + } + } + } + } + delete bg_worker_arg; + + std::string reply_str; + if (!response.SerializeToString(&reply_str) + || conn->WriteResp(reply_str)) { + LOG(WARNING) << "Handle Try Sync Failed"; + return; + } + conn->NotifyWrite(); +} diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index 2c30f0e6ba..dab561092d 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -8,96 +8,166 @@ #include "include/pika_server.h" #include "slash/include/slash_coding.h" #include "slash/include/env.h" +#include "slash/include/slash_string.h" extern PikaServer* g_pika_server; PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) { client_thread_ = new PikaReplClientThread(cron_interval, keepalive_timeout); client_thread_->set_thread_name("PikaReplClient"); + client_tp_ = new pink::ThreadPool(g_pika_conf->thread_pool_size(), 100000); + pthread_rwlock_init(&binlog_ctl_rw_, NULL); } PikaReplClient::~PikaReplClient() { client_thread_->StopThread(); delete client_thread_; - for (auto iter : slave_binlog_readers_) { + for (auto iter : binlog_ctl_) { delete iter.second; } + delete client_tp_; + pthread_rwlock_destroy(&binlog_ctl_rw_); LOG(INFO) << "PikaReplClient exit!!!"; } -static inline void BuildBinlogReaderIndex(const RmNode& slave, std::string* index) { - *index = slave.table_ + "_" + std::to_string(slave.partition_) + "_" + slave.ip_ + ":" + std::to_string(slave.port_); -} - int PikaReplClient::Start() { int res = client_thread_->StartThread(); if (res != pink::kSuccess) { LOG(FATAL) << "Start ReplClient ClientThread 
Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); } + res = client_tp_->start_thread_pool(); + if (res != pink::kSuccess) { + LOG(FATAL) << "Start ThreadPool Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); + } return res; } +void PikaReplClient::ProduceWriteQueue(WriteTask& task) { + slash::MutexLock l(&write_queue_mu_); + std::string index = task.rm_node_.ip_ + ":" + std::to_string(task.rm_node_.port_); + write_queues_[index].push(task); +} + +int PikaReplClient::ConsumeWriteQueue() { + InnerMessage::InnerRequest request; + request.set_type(InnerMessage::kBinlogSync); + + int counter = 0; + slash::MutexLock l(&write_queue_mu_); + for (auto& iter : write_queues_) { + std::queue& queue = iter.second; + if (queue.empty()) { + continue; + } + size_t batch_index = queue.size() > kBinlogSyncBatchNum ? kBinlogSyncBatchNum : queue.size(); + std::string ip; + int port = 0; + if (!slash::ParseIpPortString(iter.first, ip, port)) { + LOG(WARNING) << "Parse ip_port error " << iter.first; + } + for (size_t i = 0; i < batch_index; ++i) { + WriteTask task = queue.front(); + queue.pop(); + BuildBinlogPb(task.rm_node_, + task.binlog_chip_.binlog_, + task.binlog_chip_.file_num_, + task.binlog_chip_.offset_, request); + + counter++; + } + std::string to_send; + bool res = request.SerializeToString(&to_send); + if (!res) { + LOG(WARNING) << "Serialize Failed"; + continue; + } + Status s = client_thread_->Write(ip, port, to_send); + if (!s.ok()) { + LOG(WARNING) << "write to " << ip << ":" << port << " failed"; + continue; + } + } + return counter; +} + +bool PikaReplClient::SetAckInfo(const RmNode& slave, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + if (binlog_ctl_.find(slave) == binlog_ctl_.end()) { + return false; + } + ctl = binlog_ctl_[slave]; + } + + { + slash::MutexLock 
l(&(ctl->ctl_mu_)); + ctl->ack_file_num_ = ack_file_num; + ctl->ack_offset_ = ack_offset; + ctl->active_time_ = active_time; + } + return true; +} + +bool PikaReplClient::GetAckInfo(const RmNode& slave, uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + if (binlog_ctl_.find(slave) == binlog_ctl_.end()) { + return false; + } + ctl = binlog_ctl_[slave]; + } + + { + slash::MutexLock l(&(ctl->ctl_mu_)); + *ack_file_num = ctl->ack_file_num_; + *ack_offset = ctl->ack_offset_; + *active_time = ctl->active_time_; + } + return true; +} + Status PikaReplClient::Write(const std::string& ip, const int port, const std::string& msg) { // shift port 3000 tobe inner connect port return client_thread_->Write(ip, port, msg); } Status PikaReplClient::RemoveBinlogReader(const RmNode& slave) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - if (slave_binlog_readers_.find(index) != slave_binlog_readers_.end()) { - delete slave_binlog_readers_[index]; - slave_binlog_readers_.erase(index); + { + slash::RWLock l(&binlog_ctl_rw_, true); + if (binlog_ctl_.find(slave) != binlog_ctl_.end()) { + delete binlog_ctl_[slave]; + binlog_ctl_.erase(slave); + } } return Status::OK(); } Status PikaReplClient::AddBinlogReader(const RmNode& slave, std::shared_ptr logger, uint32_t filenum, uint64_t offset) { - std::string index; - BuildBinlogReaderIndex(slave, &index); RemoveBinlogReader(slave); PikaBinlogReader* binlog_reader = NewPikaBinlogReader(logger, filenum, offset); if (!binlog_reader) { - return Status::Corruption(index + " new binlog reader failed"); + return Status::Corruption(slave.ToString() + " new binlog reader failed"); } int res = binlog_reader->Seek(); if (res) { delete binlog_reader; - return Status::Corruption(index + " binlog reader init failed"); + return Status::Corruption(slave.ToString() + " binlog reader init failed"); } - slave_binlog_readers_[index] = 
binlog_reader; - return Status::OK(); -} + uint32_t cur_file_num; + uint64_t cur_offset; + binlog_reader->GetReaderStatus(&cur_file_num, &cur_offset); -void PikaReplClient::RunStateMachine(const RmNode& slave) { - Status s = TrySendSyncBinlog(slave); - if (!s.ok()) { - LOG(INFO) << s.ToString(); - return; - } - //state = GetState(slave); - //bool res = false; - //switch(state) { - // case ShouldSendAuth : - // res = TrySyncBinlog(slave, false); - // break; - // case ShouldSendBinlog : - // TrySyncBinlog(slave, true); - // break; - // default: - // break; - //} -} - -bool PikaReplClient::NeedToSendBinlog(const RmNode& slave) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - auto binlog_reader_iter = slave_binlog_readers_.find(index); - if (binlog_reader_iter == slave_binlog_readers_.end()) { - return false; + { + slash::RWLock l(&binlog_ctl_rw_, true); + uint64_t now; + struct timeval tv; + gettimeofday(&tv, NULL); + now = tv.tv_sec; + binlog_ctl_[slave] = new BinlogSyncCtl(binlog_reader, cur_file_num, cur_offset, now); } - PikaBinlogReader* reader = binlog_reader_iter->second; - return !(reader->ReadToTheEnd()); + return Status::OK(); } Status PikaReplClient::SendMetaSync() { @@ -149,31 +219,64 @@ Status PikaReplClient::SendPartitionTrySync(const std::string& table_name, return client_thread_->Write(master_ip, master_port + 3000, to_send); } -Status PikaReplClient::TrySendSyncBinlog(const RmNode& slave) { - InnerMessage::InnerRequest request; - request.set_type(InnerMessage::kBinlogSync); - if (!NeedToSendBinlog(slave)) { - return Status::OK(); +Status PikaReplClient::SendBinlogSync(const RmNode& slave) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + auto iter = binlog_ctl_.find(slave); + if (iter == binlog_ctl_.end()) { + return Status::NotFound(slave.ToString() + " not found"); } + ctl = iter->second; + } + + { + slash::MutexLock l(&(ctl->ctl_mu_)); for (int i = 0; i < kBinlogSyncBatchNum; ++i) { std::string msg; 
uint32_t filenum; uint64_t offset; - Status s = BuildBinlogMsgFromFile(slave, &msg, &filenum, &offset); + Status s = ctl->reader_->Get(&msg, &filenum, &offset); if (s.IsEndFile()) { break; } else if (s.IsCorruption() || s.IsIOError()) { return s; } - BuildBinlogPb(slave, msg, filenum, offset, request); + WriteTask task(slave, BinlogChip(filenum, offset, msg)); + ProduceWriteQueue(task); } - std::string to_send; - bool res = request.SerializeToString(&to_send); - if (!res) { - return Status::Corruption("Serialize Failed"); } - return client_thread_->Write(slave.ip_, slave.port_, to_send); + return Status::OK(); +} + +Status PikaReplClient::TriggerSendBinlogSync() { + slash::RWLock l(&binlog_ctl_rw_, false); + for (auto& binlog_ctl : binlog_ctl_) { + BinlogSyncCtl* ctl = binlog_ctl.second; + { + slash::MutexLock l(&(ctl->ctl_mu_)); + uint32_t send_file_num; + uint64_t send_offset; + ctl->reader_->GetReaderStatus(&send_file_num, &send_offset); + if (ctl->ack_file_num_ == send_file_num && ctl->ack_offset_ == send_offset) { + for (int i = 0; i < kBinlogSyncBatchNum; ++i) { + std::string msg; + uint32_t filenum; + uint64_t offset; + Status s = ctl->reader_->Get(&msg, &filenum, &offset); + if (s.IsEndFile()) { + break; + } else if (s.IsCorruption() || s.IsIOError()) { + return s; + } + WriteTask task(binlog_ctl.first, BinlogChip(filenum, offset, msg)); + ProduceWriteQueue(task); + } + } + } + } + return Status::OK(); } void PikaReplClient::BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request) { @@ -189,23 +292,6 @@ void PikaReplClient::BuildBinlogPb(const RmNode& slave, const std::string& msg, binlog_msg->set_binlog(msg); } -Status PikaReplClient::BuildBinlogMsgFromFile(const RmNode& slave, std::string* scratch, uint32_t* filenum, uint64_t* offset) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - auto iter = slave_binlog_readers_.find(index); - if (iter == 
slave_binlog_readers_.end()) { - return Status::NotFound(index + " not found"); - } - PikaBinlogReader* reader = iter->second; - - // Get command supports append binlog - Status s = reader->Get(scratch, filenum, offset); - if (!s.ok()) { - return s; - } - return Status::OK(); -} - PikaBinlogReader* PikaReplClient::NewPikaBinlogReader(std::shared_ptr logger, uint32_t filenum, uint64_t offset) { std::string confile = NewFileName(logger->filename, filenum); if (!slash::FileExists(confile)) { diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 7def284bf3..de654c1eec 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -5,8 +5,10 @@ #include "include/pika_repl_client_conn.h" +#include + #include "include/pika_server.h" -#include "src/pika_inner_message.pb.h" +#include "slash/include/slash_string.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; @@ -19,11 +21,6 @@ PikaReplClientConn::PikaReplClientConn(int fd, : pink::PbConn(fd, ip_port, thread, epoll) { } -void PikaReplClientConn::DoReplClientTask(void* arg) { - InnerMessage::InnerResponse* response = reinterpret_cast(arg); - delete response; -} - bool PikaReplClientConn::IsTableStructConsistent( const std::vector& current_tables, const std::vector& expect_tables) { @@ -39,14 +36,19 @@ bool PikaReplClientConn::IsTableStructConsistent( return true; } -int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse& response) { - const InnerMessage::InnerResponse_MetaSync meta_sync = response.meta_sync(); +void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr response = resp_arg->resp; + + const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync(); if (g_pika_conf->classic_mode() != meta_sync.classic_mode()) { LOG(WARNING) << "Self in " << (g_pika_conf->classic_mode() ? 
"classic" : "sharding") << " mode, but master in " << (meta_sync.classic_mode() ? "classic" : "sharding") << " mode, failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } std::vector master_table_structs; @@ -58,11 +60,12 @@ int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse bool force_full_sync = g_pika_server->force_full_sync(); std::vector self_table_structs = g_pika_conf->table_structs(); if (!force_full_sync - && !IsTableStructConsistent(self_table_structs, master_table_structs)) { + && !PikaReplClientConn::IsTableStructConsistent(self_table_structs, master_table_structs)) { LOG(WARNING) << "Self table structs inconsistent with master" << ", failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } if (force_full_sync) { @@ -72,17 +75,22 @@ int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse LOG(WARNING) << "Need force full sync but rebuild table struct error" << ", failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } g_pika_server->PurgeDir(g_pika_conf->trash_path()); } LOG(INFO) << "Finish to handle meta sync response"; g_pika_server->MetaSyncDone(); - return 0; + delete resp_arg; } -int PikaReplClientConn::HandleTrySyncResponse(const InnerMessage::InnerResponse& response) { - const InnerMessage::InnerResponse_TrySync try_sync_response = response.try_sync(); +void PikaReplClientConn::HandleTrySyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr response = resp_arg->resp; + + const InnerMessage::InnerResponse_TrySync try_sync_response = response->try_sync(); const InnerMessage::Partition partition_response = try_sync_response.partition(); std::string table_name = partition_response.table_name(); uint32_t partition_id = 
partition_response.partition_id(); @@ -95,24 +103,79 @@ int PikaReplClientConn::HandleTrySyncResponse(const InnerMessage::InnerResponse& } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kInvalidOffset) { LOG(WARNING) << "Partition: " << partition_name << " TrySync Error, Because the invalid filenum and offset"; } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) { - LOG(INFO) << "Partition: " << partition_name << " TrySync Ok"; + LOG(INFO) << "Partition: " << partition_name << " TrySync Ok"; } - return 0; + delete resp_arg; } int PikaReplClientConn::DealMessage() { - int res = 0; - InnerMessage::InnerResponse response; - response.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); - switch (response.type()) { + std::shared_ptr response = std::make_shared(); + response->ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + switch (response->type()) { case InnerMessage::kMetaSync: - res = HandleMetaSyncResponse(response); + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleMetaSyncResponse, static_cast(arg)); break; + } case InnerMessage::kTrySync: - res = HandleTrySyncResponse(response); + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleTrySyncResponse, static_cast(arg)); break; + } + case InnerMessage::kBinlogSync: + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleBinlogSyncResponse, static_cast(arg)); + break; + } default: break; } - return res; + return 0; +} + +void PikaReplClientConn::HandleBinlogSyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr resp = 
resp_arg->resp; + if (!resp->has_binlog_sync()) { + LOG(WARNING) << "Pb parse error"; + delete resp_arg; + return; + } + const InnerMessage::InnerResponse_BinlogSync& binlog_ack = resp->binlog_sync(); + std::string table_name = binlog_ack.table_name(); + uint32_t partition_id = binlog_ack.partition_id(); + std::string ip; + int port = 0; + bool res = slash::ParseIpPortString(conn->ip_port(), ip, port); + if (!res) { + LOG(WARNING) << "Parse Error ParseIpPortString faile"; + delete resp_arg; + return; + } + const InnerMessage::BinlogOffset& binlog_offset = binlog_ack.binlog_offset(); + + uint64_t now; + struct timeval tv; + gettimeofday(&tv, NULL); + now = tv.tv_sec; + + // Set ack info from slave + res = g_pika_server->SetBinlogAckInfo(table_name, partition_id, ip, port, binlog_offset.filenum(), binlog_offset.offset(), now); + if (!res) { + LOG(WARNING) << "Update binlog ack failed " << table_name << " " << partition_id; + delete resp_arg; + return; + } + delete resp_arg; + + Status s = g_pika_server->SendBinlogSyncRequest(table_name, partition_id, ip, port); + if (!s.ok()) { + LOG(WARNING) << "Send BinlogSync Request failed " << table_name << " " << partition_id << s.ToString(); + return; + } + g_pika_server->SignalAuxiliary(); } diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index 4fc7994b5f..68641b62b6 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -7,14 +7,24 @@ #include -PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) { +#include "include/pika_conf.h" + +extern PikaConf* g_pika_conf; + +PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) : next_avail_(0) { pika_repl_server_thread_ = new PikaReplServerThread(ips, port, cron_interval); pika_repl_server_thread_->set_thread_name("PikaReplServer"); + for (int i = 0; i < 2 * g_pika_conf->sync_thread_num(); ++i) { + bg_workers_.push_back(new PikaReplBgWorker(g_pika_conf->sync_buffer_size())); + } } 
PikaReplServer::~PikaReplServer() { pika_repl_server_thread_->StopThread(); delete pika_repl_server_thread_; + for (size_t i = 0; i < bg_workers_.size(); ++i) { + delete bg_workers_[i]; + } LOG(INFO) << "PikaReplServer exit!!!"; } @@ -23,5 +33,36 @@ int PikaReplServer::Start() { if (res != pink::kSuccess) { LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); } + for (size_t i = 0; i < bg_workers_.size(); ++i) { + res = bg_workers_[i]->StartThread(); + if (res != pink::kSuccess) { + LOG(FATAL) << "Start Pika Repl Worker Thread Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); + } + } return res; } + +void PikaReplServer::ScheduleBinlogSyncTask(std::string table_partition, const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + size_t index = GetHashIndex(table_partition, true); + bg_workers_[index]->ScheduleRequest(req, conn, req_private_data); +} + +void PikaReplServer::ScheduleMetaSyncTask(const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + bg_workers_[next_avail_]->ScheduleRequest(req, conn, req_private_data); + UpdateNextAvail(); +} + +void PikaReplServer::ScheduleTrySyncTask(const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + bg_workers_[next_avail_]->ScheduleRequest(req, conn, NULL); + UpdateNextAvail(); +} + +void PikaReplServer::ScheduleDbTask(const std::string& key, PikaCmdArgsType* argv, BinlogItem* binlog_item, const std::string& table_name, uint32_t partition_id) { + size_t index = GetHashIndex(key, false); + bg_workers_[index]->ScheduleWriteDb(argv, binlog_item, table_name, partition_id); +} + +size_t PikaReplServer::GetHashIndex(std::string key, bool upper_half) { + size_t hash_base = bg_workers_.size() / 2; + return (str_hash(key) % hash_base) + (upper_half ? 
0 : hash_base); +} diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 0c02eb1860..a3100fc999 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -7,195 +7,67 @@ #include -#include "include/pika_conf.h" #include "include/pika_server.h" -#include "include/pika_cmd_table_manager.h" -extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; -extern PikaCmdTableManager* g_pika_cmd_table_manager; PikaReplServerConn::PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, pink::PinkEpoll* epoll) : PbConn(fd, ip_port, thread, epoll) { - binlog_receiver_ = reinterpret_cast(worker_specific_data); - pink::RedisParserSettings settings; - settings.DealMessage = &(PikaReplServerConn::ParserDealMessage); - redis_parser_.RedisParserInit(REDIS_PARSER_REQUEST, settings); - redis_parser_.data = this; } PikaReplServerConn::~PikaReplServerConn() { } int PikaReplServerConn::DealMessage() { - InnerMessage::InnerRequest req; - bool parse_res = req.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + std::shared_ptr req = std::make_shared(); + bool parse_res = req->ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); if (!parse_res) { LOG(WARNING) << "Pika repl server connection pb parse error."; return -1; } int res = 0; - switch (req.type()) { + switch (req->type()) { case InnerMessage::kMetaSync: - HandleMetaSyncRequest(req); + g_pika_server->ScheduleReplMetaSyncTask( + req, + std::dynamic_pointer_cast(shared_from_this()), + NULL); break; case InnerMessage::kTrySync: - HandleTrySync(req); + g_pika_server->ScheduleReplTrySyncTask( + req, + std::dynamic_pointer_cast(shared_from_this()), + NULL); break; case InnerMessage::kBinlogSync: - res = HandleBinlogSync(req); + DispatchBinlogReq(req); + break; default: break; } return res; } -int PikaReplServerConn::HandleMetaSyncRequest(const InnerMessage::InnerRequest& req) { - std::vector table_structs = 
g_pika_conf->table_structs(); - InnerMessage::InnerResponse response; - response.set_code(InnerMessage::StatusCode::kOk); - response.set_type(InnerMessage::Type::kMetaSync); - InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync(); - meta_sync->set_classic_mode(g_pika_conf->classic_mode()); - for (const auto& table_struct : table_structs) { - InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info(); - table_info->set_table_name(table_struct.table_name); - table_info->set_partition_num(table_struct.partition_num); - } - - std::string reply_str; - if (!response.SerializeToString(&reply_str) - || WriteResp(reply_str)) { - return -1; - } - NotifyWrite(); - return 0; -} - -int PikaReplServerConn::HandleTrySync(const InnerMessage::InnerRequest& req) { - InnerMessage::InnerRequest::TrySync try_sync_request = req.try_sync(); - InnerMessage::Partition partition_request = try_sync_request.partition(); - std::string table_name = partition_request.table_name(); - uint32_t partition_id = partition_request.partition_id(); - bool force = try_sync_request.force(); - std::string partition_name = table_name + "_" + std::to_string(partition_id); - InnerMessage::BinlogOffset slave_boffset = try_sync_request.binlog_offset(); - InnerMessage::Node node = try_sync_request.node(); - LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:" - << node.port() << ", Partition: " << partition_name << ", filenum: " - << slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset() - << ", force: " << (force ? 
"yes" : "no"); - - InnerMessage::InnerResponse response; - response.set_type(InnerMessage::Type::kTrySync); - response.set_code(InnerMessage::StatusCode::kOk); - InnerMessage::InnerResponse::TrySync* try_sync_response = response.mutable_try_sync(); - InnerMessage::Partition* partition_response = try_sync_response->mutable_partition(); - partition_response->set_table_name(table_name); - partition_response->set_partition_id(partition_id); - if (force) { - LOG(INFO) << "Partition: " << partition_name << " force full sync, BgSave and DbSync first"; - g_pika_server->TryDBSync(node.ip(), node.port(), table_name, partition_id, slave_boffset.filenum()); - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait); - } else { - BinlogOffset boffset; - if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) { - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); - LOG(WARNING) << "Handle TrySync, Partition: " - << partition_name << " not found, TrySync failed"; - } else { - if (boffset.filenum < slave_boffset.filenum() - || (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) { - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kInvalidOffset); - LOG(WARNING) << "Slave offset is larger than mine, Slave ip: " - << node.ip() << ", Slave port: " << node.port() << ", Partition: " - << partition_name << ", filenum: " << slave_boffset.filenum() - << ", pro_offset_: " << slave_boffset.offset() << ", force: " - << (force ? 
"yes" : "no"); - } else { - LOG(INFO) << "Partition: " << partition_name << " TrySync success"; - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk); - try_sync_response->set_sid(0); - } +void PikaReplServerConn::DispatchBinlogReq(const std::shared_ptr req) { + // partition to a bunch of binlog chips + std::unordered_map*> par_binlog; + for (int i = 0; i < req->binlog_sync_size(); ++i) { + const InnerMessage::InnerRequest::BinlogSync& binlog_req = req->binlog_sync(i); + // hash key: table + partition_id + std::string key = binlog_req.table_name() + std::to_string(binlog_req.partition_id()); + if (par_binlog.find(key) == par_binlog.end()) { + par_binlog[key] = new std::vector(); } + par_binlog[key]->push_back(i); } - - std::string reply_str; - if (!response.SerializeToString(&reply_str) - || WriteResp(reply_str)) { - return -1; - } - NotifyWrite(); - return 0; -} - -int PikaReplServerConn::HandleBinlogSync(const InnerMessage::InnerRequest& req) { - for (int i = 0; i < req.binlog_sync_size(); ++i) { - const InnerMessage::InnerRequest::BinlogSync& binlog_req = req.binlog_sync(i); - if(!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_req.binlog(), &binlog_item_)) { - return -1; - } - const char* redis_parser_start = binlog_req.binlog().data() + BINLOG_ENCODE_LEN; - int redis_parser_len = static_cast(binlog_req.binlog().size()) - BINLOG_ENCODE_LEN; - int processed_len = 0; - pink::RedisParserStatus ret = redis_parser_.ProcessInputBuffer( - redis_parser_start, redis_parser_len, &processed_len); - if (ret != pink::kRedisParserDone) { - return -1; - } + for (auto& binlog_nums : par_binlog) { + g_pika_server->ScheduleReplBinlogSyncTask( + binlog_nums.first, + req, + std::dynamic_pointer_cast(shared_from_this()), + reinterpret_cast(binlog_nums.second)); } - return 0; -} - - -bool PikaReplServerConn::ProcessBinlogData(const pink::RedisCmdArgsType& argv, const BinlogItem& binlog_item) { - 
g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]); - - // Monitor related - std::string monitor_message; - if (g_pika_server->HasMonitorClients()) { - std::string monitor_message = std::to_string(1.0 * slash::NowMicros() / 1000000) - + " [" + this->ip_port() + "]"; - for (const auto& item : argv) { - monitor_message += " " + slash::ToRead(item); - } - g_pika_server->AddMonitorMessage(monitor_message); - } - - bool is_readonly = g_pika_server->readonly(); - - // Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog - // Only when the server is readonly - uint64_t serial = binlog_receiver_->GetnPlusSerial(); - if (is_readonly) { - if (!g_pika_server->WaitTillBinlogBGSerial(serial)) { - return false; - } - std::string opt = argv[0]; - Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); - c_ptr->Initial(argv, g_pika_conf->default_table()); - - g_pika_server->logger_->Lock(); - g_pika_server->logger_->Put(c_ptr->ToBinlog(binlog_item.exec_time(), - std::to_string(binlog_item.server_id()), - binlog_item.logic_id(), - binlog_item.filenum(), - binlog_item.offset())); - g_pika_server->logger_->Unlock(); - g_pika_server->SignalNextBinlogBGSerial(); - } - - PikaCmdArgsType *v = new PikaCmdArgsType(argv); - BinlogItem *b = new BinlogItem(binlog_item); - std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0]; - g_pika_server->DispatchBinlogBG(dispatch_key, v, b, serial, is_readonly); - return true; -} - -int PikaReplServerConn::ParserDealMessage(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) { - PikaReplServerConn* conn = reinterpret_cast(parser->data); - return conn->ProcessBinlogData(argv, conn->binlog_item_) == true ? 
0 : -1; } diff --git a/src/pika_server.cc b/src/pika_server.cc index d59381302f..eda0512f96 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -129,9 +129,9 @@ PikaServer::~PikaServer() { delete ping_thread_; delete pika_pubsub_thread_; + delete pika_auxiliary_thread_; delete pika_repl_client_; delete pika_repl_server_; - delete pika_auxiliary_thread_; binlogbg_exit_ = true; std::vector::iterator binlogbg_iter = binlogbg_workers_.begin(); @@ -242,6 +242,10 @@ void PikaServer::Schedule(pink::TaskFunc func, void* arg) { pika_thread_pool_->Schedule(func, arg); } +void PikaServer::ScheduleReplCliTask(pink::TaskFunc func, void* arg) { + pika_repl_client_->Schedule(func, arg); +} + void PikaServer::RocksdbOptionInit(blackwidow::BlackwidowOptions* bw_option) { bw_option->options.create_if_missing = true; bw_option->options.keep_log_file_num = 10; @@ -1087,8 +1091,7 @@ Status PikaServer::AddBinlogSender(const std::string& table_name, if (!res.ok()) { return res; } - pika_repl_client_->RunStateMachine(slave); - return Status::OK(); + return pika_repl_client_->SendBinlogSync(slave); } /* @@ -1641,6 +1644,11 @@ Status PikaServer::SendPartitionTrySyncRequest(std::shared_ptr partit return status; } +Status PikaServer::SendBinlogSyncRequest(const std::string& table, uint32_t partition, const std::string& ip, int port) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->SendBinlogSync(slave); +} + void PikaServer::AddMonitorClient(std::shared_ptr client_ptr) { monitor_thread_->AddMonitorClient(client_ptr); } @@ -1659,6 +1667,57 @@ void PikaServer::DispatchBinlogBG(const std::string &key, binlogbg_workers_[index]->Schedule(argv, binlog_item, cur_serial, readonly); } +void PikaServer::ScheduleReplBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleBinlogSyncTask(table_partition, req, conn, req_private_data); +} + +void 
PikaServer::ScheduleReplMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleMetaSyncTask(req, conn, req_private_data); +} + +void PikaServer::ScheduleReplTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleTrySyncTask(req, conn, req_private_data); +} + +void PikaServer::ScheduleReplDbTask(const std::string &key, + PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string& table_name, uint32_t partition_id) { + pika_repl_server_->ScheduleDbTask(key, argv, binlog_item, table_name, partition_id); +} + +bool PikaServer::SetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->SetAckInfo(slave, ack_file_num, ack_offset, active_time); +} + +bool PikaServer::GetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->GetAckInfo(slave, ack_file_num, ack_offset, active_time); +} + +int PikaServer::SendToPeer() { + return pika_repl_client_->ConsumeWriteQueue(); +} + +Status PikaServer::TriggerSendBinlogSync() { + return pika_repl_client_->TriggerSendBinlogSync(); +} + +void PikaServer::SignalAuxiliary() { + pika_auxiliary_thread_->mu_.Lock(); + pika_auxiliary_thread_->cv_.Signal(); + pika_auxiliary_thread_->mu_.Unlock(); +} + bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) { binlogbg_mutex_.Lock(); //DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_; diff --git a/third/pink b/third/pink index 00fa498ce9..c9ca6094ce 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit 
00fa498ce9c3a9011197602dc4977cfe40d11948 +Subproject commit c9ca6094ceb69d6d056d0192acf8eb9b62457c26