diff --git a/include/pika_auxiliary_thread.h b/include/pika_auxiliary_thread.h index 49f5862198..94a312aef0 100644 --- a/include/pika_auxiliary_thread.h +++ b/include/pika_auxiliary_thread.h @@ -8,11 +8,17 @@ #include "pink/include/pink_thread.h" +#include "slash/include/slash_mutex.h" + class PikaAuxiliaryThread : public pink::Thread { public: - PikaAuxiliaryThread() {} + PikaAuxiliaryThread() : + mu_(), + cv_(&mu_) { + } virtual ~PikaAuxiliaryThread(); - + slash::Mutex mu_; + slash::CondVar cv_; private: virtual void* ThreadMain(); void RunEveryPartitionStateMachine(); diff --git a/include/pika_repl_bgworker.h b/include/pika_repl_bgworker.h new file mode 100644 index 0000000000..dc18545c30 --- /dev/null +++ b/include/pika_repl_bgworker.h @@ -0,0 +1,72 @@ +// Copyright (c) 2019-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_REPL_BGWROKER_H_ +#define PIKA_REPL_BGWROKER_H_ + +#include +#include + +#include "pink/include/bg_thread.h" +#include "pink/include/pb_conn.h" + +#include "src/pika_inner_message.pb.h" + +#include "include/pika_command.h" +#include "include/pika_binlog_transverter.h" + +class PikaReplBgWorker { + public: + explicit PikaReplBgWorker(int queue_size); + void ScheduleRequest(const std::shared_ptr req, + std::shared_ptr conn, void* req_private_data); + void ScheduleWriteDb(PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string table_name, uint32_t partition_id); + int StartThread(); + void QueueSize(int* size_a, int* size_b) { + bg_thread_.QueueSize(size_a, size_b); + } + + static void HandleMetaSyncRequest(void* arg); + static void HandleBinlogSyncRequest(void* arg); + static void HandleTrySyncRequest(void* arg); + static void HandleWriteDb(void* arg); + + BinlogItem binlog_item_; + pink::RedisParser redis_parser_; + std::string ip_port_; + std::string table_name_; + uint32_t partition_id_; + + private: + pink::BGThread bg_thread_; + + static int HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv); + + struct ReplBgWorkerArg { + const std::shared_ptr req; + std::shared_ptr conn; + void* req_private_data; + PikaReplBgWorker* worker; + ReplBgWorkerArg(const std::shared_ptr _req, std::shared_ptr _conn, void* _req_private_data, PikaReplBgWorker* _worker) : req(_req), conn(_conn), req_private_data(_req_private_data), worker(_worker) { + } + }; + + struct WriteDbBgArg { + PikaCmdArgsType *argv; + BinlogItem* binlog_item; + std::string table_name; + uint32_t partition_id; + WriteDbBgArg(PikaCmdArgsType* _argv, BinlogItem* _binlog_item, const std::string _table_name, uint32_t _partition_id) + : argv(_argv), binlog_item(_binlog_item), table_name(_table_name), partition_id(_partition_id) { + } + ~WriteDbBgArg() { + delete argv; + delete binlog_item; + } + }; +}; + +#endif // PIKA_REPL_BGWROKER_H_ diff --git a/include/pika_repl_client.h b/include/pika_repl_client.h index bbd4888415..a6a8cfba7d 100644 --- a/include/pika_repl_client.h +++ b/include/pika_repl_client.h @@ -18,6 +18,9 @@ #include "include/pika_binlog_reader.h" #include "include/pika_repl_client_thread.h" +#include "pink/include/thread_pool.h" +#include "src/pika_inner_message.pb.h" + #define kBinlogSyncBatchNum 10 using slash::Status; @@ -29,6 +32,47 @@ struct RmNode { int port_; RmNode(const std::string& table, int partition, const std::string& ip, int port) : table_(table), partition_(partition), ip_(ip), port_(port) { } + RmNode(const RmNode& node) { + table_ = node.table_; + partition_ = node.partition_; + ip_ = node.ip_; + port_ = node.port_; + } + bool operator <(const RmNode& other) const { + if (table_ < other.table_) { + return true; + } else if (partition_ < other.partition_) { + return true; + } else if (ip_ < other.ip_) { + return true; + } else if (port_ < other.port_) { + return true; + } + return false; + } + std::string ToString() const { + return table_ + "_" + std::to_string(partition_) + "_" + ip_ + ":" + std::to_string(port_); + } +}; + +struct BinlogChip { + uint32_t file_num_; + uint64_t offset_; + std::string binlog_; + BinlogChip(uint32_t file_num, uint64_t offset, std::string binlog) :file_num_(file_num), offset_(offset), binlog_(binlog) { + } + BinlogChip(const BinlogChip& binlog_chip) { + file_num_ = binlog_chip.file_num_; + offset_ = binlog_chip.offset_; + binlog_ = binlog_chip.binlog_; + } +}; + +struct WriteTask { + struct RmNode rm_node_; + struct BinlogChip binlog_chip_; + WriteTask(RmNode rm_node, BinlogChip binlog_chip) : rm_node_(rm_node), binlog_chip_(binlog_chip) { + } }; class PikaReplClient { @@ -40,25 +84,61 @@ class PikaReplClient { int Start(); Status AddBinlogReader(const RmNode& slave, std::shared_ptr logger, uint32_t filenum, uint64_t offset); Status RemoveBinlogReader(const RmNode& slave); - void RunStateMachine(const RmNode& slave); + bool NeedToSendBinlog(const RmNode& slave); Status SendMetaSync(); Status SendPartitionTrySync(const std::string& table_name, uint32_t partition_id, const BinlogOffset& boffset); + Status SendBinlogSync(const RmNode& slave); + + Status TriggerSendBinlogSync(); + + int ConsumeWriteQueue(); + + void Schedule(pink::TaskFunc func, void* arg){ + client_tp_->Schedule(func, arg); + } + + bool SetAckInfo(const RmNode& slave, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time); + bool GetAckInfo(const RmNode& slave, uint32_t* act_file_num, uint64_t* ack_offset, uint64_t* active_time); private: PikaBinlogReader* NewPikaBinlogReader(std::shared_ptr logger, uint32_t filenum, uint64_t offset); - Status TrySendSyncBinlog(const RmNode& slave); - void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request); + void ProduceWriteQueue(WriteTask& task); - Status BuildBinlogMsgFromFile(const RmNode& slave, std::string* scratch, uint32_t* filenum, uint64_t* offset); + void BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request); PikaReplClientThread* client_thread_; - // keys of this map: table_partition_slaveip:port - std::map slave_binlog_readers_; + + + struct BinlogSyncCtl { + slash::Mutex ctl_mu_; + PikaBinlogReader* reader_; + uint32_t ack_file_num_; + uint64_t ack_offset_; + uint64_t active_time_; + + BinlogSyncCtl(PikaBinlogReader* reader, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) + : reader_(reader), ack_file_num_(ack_file_num), ack_offset_(ack_offset), active_time_(active_time) { + } + ~BinlogSyncCtl() { + if (reader_) { + delete reader_; + } + } + }; + + pthread_rwlock_t binlog_ctl_rw_; + std::map binlog_ctl_; + + slash::Mutex write_queue_mu_; + // every host owns a queue + std::unordered_map> write_queues_; // ip+port, queue + + pink::ThreadPool* client_tp_; }; #endif diff --git a/include/pika_repl_client_conn.h b/include/pika_repl_client_conn.h index 4a4d4401d5..a3bcdd7294 100644 --- a/include/pika_repl_client_conn.h +++ b/include/pika_repl_client_conn.h @@ -8,6 +8,8 @@ #include "pink/include/pb_conn.h" +#include + #include "include/pika_conf.h" #include "src/pika_inner_message.pb.h" @@ -15,14 +17,19 @@ class PikaReplClientConn: public pink::PbConn { public: PikaReplClientConn(int fd, const std::string& ip_port, pink::Thread *thread, void* worker_specific_data, pink::PinkEpoll* epoll); virtual ~PikaReplClientConn() = default; - - static void DoReplClientTask(void* arg); + static void HandleBinlogSyncResponse(void* arg); + static void HandleMetaSyncResponse(void* arg); + static void HandleTrySyncResponse(void* arg); + static bool IsTableStructConsistent(const std::vector& current_tables, + const std::vector& expect_tables); int DealMessage() override; private: - bool IsTableStructConsistent(const std::vector& current_tables, - const std::vector& expect_tables); - int HandleMetaSyncResponse(const InnerMessage::InnerResponse& response); - int HandleTrySyncResponse(const InnerMessage::InnerResponse& response); + struct ReplRespArg { + std::shared_ptr resp; + std::shared_ptr conn; + ReplRespArg(std::shared_ptr _resp, std::shared_ptr _conn) : resp(_resp), conn(_conn) { + } + }; }; #endif diff --git a/include/pika_repl_client_thread.h b/include/pika_repl_client_thread.h index 49796c9d61..4523d85fd8 100644 --- a/include/pika_repl_client_thread.h +++ b/include/pika_repl_client_thread.h @@ -39,6 +39,10 @@ class ReplClientHandle : public pink::ClientHandle { // int DeleteWorkerSpecificData(void* data) const override { // return 0; // } + // void DestConnectFailedHandle(std::string ip_port, std::string reason) const override { + // std::cout << "ip " << ip_port << " " << reason << std::endl; + // } + }; class PikaReplClientThread : public pink::ClientThread { diff --git a/include/pika_repl_server.h b/include/pika_repl_server.h index 97491f84d0..cebd15030a 100644 --- a/include/pika_repl_server.h +++ b/include/pika_repl_server.h @@ -8,15 +8,46 @@ #include "include/pika_repl_server_thread.h" +#include + +#include "include/pika_repl_bgworker.h" +#include "include/pika_command.h" +#include "pika_binlog_transverter.h" + class PikaReplServer { - public: - PikaReplServer(const std::set& ips, int port, int cron_interval); - ~PikaReplServer(); + public: + PikaReplServer(const std::set& ips, int port, int cron_interval); + ~PikaReplServer(); + int Start(); + + void ScheduleBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); - int Start(); + void ScheduleDbTask(const std::string& key, + PikaCmdArgsType* argv, + BinlogItem* binlog_item, + const std::string& table_name, + uint32_t partition_id); + private: + size_t GetHashIndex(std::string key, bool upper_half); + void UpdateNextAvail() { + next_avail_ = (next_avail_ + 1) % bg_workers_.size(); + } - private: - PikaReplServerThread* pika_repl_server_thread_; + PikaReplServerThread* pika_repl_server_thread_; + std::vector bg_workers_; + int next_avail_; + std::hash str_hash; }; #endif diff --git a/include/pika_repl_server_conn.h b/include/pika_repl_server_conn.h index cfe6e690d2..b23d9b5a2f 100644 --- a/include/pika_repl_server_conn.h +++ b/include/pika_repl_server_conn.h @@ -9,34 +9,19 @@ #include #include "pink/include/pb_conn.h" -#include "pink/include/redis_parser.h" #include "pink/include/pink_thread.h" #include "src/pika_inner_message.pb.h" -#include "include/pika_binlog_transverter.h" - -class PikaReplServerThread; - class PikaReplServerConn: public pink::PbConn { public: PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, pink::PinkEpoll* epoll); virtual ~PikaReplServerConn(); int DealMessage(); - bool ProcessAuth(const pink::RedisCmdArgsType& argv); - bool ProcessBinlogData(const pink::RedisCmdArgsType& argv, const BinlogItem& binlog_item); - - BinlogItem binlog_item_; private: - static int ParserDealMessage(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv); - int HandleMetaSyncRequest(const InnerMessage::InnerRequest& req); - int HandleTrySync(const InnerMessage::InnerRequest& req); - int HandleBinlogSync(const InnerMessage::InnerRequest& req); - - pink::RedisParser redis_parser_; - - PikaReplServerThread* binlog_receiver_; + // dispatch binlog by its table_name + partition + void DispatchBinlogReq(const std::shared_ptr req); }; #endif // INCLUDE_PIKA_REPL_SERVER_CONN_H_ diff --git a/include/pika_server.h b/include/pika_server.h index b4d27f8c3f..2472e2a419 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -426,12 +426,6 @@ class PikaServer { int PubSubNumPat(); - /* - * Communication use - */ - Status SendMetaSyncRequest(); - Status SendPartitionTrySyncRequest(std::shared_ptr partition); - /* * Monitor used */ @@ -478,6 +472,55 @@ class PikaServer { void SetDispatchQueueLimit(int queue_limit); + /* + * for RM {repl client, repl server} use + */ + void ScheduleReplBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data); + + void ScheduleReplDbTask(const std::string &key, + PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string& table_name, uint32_t partition_id); + + bool SetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time); + + bool GetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time); + + /* + * Communication use + */ + Status SendMetaSyncRequest(); + Status SendPartitionTrySyncRequest(std::shared_ptr partition); + + Status SendBinlogSyncRequest(const std::string& table, uint32_t partition, const std::string& ip, int port); + + // schedule repl_client thread pool + void ScheduleReplCliTask(pink::TaskFunc func, void*arg); + + /* + * For repl client write queue consumer + * return value: consumed binlog task + */ + int SendToPeer(); + + /* + * Used to trigger binglog sync process + */ + Status TriggerSendBinlogSync(); + + void SignalAuxiliary(); private: std::atomic exit_; std::atomic binlog_io_error_; diff --git a/src/pika_auxiliary_thread.cc b/src/pika_auxiliary_thread.cc index b94bb0756f..d45c36d4c3 100644 --- a/src/pika_auxiliary_thread.cc +++ b/src/pika_auxiliary_thread.cc @@ -17,7 +17,6 @@ PikaAuxiliaryThread::~PikaAuxiliaryThread() { void* PikaAuxiliaryThread::ThreadMain() { while (!should_stop()) { - sleep(1); if (g_pika_server->ShouldMetaSync()) { g_pika_server->SendMetaSyncRequest(); LOG(INFO) << "Send meta sync request finish"; @@ -32,6 +31,21 @@ void* PikaAuxiliaryThread::ThreadMain() { if (g_pika_server->ShouldTrySyncPartition()) { RunEveryPartitionStateMachine(); } + // TODO(whoiami) timeout + Status s = g_pika_server->TriggerSendBinlogSync(); + if (!s.ok()) { + LOG(WARNING) << s.ToString(); + } + // send to peer + int res = g_pika_server->SendToPeer(); + if (!res) { + // sleep 100 ms + mu_.Lock(); + cv_.TimedWait(100); + mu_.Unlock(); + } else { + //LOG_EVERY_N(INFO, 100) << "Consume binlog number " << res; + } } return NULL; } diff --git a/src/pika_inner_message.proto b/src/pika_inner_message.proto index 80228400e7..be9bd9aa5c 100644 --- a/src/pika_inner_message.proto +++ b/src/pika_inner_message.proto @@ -95,7 +95,9 @@ message InnerResponse { // slave to master message BinlogSync { - required BinlogOffset binlog_offset = 1; + required string table_name = 1; + required uint32 partition_id = 2; + required BinlogOffset binlog_offset = 3; } // slave to master diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc new file mode 100644 index 0000000000..d33d9dc6e4 --- /dev/null +++ b/src/pika_repl_bgworker.cc @@ -0,0 +1,330 @@ +// Copyright (c) 2019-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_repl_bgworker.h" + +#include + +#include "include/pika_conf.h" +#include "include/pika_cmd_table_manager.h" +#include "include/pika_server.h" + +extern PikaServer* g_pika_server; +extern PikaConf* g_pika_conf; +extern PikaCmdTableManager* g_pika_cmd_table_manager; + +PikaReplBgWorker::PikaReplBgWorker(int queue_size) + : bg_thread_(queue_size) { + bg_thread_.set_thread_name("ReplBgWorker"); + pink::RedisParserSettings settings; + settings.DealMessage = &(PikaReplBgWorker::HandleWriteBinlog); + redis_parser_.RedisParserInit(REDIS_PARSER_REQUEST, settings); + redis_parser_.data = this; + table_name_ = g_pika_conf->default_table(); + partition_id_ = 0; +} + +int PikaReplBgWorker::StartThread() { + return bg_thread_.StartThread(); +} + +void PikaReplBgWorker::ScheduleRequest(const std::shared_ptr req, + std::shared_ptr conn, void* req_private_data) { + ReplBgWorkerArg* arg = new ReplBgWorkerArg(req, conn, req_private_data, this); + switch (req->type()) { + case InnerMessage::kMetaSync: + bg_thread_.Schedule(&PikaReplBgWorker::HandleMetaSyncRequest, static_cast(arg)); + break; + case InnerMessage::kTrySync: + bg_thread_.Schedule(&PikaReplBgWorker::HandleTrySyncRequest, static_cast(arg)); + break; + case InnerMessage::kBinlogSync: + bg_thread_.Schedule(&PikaReplBgWorker::HandleBinlogSyncRequest, static_cast(arg)); + break; + default: + break; + } +} + +void PikaReplBgWorker::ScheduleWriteDb(PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string table_name, uint32_t partition_id) { + WriteDbBgArg* arg = new WriteDbBgArg(argv, binlog_item, table_name, partition_id); + bg_thread_.Schedule(&PikaReplBgWorker::HandleWriteDb, static_cast(arg)); +} + +void PikaReplBgWorker::HandleMetaSyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + + std::vector table_structs = g_pika_conf->table_structs(); + InnerMessage::InnerResponse response; + response.set_code(InnerMessage::kOk); + response.set_type(InnerMessage::kMetaSync); + InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync(); + meta_sync->set_classic_mode(g_pika_conf->classic_mode()); + for (const auto& table_struct : table_structs) { + InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info(); + table_info->set_table_name(table_struct.table_name); + table_info->set_partition_num(table_struct.partition_num); + } + + std::string reply_str; + if (!response.SerializeToString(&reply_str)) { + LOG(WARNING) << "Process MetaSync request serialization failed"; + delete bg_worker_arg; + return; + } + int res = conn->WriteResp(reply_str); + if (res) { + LOG(WARNING) << "Process MetaSync request write resp failed"; + delete bg_worker_arg; + return; + } + conn->NotifyWrite(); + delete bg_worker_arg; +} + +void PikaReplBgWorker::HandleBinlogSyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + std::vector* index = static_cast* >(bg_worker_arg->req_private_data); + PikaReplBgWorker* worker = bg_worker_arg->worker; + worker->ip_port_ = conn->ip_port(); + + const InnerMessage::InnerRequest::BinlogSync& binlog_req = + req->binlog_sync((*index)[(*index).size() - 1]); + std::string table_name = binlog_req.table_name(); + uint32_t partition_id = binlog_req.partition_id(); + + worker->table_name_ = table_name; + worker->partition_id_ = partition_id; + + for (size_t i = 0; i < index->size(); ++i) { + const InnerMessage::InnerRequest::BinlogSync& binlog_req = req->binlog_sync((*index)[i]); + if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_req.binlog(), &worker->binlog_item_)) { + LOG(WARNING) << "Binlog item decode failed"; + delete index; + delete bg_worker_arg; + return; + } + const char* redis_parser_start = binlog_req.binlog().data() + BINLOG_ENCODE_LEN; + int redis_parser_len = static_cast(binlog_req.binlog().size()) - BINLOG_ENCODE_LEN; + int processed_len = 0; + pink::RedisParserStatus ret = worker->redis_parser_.ProcessInputBuffer( + redis_parser_start, redis_parser_len, &processed_len); + if (ret != pink::kRedisParserDone) { + LOG(WARNING) << "Redis parser failed"; + delete index; + delete bg_worker_arg; + return; + } + } + + // build response + std::shared_ptr partition = g_pika_server->GetTablePartitionById(table_name, partition_id); + std::shared_ptr logger = partition->logger(); + uint32_t file_num; + uint64_t offset; + logger->GetProducerStatus(&file_num, &offset); + + InnerMessage::InnerResponse response; + response.set_code(InnerMessage::kOk); + response.set_type(InnerMessage::kBinlogSync); + InnerMessage::InnerResponse_BinlogSync* binlog_sync_resp = response.mutable_binlog_sync(); + binlog_sync_resp->set_table_name(table_name); + binlog_sync_resp->set_partition_id(partition_id); + InnerMessage::BinlogOffset* binlog_offset = binlog_sync_resp->mutable_binlog_offset(); + binlog_offset->set_filenum(file_num); + binlog_offset->set_offset(offset); + + std::string reply_str; + if (!response.SerializeToString(&reply_str)) { + LOG(WARNING) << "Process MetaSync request serialization failed"; + delete index; + delete bg_worker_arg; + return; + } + + int res = conn->WriteResp(reply_str); + if (res) { + LOG(WARNING) << "Process BinlogSync request write resp failed"; + delete index; + delete bg_worker_arg; + return; + } + conn->NotifyWrite(); + delete index; + delete bg_worker_arg; +} + +int PikaReplBgWorker::HandleWriteBinlog(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) { + PikaReplBgWorker* worker = static_cast(parser->data); + const BinlogItem& binlog_item = worker->binlog_item_; + g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]); + + // Monitor related + std::string monitor_message; + if (g_pika_server->HasMonitorClients()) { + std::string monitor_message = std::to_string(1.0 * slash::NowMicros() / 1000000) + + " [" + worker->ip_port_ + "]"; + for (const auto& item : argv) { + monitor_message += " " + slash::ToRead(item); + } + g_pika_server->AddMonitorMessage(monitor_message); + } + + std::string opt = argv[0]; + Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); + // Initial + c_ptr->Initial(argv, worker->table_name_); + if (!c_ptr->res().ok()) { + LOG(WARNING) << "Fail to initial command from binlog: " << opt; + return -1; + } + + std::shared_ptr partition = g_pika_server->GetTablePartitionById(worker->table_name_, worker->partition_id_); + std::shared_ptr logger = partition->logger(); + + logger->Lock(); + logger->Put(c_ptr->ToBinlog(binlog_item.exec_time(), + std::to_string(binlog_item.server_id()), + binlog_item.logic_id(), + binlog_item.filenum(), + binlog_item.offset())); + uint32_t filenum; + uint64_t offset; + logger->GetProducerStatus(&filenum, &offset); + logger->Unlock(); + + PikaCmdArgsType *v = new PikaCmdArgsType(argv); + BinlogItem *b = new BinlogItem(binlog_item); + std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0]; + g_pika_server->ScheduleReplDbTask(dispatch_key, v, b, worker->table_name_, worker->partition_id_); + return 0; +} + +void PikaReplBgWorker::HandleWriteDb(void* arg) { + WriteDbBgArg *bg_worker_arg = static_cast(arg); + PikaCmdArgsType* argv = bg_worker_arg->argv; + BinlogItem binlog_item = *(bg_worker_arg->binlog_item); + std::string table_name = bg_worker_arg->table_name; + uint32_t partition_id = bg_worker_arg->partition_id; + std::string opt = (*argv)[0]; + slash::StringToLower(opt); + + // Get command + Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); + if (!c_ptr) { + LOG(WARNING) << "Error operation from binlog: " << opt; + delete bg_worker_arg; + return; + } + c_ptr->res().clear(); + + // Initial + c_ptr->Initial(*argv, table_name); + if (!c_ptr->res().ok()) { + LOG(WARNING) << "Fail to initial command from binlog: " << opt; + delete bg_worker_arg; + return; + } + + uint64_t start_us = 0; + if (g_pika_conf->slowlog_slower_than() >= 0) { + start_us = slash::NowMicros(); + } + std::shared_ptr partition = g_pika_server->GetTablePartitionById(table_name, partition_id); + // Add read lock for no suspend command + if (!c_ptr->is_suspend()) { + g_pika_server->RWLockReader(); + } + + c_ptr->Do(partition); + + if (!c_ptr->is_suspend()) { + g_pika_server->RWUnlock(); + } + + if (g_pika_conf->slowlog_slower_than() >= 0) { + int64_t duration = slash::NowMicros() - start_us; + if (duration > g_pika_conf->slowlog_slower_than()) { + LOG(ERROR) << "command: " << opt << ", start_time(s): " << start_us / 1000000 << ", duration(us): " << duration; + } + } + + delete bg_worker_arg; +} + +void PikaReplBgWorker::HandleTrySyncRequest(void* arg) { + ReplBgWorkerArg* bg_worker_arg = static_cast(arg); + const std::shared_ptr req = bg_worker_arg->req; + std::shared_ptr conn = bg_worker_arg->conn; + + InnerMessage::InnerRequest::TrySync try_sync_request = req->try_sync(); + InnerMessage::Partition partition_request = try_sync_request.partition(); + std::string table_name = partition_request.table_name(); + uint32_t partition_id = partition_request.partition_id(); + bool force = try_sync_request.force(); + std::string partition_name = table_name + "_" + std::to_string(partition_id); + InnerMessage::BinlogOffset slave_boffset = try_sync_request.binlog_offset(); + InnerMessage::Node node = try_sync_request.node(); + LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:" + << node.port() << ", Partition: " << partition_name << ", filenum: " + << slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset(); + + InnerMessage::InnerResponse response; + response.set_type(InnerMessage::Type::kTrySync); + response.set_code(InnerMessage::StatusCode::kOk); + InnerMessage::InnerResponse::TrySync* try_sync_response = response.mutable_try_sync(); + InnerMessage::Partition* partition_response = try_sync_response->mutable_partition(); + partition_response->set_table_name(table_name); + partition_response->set_partition_id(partition_id); + if (force) { + } else { + BinlogOffset boffset; + if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); + LOG(WARNING) << "Handle TrySync, Partition: " + << partition_name << " not found, TrySync failed"; + } else { + if (boffset.filenum < slave_boffset.filenum() + || (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kInvalidOffset); + LOG(WARNING) << "Slave offset is larger than mine, Slave ip: " + << node.ip() << ", Slave port: " << node.port() << ", Partition: " + << partition_name << ", filenum: " << slave_boffset.filenum() + << ", pro_offset_: " << slave_boffset.offset(); + } else { + try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk); + try_sync_response->set_sid(0); + LOG(INFO) << "Try Incrental SYNC " << " master filenum: " << boffset.filenum << " offset: " << boffset.offset + << " slave filenum: " << slave_boffset.filenum() << " offset: " << slave_boffset.offset(); + // incremental sync + Status s = g_pika_server->AddBinlogSender( + table_name, + partition_id, + node.ip(), + node.port(), + 0, + slave_boffset.filenum(), + slave_boffset.offset()); + if (!s.ok()) { + LOG(WARNING) << s.ToString(); + } + } + } + } + delete bg_worker_arg; + + std::string reply_str; + if (!response.SerializeToString(&reply_str) + || conn->WriteResp(reply_str)) { + LOG(WARNING) << "Handle Try Sync Failed"; + return; + } + conn->NotifyWrite(); +} diff --git a/src/pika_repl_client.cc b/src/pika_repl_client.cc index 2c30f0e6ba..dab561092d 100644 --- a/src/pika_repl_client.cc +++ b/src/pika_repl_client.cc @@ -8,96 +8,166 @@ #include "include/pika_server.h" #include "slash/include/slash_coding.h" #include "slash/include/env.h" +#include "slash/include/slash_string.h" extern PikaServer* g_pika_server; PikaReplClient::PikaReplClient(int cron_interval, int keepalive_timeout) { client_thread_ = new PikaReplClientThread(cron_interval, keepalive_timeout); client_thread_->set_thread_name("PikaReplClient"); + client_tp_ = new pink::ThreadPool(g_pika_conf->thread_pool_size(), 100000); + pthread_rwlock_init(&binlog_ctl_rw_, NULL); } PikaReplClient::~PikaReplClient() { client_thread_->StopThread(); delete client_thread_; - for (auto iter : slave_binlog_readers_) { + for (auto iter : binlog_ctl_) { delete iter.second; } + delete client_tp_; + pthread_rwlock_destroy(&binlog_ctl_rw_); LOG(INFO) << "PikaReplClient exit!!!"; } -static inline void BuildBinlogReaderIndex(const RmNode& slave, std::string* index) { - *index = slave.table_ + "_" + std::to_string(slave.partition_) + "_" + slave.ip_ + ":" + std::to_string(slave.port_); -} - int PikaReplClient::Start() { int res = client_thread_->StartThread(); if (res != pink::kSuccess) { LOG(FATAL) << "Start ReplClient ClientThread Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); } + res = client_tp_->start_thread_pool(); + if (res != pink::kSuccess) { + LOG(FATAL) << "Start ThreadPool Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); + } return res; } +void PikaReplClient::ProduceWriteQueue(WriteTask& task) { + slash::MutexLock l(&write_queue_mu_); + std::string index = task.rm_node_.ip_ + ":" + std::to_string(task.rm_node_.port_); + write_queues_[index].push(task); +} + +int PikaReplClient::ConsumeWriteQueue() { + InnerMessage::InnerRequest request; + request.set_type(InnerMessage::kBinlogSync); + + int counter = 0; + slash::MutexLock l(&write_queue_mu_); + for (auto& iter : write_queues_) { + std::queue& queue = iter.second; + if (queue.empty()) { + continue; + } + size_t batch_index = queue.size() > kBinlogSyncBatchNum ? kBinlogSyncBatchNum : queue.size(); + std::string ip; + int port = 0; + if (!slash::ParseIpPortString(iter.first, ip, port)) { + LOG(WARNING) << "Parse ip_port error " << iter.first; + } + for (size_t i = 0; i < batch_index; ++i) { + WriteTask task = queue.front(); + queue.pop(); + BuildBinlogPb(task.rm_node_, + task.binlog_chip_.binlog_, + task.binlog_chip_.file_num_, + task.binlog_chip_.offset_, request); + + counter++; + } + std::string to_send; + bool res = request.SerializeToString(&to_send); + if (!res) { + LOG(WARNING) << "Serialize Failed"; + continue; + } + Status s = client_thread_->Write(ip, port, to_send); + if (!s.ok()) { + LOG(WARNING) << "write to " << ip << ":" << port << " failed"; + continue; + } + } + return counter; +} + +bool PikaReplClient::SetAckInfo(const RmNode& slave, uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + if (binlog_ctl_.find(slave) == binlog_ctl_.end()) { + return false; + } + ctl = binlog_ctl_[slave]; + } + + { + slash::MutexLock l(&(ctl->ctl_mu_)); + ctl->ack_file_num_ = ack_file_num; + ctl->ack_offset_ = ack_offset; + ctl->active_time_ = active_time; + } + return true; +} + +bool PikaReplClient::GetAckInfo(const RmNode& slave, uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + if (binlog_ctl_.find(slave) == binlog_ctl_.end()) { + return false; + } + ctl = binlog_ctl_[slave]; + } + + { + slash::MutexLock l(&(ctl->ctl_mu_)); + *ack_file_num = ctl->ack_file_num_; + *ack_offset = ctl->ack_offset_; + *active_time = ctl->active_time_; + } + return true; +} + Status PikaReplClient::Write(const std::string& ip, const int port, const std::string& msg) { // shift port 3000 tobe inner connect port return client_thread_->Write(ip, port, msg); } Status PikaReplClient::RemoveBinlogReader(const RmNode& slave) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - if (slave_binlog_readers_.find(index) != slave_binlog_readers_.end()) { - delete slave_binlog_readers_[index]; - slave_binlog_readers_.erase(index); + { + slash::RWLock l(&binlog_ctl_rw_, true); + if (binlog_ctl_.find(slave) != binlog_ctl_.end()) { + delete binlog_ctl_[slave]; + binlog_ctl_.erase(slave); + } } return Status::OK(); } Status PikaReplClient::AddBinlogReader(const RmNode& slave, std::shared_ptr logger, uint32_t filenum, uint64_t offset) { - std::string index; - BuildBinlogReaderIndex(slave, &index); RemoveBinlogReader(slave); PikaBinlogReader* binlog_reader = NewPikaBinlogReader(logger, filenum, offset); if (!binlog_reader) { - return Status::Corruption(index + " new binlog reader failed"); + return Status::Corruption(slave.ToString() + " new binlog reader failed"); } int res = binlog_reader->Seek(); if (res) { delete binlog_reader; - return Status::Corruption(index + " binlog reader init failed"); + return Status::Corruption(slave.ToString() + " binlog reader init failed"); } - slave_binlog_readers_[index] = binlog_reader; - return Status::OK(); -} + uint32_t cur_file_num; + uint64_t cur_offset; + binlog_reader->GetReaderStatus(&cur_file_num, &cur_offset); -void PikaReplClient::RunStateMachine(const RmNode& slave) { - Status s = TrySendSyncBinlog(slave); - if (!s.ok()) { - LOG(INFO) << s.ToString(); - return; - } - //state = GetState(slave); - //bool res = false; - //switch(state) { - // case ShouldSendAuth : - // res = TrySyncBinlog(slave, false); - // break; - // case ShouldSendBinlog : - // TrySyncBinlog(slave, true); - // break; - // default: - // break; - //} -} - -bool PikaReplClient::NeedToSendBinlog(const RmNode& slave) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - auto binlog_reader_iter = slave_binlog_readers_.find(index); - if (binlog_reader_iter == slave_binlog_readers_.end()) { - return false; + { + slash::RWLock l(&binlog_ctl_rw_, true); + uint64_t now; + struct timeval tv; + gettimeofday(&tv, NULL); + now = tv.tv_sec; + binlog_ctl_[slave] = new BinlogSyncCtl(binlog_reader, cur_file_num, cur_offset, now); } - PikaBinlogReader* reader = binlog_reader_iter->second; - return !(reader->ReadToTheEnd()); + return Status::OK(); } Status PikaReplClient::SendMetaSync() { @@ -149,31 +219,64 @@ Status PikaReplClient::SendPartitionTrySync(const std::string& table_name, return client_thread_->Write(master_ip, master_port + 3000, to_send); } -Status PikaReplClient::TrySendSyncBinlog(const RmNode& slave) { - InnerMessage::InnerRequest request; - request.set_type(InnerMessage::kBinlogSync); - if (!NeedToSendBinlog(slave)) { - return Status::OK(); +Status PikaReplClient::SendBinlogSync(const RmNode& slave) { + BinlogSyncCtl* ctl = nullptr; + { + slash::RWLock l(&binlog_ctl_rw_, false); + auto iter = binlog_ctl_.find(slave); + if (iter == binlog_ctl_.end()) { + return Status::NotFound(slave.ToString() + " not found"); } + ctl = iter->second; + } + + { + slash::MutexLock l(&(ctl->ctl_mu_)); for (int i = 0; i < kBinlogSyncBatchNum; ++i) { std::string msg; uint32_t filenum; uint64_t offset; - Status s = BuildBinlogMsgFromFile(slave, &msg, &filenum, &offset); + Status s = ctl->reader_->Get(&msg, &filenum, &offset); if (s.IsEndFile()) { break; } else if (s.IsCorruption() || s.IsIOError()) { return s; } - BuildBinlogPb(slave, msg, filenum, offset, request); + WriteTask task(slave, BinlogChip(filenum, offset, msg)); + ProduceWriteQueue(task); } - std::string to_send; - bool res = request.SerializeToString(&to_send); - if (!res) { - return Status::Corruption("Serialize Failed"); } - return client_thread_->Write(slave.ip_, slave.port_, to_send); + return Status::OK(); +} + +Status PikaReplClient::TriggerSendBinlogSync() { + slash::RWLock l(&binlog_ctl_rw_, false); + for (auto& binlog_ctl : binlog_ctl_) { + BinlogSyncCtl* ctl = binlog_ctl.second; + { + slash::MutexLock l(&(ctl->ctl_mu_)); + uint32_t send_file_num; + uint64_t send_offset; + ctl->reader_->GetReaderStatus(&send_file_num, &send_offset); + if (ctl->ack_file_num_ == send_file_num && ctl->ack_offset_ == send_offset) { + for (int i = 0; i < kBinlogSyncBatchNum; ++i) { + std::string msg; + uint32_t filenum; + uint64_t offset; + Status s = ctl->reader_->Get(&msg, &filenum, &offset); + if (s.IsEndFile()) { + break; + } else if (s.IsCorruption() || s.IsIOError()) { + return s; + } + WriteTask task(binlog_ctl.first, BinlogChip(filenum, offset, msg)); + ProduceWriteQueue(task); + } + } + } + } + return Status::OK(); } void PikaReplClient::BuildBinlogPb(const RmNode& slave, const std::string& msg, uint32_t filenum, uint64_t offset, InnerMessage::InnerRequest& request) { @@ -189,23 +292,6 @@ void PikaReplClient::BuildBinlogPb(const RmNode& slave, const std::string& msg, binlog_msg->set_binlog(msg); } -Status PikaReplClient::BuildBinlogMsgFromFile(const RmNode& slave, std::string* scratch, uint32_t* filenum, uint64_t* offset) { - std::string index; - BuildBinlogReaderIndex(slave, &index); - auto iter = slave_binlog_readers_.find(index); - if (iter == slave_binlog_readers_.end()) { - return Status::NotFound(index + " not found"); - } - PikaBinlogReader* reader = iter->second; - - // Get command supports append binlog - Status s = reader->Get(scratch, filenum, offset); - if (!s.ok()) { - return s; - } - return Status::OK(); -} - PikaBinlogReader* PikaReplClient::NewPikaBinlogReader(std::shared_ptr logger, uint32_t filenum, uint64_t offset) { std::string confile = NewFileName(logger->filename, filenum); if (!slash::FileExists(confile)) { diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 7def284bf3..de654c1eec 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -5,8 +5,10 @@ #include "include/pika_repl_client_conn.h" +#include + #include "include/pika_server.h" -#include "src/pika_inner_message.pb.h" +#include "slash/include/slash_string.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; @@ -19,11 +21,6 @@ PikaReplClientConn::PikaReplClientConn(int fd, : pink::PbConn(fd, ip_port, thread, epoll) { } -void PikaReplClientConn::DoReplClientTask(void* arg) { - InnerMessage::InnerResponse* response = reinterpret_cast(arg); - delete response; -} - bool PikaReplClientConn::IsTableStructConsistent( const std::vector& current_tables, const std::vector& expect_tables) { @@ -39,14 +36,19 @@ bool PikaReplClientConn::IsTableStructConsistent( return true; } -int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse& response) { - const InnerMessage::InnerResponse_MetaSync meta_sync = response.meta_sync(); +void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr response = resp_arg->resp; + + const InnerMessage::InnerResponse_MetaSync meta_sync = response->meta_sync(); if (g_pika_conf->classic_mode() != meta_sync.classic_mode()) { LOG(WARNING) << "Self in " << (g_pika_conf->classic_mode() ? "classic" : "sharding") << " mode, but master in " << (meta_sync.classic_mode() ? "classic" : "sharding") << " mode, failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } std::vector master_table_structs; @@ -58,11 +60,12 @@ int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse bool force_full_sync = g_pika_server->force_full_sync(); std::vector self_table_structs = g_pika_conf->table_structs(); if (!force_full_sync - && !IsTableStructConsistent(self_table_structs, master_table_structs)) { + && !PikaReplClientConn::IsTableStructConsistent(self_table_structs, master_table_structs)) { LOG(WARNING) << "Self table structs inconsistent with master" << ", failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } if (force_full_sync) { @@ -72,17 +75,22 @@ int PikaReplClientConn::HandleMetaSyncResponse(const InnerMessage::InnerResponse LOG(WARNING) << "Need force full sync but rebuild table struct error" << ", failed to establish master-slave relationship"; g_pika_server->SyncError(); - return -1; + delete resp_arg; + return; } g_pika_server->PurgeDir(g_pika_conf->trash_path()); } LOG(INFO) << "Finish to handle meta sync response"; g_pika_server->MetaSyncDone(); - return 0; + delete resp_arg; } -int PikaReplClientConn::HandleTrySyncResponse(const InnerMessage::InnerResponse& response) { - const InnerMessage::InnerResponse_TrySync try_sync_response = response.try_sync(); +void PikaReplClientConn::HandleTrySyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr response = resp_arg->resp; + + const InnerMessage::InnerResponse_TrySync try_sync_response = response->try_sync(); const InnerMessage::Partition partition_response = try_sync_response.partition(); std::string table_name = partition_response.table_name(); uint32_t partition_id = partition_response.partition_id(); @@ -95,24 +103,79 @@ int PikaReplClientConn::HandleTrySyncResponse(const InnerMessage::InnerResponse& } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kInvalidOffset) { LOG(WARNING) << "Partition: " << partition_name << " TrySync Error, Because the invalid filenum and offset"; } else if (try_sync_response.reply_code() == InnerMessage::InnerResponse::TrySync::kOk) { - LOG(INFO) << "Partition: " << partition_name << " TrySync Ok"; + LOG(INFO) << "Partition: " << partition_name << " TrySync Ok"; } - return 0; + delete resp_arg; } int PikaReplClientConn::DealMessage() { - int res = 0; - InnerMessage::InnerResponse response; - response.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); - switch (response.type()) { + std::shared_ptr response = std::make_shared(); + response->ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + switch (response->type()) { case InnerMessage::kMetaSync: - res = HandleMetaSyncResponse(response); + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleMetaSyncResponse, static_cast(arg)); break; + } case InnerMessage::kTrySync: - res = HandleTrySyncResponse(response); + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleTrySyncResponse, static_cast(arg)); break; + } + case InnerMessage::kBinlogSync: + { + ReplRespArg* arg = new ReplRespArg(response, std::dynamic_pointer_cast(shared_from_this())); + g_pika_server->ScheduleReplCliTask(&PikaReplClientConn::HandleBinlogSyncResponse, static_cast(arg)); + break; + } default: break; } - return res; + return 0; +} + +void PikaReplClientConn::HandleBinlogSyncResponse(void* arg) { + ReplRespArg* resp_arg = static_cast(arg); + std::shared_ptr conn = resp_arg->conn; + std::shared_ptr resp = resp_arg->resp; + if (!resp->has_binlog_sync()) { + LOG(WARNING) << "Pb parse error"; + delete resp_arg; + return; + } + const InnerMessage::InnerResponse_BinlogSync& binlog_ack = resp->binlog_sync(); + std::string table_name = binlog_ack.table_name(); + uint32_t partition_id = binlog_ack.partition_id(); + std::string ip; + int port = 0; + bool res = slash::ParseIpPortString(conn->ip_port(), ip, port); + if (!res) { + LOG(WARNING) << "Parse Error ParseIpPortString faile"; + delete resp_arg; + return; + } + const InnerMessage::BinlogOffset& binlog_offset = binlog_ack.binlog_offset(); + + uint64_t now; + struct timeval tv; + gettimeofday(&tv, NULL); + now = tv.tv_sec; + + // Set ack info from slave + res = g_pika_server->SetBinlogAckInfo(table_name, partition_id, ip, port, binlog_offset.filenum(), binlog_offset.offset(), now); + if (!res) { + LOG(WARNING) << "Update binlog ack failed " << table_name << " " << partition_id; + delete resp_arg; + return; + } + delete resp_arg; + + Status s = g_pika_server->SendBinlogSyncRequest(table_name, partition_id, ip, port); + if (!s.ok()) { + LOG(WARNING) << "Send BinlogSync Request failed " << table_name << " " << partition_id << s.ToString(); + return; + } + g_pika_server->SignalAuxiliary(); } diff --git a/src/pika_repl_server.cc b/src/pika_repl_server.cc index 4fc7994b5f..68641b62b6 100644 --- a/src/pika_repl_server.cc +++ b/src/pika_repl_server.cc @@ -7,14 +7,24 @@ #include -PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) { +#include "include/pika_conf.h" + +extern PikaConf* g_pika_conf; + +PikaReplServer::PikaReplServer(const std::set& ips, int port, int cron_interval) : next_avail_(0) { pika_repl_server_thread_ = new PikaReplServerThread(ips, port, cron_interval); pika_repl_server_thread_->set_thread_name("PikaReplServer"); + for (int i = 0; i < 2 * g_pika_conf->sync_thread_num(); ++i) { + bg_workers_.push_back(new PikaReplBgWorker(g_pika_conf->sync_buffer_size())); + } } PikaReplServer::~PikaReplServer() { pika_repl_server_thread_->StopThread(); delete pika_repl_server_thread_; + for (size_t i = 0; i < bg_workers_.size(); ++i) { + delete bg_workers_[i]; + } LOG(INFO) << "PikaReplServer exit!!!"; } @@ -23,5 +33,36 @@ int PikaReplServer::Start() { if (res != pink::kSuccess) { LOG(FATAL) << "Start Pika Repl Server Thread Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); } + for (size_t i = 0; i < bg_workers_.size(); ++i) { + res = bg_workers_[i]->StartThread(); + if (res != pink::kSuccess) { + LOG(FATAL) << "Start Pika Repl Worker Thread Error: " << res << (res == pink::kCreateThreadError ? ": create thread error " : ": other error"); + } + } return res; } + +void PikaReplServer::ScheduleBinlogSyncTask(std::string table_partition, const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + size_t index = GetHashIndex(table_partition, true); + bg_workers_[index]->ScheduleRequest(req, conn, req_private_data); +} + +void PikaReplServer::ScheduleMetaSyncTask(const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + bg_workers_[next_avail_]->ScheduleRequest(req, conn, req_private_data); + UpdateNextAvail(); +} + +void PikaReplServer::ScheduleTrySyncTask(const std::shared_ptr req, std::shared_ptr conn, void* req_private_data) { + bg_workers_[next_avail_]->ScheduleRequest(req, conn, NULL); + UpdateNextAvail(); +} + +void PikaReplServer::ScheduleDbTask(const std::string& key, PikaCmdArgsType* argv, BinlogItem* binlog_item, const std::string& table_name, uint32_t partition_id) { + size_t index = GetHashIndex(key, false); + bg_workers_[index]->ScheduleWriteDb(argv, binlog_item, table_name, partition_id); +} + +size_t PikaReplServer::GetHashIndex(std::string key, bool upper_half) { + size_t hash_base = bg_workers_.size() / 2; + return (str_hash(key) % hash_base) + (upper_half ? 0 : hash_base); +} diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 0c02eb1860..a3100fc999 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -7,195 +7,67 @@ #include -#include "include/pika_conf.h" #include "include/pika_server.h" -#include "include/pika_cmd_table_manager.h" -extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; -extern PikaCmdTableManager* g_pika_cmd_table_manager; PikaReplServerConn::PikaReplServerConn(int fd, std::string ip_port, pink::Thread* thread, void* worker_specific_data, pink::PinkEpoll* epoll) : PbConn(fd, ip_port, thread, epoll) { - binlog_receiver_ = reinterpret_cast(worker_specific_data); - pink::RedisParserSettings settings; - settings.DealMessage = &(PikaReplServerConn::ParserDealMessage); - redis_parser_.RedisParserInit(REDIS_PARSER_REQUEST, settings); - redis_parser_.data = this; } PikaReplServerConn::~PikaReplServerConn() { } int PikaReplServerConn::DealMessage() { - InnerMessage::InnerRequest req; - bool parse_res = req.ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); + std::shared_ptr req = std::make_shared(); + bool parse_res = req->ParseFromArray(rbuf_ + cur_pos_ - header_len_, header_len_); if (!parse_res) { LOG(WARNING) << "Pika repl server connection pb parse error."; return -1; } int res = 0; - switch (req.type()) { + switch (req->type()) { case InnerMessage::kMetaSync: - HandleMetaSyncRequest(req); + g_pika_server->ScheduleReplMetaSyncTask( + req, + std::dynamic_pointer_cast(shared_from_this()), + NULL); break; case InnerMessage::kTrySync: - HandleTrySync(req); + g_pika_server->ScheduleReplTrySyncTask( + req, + std::dynamic_pointer_cast(shared_from_this()), + NULL); break; case InnerMessage::kBinlogSync: - res = HandleBinlogSync(req); + DispatchBinlogReq(req); + break; default: break; } return res; } -int PikaReplServerConn::HandleMetaSyncRequest(const InnerMessage::InnerRequest& req) { - std::vector table_structs = g_pika_conf->table_structs(); - InnerMessage::InnerResponse response; - response.set_code(InnerMessage::StatusCode::kOk); - response.set_type(InnerMessage::Type::kMetaSync); - InnerMessage::InnerResponse_MetaSync* meta_sync = response.mutable_meta_sync(); - meta_sync->set_classic_mode(g_pika_conf->classic_mode()); - for (const auto& table_struct : table_structs) { - InnerMessage::InnerResponse_MetaSync_TableInfo* table_info = meta_sync->add_tables_info(); - table_info->set_table_name(table_struct.table_name); - table_info->set_partition_num(table_struct.partition_num); - } - - std::string reply_str; - if (!response.SerializeToString(&reply_str) - || WriteResp(reply_str)) { - return -1; - } - NotifyWrite(); - return 0; -} - -int PikaReplServerConn::HandleTrySync(const InnerMessage::InnerRequest& req) { - InnerMessage::InnerRequest::TrySync try_sync_request = req.try_sync(); - InnerMessage::Partition partition_request = try_sync_request.partition(); - std::string table_name = partition_request.table_name(); - uint32_t partition_id = partition_request.partition_id(); - bool force = try_sync_request.force(); - std::string partition_name = table_name + "_" + std::to_string(partition_id); - InnerMessage::BinlogOffset slave_boffset = try_sync_request.binlog_offset(); - InnerMessage::Node node = try_sync_request.node(); - LOG(INFO) << "Trysync, Slave ip: " << node.ip() << ", Slave port:" - << node.port() << ", Partition: " << partition_name << ", filenum: " - << slave_boffset.filenum() << ", pro_offset: " << slave_boffset.offset() - << ", force: " << (force ? "yes" : "no"); - - InnerMessage::InnerResponse response; - response.set_type(InnerMessage::Type::kTrySync); - response.set_code(InnerMessage::StatusCode::kOk); - InnerMessage::InnerResponse::TrySync* try_sync_response = response.mutable_try_sync(); - InnerMessage::Partition* partition_response = try_sync_response->mutable_partition(); - partition_response->set_table_name(table_name); - partition_response->set_partition_id(partition_id); - if (force) { - LOG(INFO) << "Partition: " << partition_name << " force full sync, BgSave and DbSync first"; - g_pika_server->TryDBSync(node.ip(), node.port(), table_name, partition_id, slave_boffset.filenum()); - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kWait); - } else { - BinlogOffset boffset; - if (!g_pika_server->GetTablePartitionBinlogOffset(table_name, partition_id, &boffset)) { - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kError); - LOG(WARNING) << "Handle TrySync, Partition: " - << partition_name << " not found, TrySync failed"; - } else { - if (boffset.filenum < slave_boffset.filenum() - || (boffset.filenum == slave_boffset.filenum() && boffset.offset < slave_boffset.offset())) { - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kInvalidOffset); - LOG(WARNING) << "Slave offset is larger than mine, Slave ip: " - << node.ip() << ", Slave port: " << node.port() << ", Partition: " - << partition_name << ", filenum: " << slave_boffset.filenum() - << ", pro_offset_: " << slave_boffset.offset() << ", force: " - << (force ? "yes" : "no"); - } else { - LOG(INFO) << "Partition: " << partition_name << " TrySync success"; - try_sync_response->set_reply_code(InnerMessage::InnerResponse::TrySync::kOk); - try_sync_response->set_sid(0); - } +void PikaReplServerConn::DispatchBinlogReq(const std::shared_ptr req) { + // partition to a bunch of binlog chips + std::unordered_map*> par_binlog; + for (int i = 0; i < req->binlog_sync_size(); ++i) { + const InnerMessage::InnerRequest::BinlogSync& binlog_req = req->binlog_sync(i); + // hash key: table + partition_id + std::string key = binlog_req.table_name() + std::to_string(binlog_req.partition_id()); + if (par_binlog.find(key) == par_binlog.end()) { + par_binlog[key] = new std::vector(); } + par_binlog[key]->push_back(i); } - - std::string reply_str; - if (!response.SerializeToString(&reply_str) - || WriteResp(reply_str)) { - return -1; - } - NotifyWrite(); - return 0; -} - -int PikaReplServerConn::HandleBinlogSync(const InnerMessage::InnerRequest& req) { - for (int i = 0; i < req.binlog_sync_size(); ++i) { - const InnerMessage::InnerRequest::BinlogSync& binlog_req = req.binlog_sync(i); - if(!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_req.binlog(), &binlog_item_)) { - return -1; - } - const char* redis_parser_start = binlog_req.binlog().data() + BINLOG_ENCODE_LEN; - int redis_parser_len = static_cast(binlog_req.binlog().size()) - BINLOG_ENCODE_LEN; - int processed_len = 0; - pink::RedisParserStatus ret = redis_parser_.ProcessInputBuffer( - redis_parser_start, redis_parser_len, &processed_len); - if (ret != pink::kRedisParserDone) { - return -1; - } + for (auto& binlog_nums : par_binlog) { + g_pika_server->ScheduleReplBinlogSyncTask( + binlog_nums.first, + req, + std::dynamic_pointer_cast(shared_from_this()), + reinterpret_cast(binlog_nums.second)); } - return 0; -} - - -bool PikaReplServerConn::ProcessBinlogData(const pink::RedisCmdArgsType& argv, const BinlogItem& binlog_item) { - g_pika_server->UpdateQueryNumAndExecCountTable(argv[0]); - - // Monitor related - std::string monitor_message; - if (g_pika_server->HasMonitorClients()) { - std::string monitor_message = std::to_string(1.0 * slash::NowMicros() / 1000000) - + " [" + this->ip_port() + "]"; - for (const auto& item : argv) { - monitor_message += " " + slash::ToRead(item); - } - g_pika_server->AddMonitorMessage(monitor_message); - } - - bool is_readonly = g_pika_server->readonly(); - - // Here, the binlog dispatch thread, instead of the binlog bgthread takes on the task to write binlog - // Only when the server is readonly - uint64_t serial = binlog_receiver_->GetnPlusSerial(); - if (is_readonly) { - if (!g_pika_server->WaitTillBinlogBGSerial(serial)) { - return false; - } - std::string opt = argv[0]; - Cmd* c_ptr = g_pika_cmd_table_manager->GetCmd(slash::StringToLower(opt)); - c_ptr->Initial(argv, g_pika_conf->default_table()); - - g_pika_server->logger_->Lock(); - g_pika_server->logger_->Put(c_ptr->ToBinlog(binlog_item.exec_time(), - std::to_string(binlog_item.server_id()), - binlog_item.logic_id(), - binlog_item.filenum(), - binlog_item.offset())); - g_pika_server->logger_->Unlock(); - g_pika_server->SignalNextBinlogBGSerial(); - } - - PikaCmdArgsType *v = new PikaCmdArgsType(argv); - BinlogItem *b = new BinlogItem(binlog_item); - std::string dispatch_key = argv.size() >= 2 ? argv[1] : argv[0]; - g_pika_server->DispatchBinlogBG(dispatch_key, v, b, serial, is_readonly); - return true; -} - -int PikaReplServerConn::ParserDealMessage(pink::RedisParser* parser, const pink::RedisCmdArgsType& argv) { - PikaReplServerConn* conn = reinterpret_cast(parser->data); - return conn->ProcessBinlogData(argv, conn->binlog_item_) == true ? 0 : -1; } diff --git a/src/pika_server.cc b/src/pika_server.cc index d59381302f..eda0512f96 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -129,9 +129,9 @@ PikaServer::~PikaServer() { delete ping_thread_; delete pika_pubsub_thread_; + delete pika_auxiliary_thread_; delete pika_repl_client_; delete pika_repl_server_; - delete pika_auxiliary_thread_; binlogbg_exit_ = true; std::vector::iterator binlogbg_iter = binlogbg_workers_.begin(); @@ -242,6 +242,10 @@ void PikaServer::Schedule(pink::TaskFunc func, void* arg) { pika_thread_pool_->Schedule(func, arg); } +void PikaServer::ScheduleReplCliTask(pink::TaskFunc func, void* arg) { + pika_repl_client_->Schedule(func, arg); +} + void PikaServer::RocksdbOptionInit(blackwidow::BlackwidowOptions* bw_option) { bw_option->options.create_if_missing = true; bw_option->options.keep_log_file_num = 10; @@ -1087,8 +1091,7 @@ Status PikaServer::AddBinlogSender(const std::string& table_name, if (!res.ok()) { return res; } - pika_repl_client_->RunStateMachine(slave); - return Status::OK(); + return pika_repl_client_->SendBinlogSync(slave); } /* @@ -1641,6 +1644,11 @@ Status PikaServer::SendPartitionTrySyncRequest(std::shared_ptr partit return status; } +Status PikaServer::SendBinlogSyncRequest(const std::string& table, uint32_t partition, const std::string& ip, int port) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->SendBinlogSync(slave); +} + void PikaServer::AddMonitorClient(std::shared_ptr client_ptr) { monitor_thread_->AddMonitorClient(client_ptr); } @@ -1659,6 +1667,57 @@ void PikaServer::DispatchBinlogBG(const std::string &key, binlogbg_workers_[index]->Schedule(argv, binlog_item, cur_serial, readonly); } +void PikaServer::ScheduleReplBinlogSyncTask(std::string table_partition, + const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleBinlogSyncTask(table_partition, req, conn, req_private_data); +} + +void PikaServer::ScheduleReplMetaSyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleMetaSyncTask(req, conn, req_private_data); +} + +void PikaServer::ScheduleReplTrySyncTask(const std::shared_ptr req, + std::shared_ptr conn, + void* req_private_data) { + pika_repl_server_->ScheduleTrySyncTask(req, conn, req_private_data); +} + +void PikaServer::ScheduleReplDbTask(const std::string &key, + PikaCmdArgsType* argv, BinlogItem* binlog_item, + const std::string& table_name, uint32_t partition_id) { + pika_repl_server_->ScheduleDbTask(key, argv, binlog_item, table_name, partition_id); +} + +bool PikaServer::SetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t ack_file_num, uint64_t ack_offset, uint64_t active_time) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->SetAckInfo(slave, ack_file_num, ack_offset, active_time); +} + +bool PikaServer::GetBinlogAckInfo(const std::string& table, uint32_t partition, const std::string& ip, int port, + uint32_t* ack_file_num, uint64_t* ack_offset, uint64_t* active_time) { + RmNode slave = RmNode(table, partition, ip, port); + return pika_repl_client_->GetAckInfo(slave, ack_file_num, ack_offset, active_time); +} + +int PikaServer::SendToPeer() { + return pika_repl_client_->ConsumeWriteQueue(); +} + +Status PikaServer::TriggerSendBinlogSync() { + return pika_repl_client_->TriggerSendBinlogSync(); +} + +void PikaServer::SignalAuxiliary() { + pika_auxiliary_thread_->mu_.Lock(); + pika_auxiliary_thread_->cv_.Signal(); + pika_auxiliary_thread_->mu_.Unlock(); +} + bool PikaServer::WaitTillBinlogBGSerial(uint64_t my_serial) { binlogbg_mutex_.Lock(); //DLOG(INFO) << "Binlog serial wait: " << my_serial << ", current: " << binlogbg_serial_; diff --git a/third/pink b/third/pink index 00fa498ce9..c9ca6094ce 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit 00fa498ce9c3a9011197602dc4977cfe40d11948 +Subproject commit c9ca6094ceb69d6d056d0192acf8eb9b62457c26