diff --git a/include/pika_proxy.h b/include/pika_proxy.h new file mode 100644 index 0000000000..00c37b82c3 --- /dev/null +++ b/include/pika_proxy.h @@ -0,0 +1,54 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_PROXY_H_ +#define PIKA_PROXY_H_ + +#include "include/pika_proxy_conn.h" +#include "include/pika_proxy_cli.h" +#include "include/pika_client_conn.h" + +class ProxyCliManager { + public: + ProxyCliManager(int conn_every_backend, int keepalive_time); + ~ProxyCliManager(); + int Start(); + int Stop(); + // no need mutex lock here + Status ChooseForwardToBackend(ProxyTask* task); + + private: + Status ForwardNextAvailableConn(ProxyTask* task); + std::vector> clis_; + std::atomic rr_counter_; + int conn_every_backend_; +}; + +class PikaProxy { + public: + PikaProxy(); + ~PikaProxy(); + int Start(); + int Stop(); + // write to client_thread and put it into task_queue + static void ForwardToBackend(void* arg); + // grep task_queue and + static void WritebackToCliConn(void* arg); + // bypass to g_pika_server + void ScheduleForwardToBackend( + const std::shared_ptr& conn_ptr, + const std::vector& redis_cmds, + const std::vector& dst); + void MayScheduleWritebackToCliConn(std::shared_ptr conn_ptr, + std::shared_ptr cli, const std::string res); + std::shared_ptr cli_manager() { + return cli_manager_ptr_; + } + + private: + std::shared_ptr cli_manager_ptr_; +}; + +#endif // PIKA_PROXY_H_ diff --git a/include/pika_proxy_cli.h b/include/pika_proxy_cli.h new file mode 100644 index 0000000000..2595d5ef89 --- /dev/null +++ b/include/pika_proxy_cli.h @@ -0,0 +1,92 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_PROXY_CLI_H_ +#define PIKA_PROXY_CLI_H_ + +#include "pink/include/pink_conn.h" +#include "pink/include/client_thread.h" + +#include "include/pika_proxy_conn.h" + +using slash::Status; +class ProxyCli; +class ProxyFactory : public pink::ConnFactory { + public: + explicit ProxyFactory(std::shared_ptr proxy_cli); + virtual std::shared_ptr NewPinkConn( + int connfd, + const std::string &ip_port, + pink::Thread *thread, + void* worker_specific_data, + pink::PinkEpoll* pink_epoll) const override { + return std::static_pointer_cast + (std::make_shared( + connfd, ip_port, thread, pink_epoll, proxy_cli_)); + } + private: + std::shared_ptr proxy_cli_; +}; + +class ProxyHandle : public pink::ClientHandle { + public: + explicit ProxyHandle(std::shared_ptr proxy_cli) : ClientHandle() { + proxy_cli_ = proxy_cli; + } + ~ProxyHandle() { + } + void CronHandle() const override { + } + void FdTimeoutHandle(int fd, const std::string& ip_port) const override { + } + void FdClosedHandle(int fd, const std::string& ip_port) const override; + bool AccessHandle(std::string& ip) const override { + return true; + } + int CreateWorkerSpecificData(void** data) const override { + return 0; + } + int DeleteWorkerSpecificData(void* data) const override { + return 0; + } + void DestConnectFailedHandle( + std::string ip_port, std::string reason) const override { + } + + private: + std::shared_ptr proxy_cli_; +}; + +class ProxyCli : public std::enable_shared_from_this { + public: + ProxyCli(int cron_interval, int keepalive_timeout); + int Start(); + int Stop(); + Status ForwardToBackend(ProxyTask* task); + Status WritebackUpdate(const std::string& ip_port, + const std::string& res, bool* write_back, ProxyTask** res_task); + + struct ProxyCliTask { + std::shared_ptr conn_ptr; + std::shared_ptr resp_ptr; + }; + void LostConn(const std::string& ip_port); + + private: + int cron_interval_; + int keepalive_timeout_; + ProxyFactory* proxy_factory_; + ProxyHandle* proxy_handle_; + + slash::Mutex input_l_; + std::shared_ptr client_ptr_; + // pair(backend conn ip + port, std::deque) + std::unordered_map> backend_task_queue_; + // pair(client ip + port, ProxyTask*) + std::unordered_map task_queue_; +}; + +#endif // PIKA_PROXY_CLI_H_ +# diff --git a/include/pika_proxy_conn.h b/include/pika_proxy_conn.h new file mode 100644 index 0000000000..dfcb151130 --- /dev/null +++ b/include/pika_proxy_conn.h @@ -0,0 +1,40 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#ifndef PIKA_PROXY_CONN_H_ +#define PIKA_PROXY_CONN_H_ + +#include "pink/include/redis_conn.h" + +#include "include/pika_client_conn.h" + +class ProxyCli; +class PikaProxyConn; + +struct ProxyTask { + ProxyTask() { + } + std::shared_ptr conn_ptr; + std::vector redis_cmds; + std::vector redis_cmds_forward_dst; +}; + +class PikaProxyConn: public pink::RedisConn { + public: + PikaProxyConn(int fd, std::string ip_port, + pink::Thread *server_thread, + pink::PinkEpoll* pink_epoll, + std::shared_ptr proxy_cli); + virtual ~PikaProxyConn() {} + + private: + int DealMessage( + const pink::RedisCmdArgsType& argv, + std::string* response) override; + + std::shared_ptr proxy_cli_; +}; + +#endif // PIKA_PROXY_CONN_H_ diff --git a/include/pika_server.h b/include/pika_server.h index f555d3eb89..4e8a832904 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -296,6 +296,9 @@ class PikaServer { void PubSubNumSub(const std::vector& channels, std::vector>* result); + Status GetCmdRouting(std::vector& redis_cmds, + std::vector* dst, bool* all_local); + // info debug use void ServerStatus(std::string* info); diff --git a/src/pika.cc b/src/pika.cc index 2339a83cdf..6a5072b10f 100644 --- a/src/pika.cc +++ b/src/pika.cc @@ -9,6 +9,7 @@ #include "slash/include/env.h" #include "include/pika_rm.h" +#include "include/pika_proxy.h" #include "include/pika_server.h" #include "include/pika_command.h" #include "include/pika_conf.h" @@ -24,6 +25,7 @@ PikaConf* g_pika_conf; PikaServer* g_pika_server; PikaReplicaManager* g_pika_rm; +PikaProxy* g_pika_proxy; PikaCmdTableManager* g_pika_cmd_table_manager; @@ -192,11 +194,13 @@ int main(int argc, char *argv[]) { g_pika_cmd_table_manager = new PikaCmdTableManager(); g_pika_server = new PikaServer(); g_pika_rm = new PikaReplicaManager(); + g_pika_proxy = new PikaProxy(); if (g_pika_conf->daemonize()) { close_std(); } + g_pika_proxy->Start(); g_pika_rm->Start(); g_pika_server->Start(); @@ -208,8 +212,11 @@ int main(int argc, char *argv[]) { // may references to dead PikaServer g_pika_rm->Stop(); + g_pika_proxy->Stop(); + delete g_pika_server; delete g_pika_rm; + delete g_pika_proxy; delete g_pika_cmd_table_manager; ::google::ShutdownGoogleLogging(); delete g_pika_conf; diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index fa30790e57..9f557d627d 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -15,11 +15,13 @@ #include "include/pika_cmd_table_manager.h" #include "include/pika_admin.h" #include "include/pika_rm.h" +#include "include/pika_proxy.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; extern PikaReplicaManager* g_pika_rm; extern PikaCmdTableManager* g_pika_cmd_table_manager; +extern PikaProxy* g_pika_proxy; PikaClientConn::PikaClientConn(int fd, std::string ip_port, pink::Thread* thread, @@ -201,7 +203,20 @@ void PikaClientConn::DoBackgroundTask(void* arg) { return; } } - conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); + std::vector dst; + bool all_local = true; + Status s = g_pika_server->GetCmdRouting( + bg_arg->redis_cmds, &dst, &all_local); + if (!s.ok()) { + delete bg_arg; + conn_ptr->NotifyEpoll(false); + return; + } + if (!all_local) { + g_pika_proxy->ScheduleForwardToBackend(conn_ptr, bg_arg->redis_cmds, dst); + } else { + conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); + } delete bg_arg; } diff --git a/src/pika_proxy.cc b/src/pika_proxy.cc new file mode 100644 index 0000000000..f40b0714c1 --- /dev/null +++ b/src/pika_proxy.cc @@ -0,0 +1,131 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_proxy.h" +#include "include/pika_server.h" + +extern PikaProxy* g_pika_proxy; +extern PikaServer* g_pika_server; + +/* ProxyCliManager */ + +ProxyCliManager::ProxyCliManager( + int conn_every_backend, int keepalive_time) + : rr_counter_(0), conn_every_backend_(conn_every_backend) { + for (int i = 0; i < conn_every_backend; ++i) { + clis_.push_back( + std::make_shared(10 /*cron interval*/, keepalive_time)); + } +} + +ProxyCliManager::~ProxyCliManager() { +} + +Status ProxyCliManager::ChooseForwardToBackend(ProxyTask* task) { + return ForwardNextAvailableConn(task); +} + +Status ProxyCliManager::ForwardNextAvailableConn(ProxyTask* task) { + uint64_t counter = rr_counter_.load() % conn_every_backend_; + rr_counter_++; + return clis_[counter]->ForwardToBackend(task); +} + +int ProxyCliManager::Start() { + for (auto& cli : clis_) { + int res = cli->Start(); + if (res != pink::kSuccess) { + LOG(ERROR) << "ProxyCliManager Start Failed: " << + (res == pink::kCreateThreadError ? + ": create thread error " : ": other error"); + return res; + } + } + return pink::kSuccess; +} + +int ProxyCliManager::Stop() { + for (auto& cli : clis_) { + cli->Stop(); + } + return pink::kSuccess; +} + +/* PikaProxy */ + +PikaProxy::PikaProxy() { + // conn_every_backend: 10, keepalive_time: 60s + cli_manager_ptr_ = std::make_shared(10, 60); +} + +PikaProxy::~PikaProxy() { +} + +int PikaProxy::Start() { + int res = cli_manager_ptr_->Start(); + if (res != pink::kSuccess) { + LOG(ERROR) << "PikaProxy Start Failed: " << + (res == pink::kCreateThreadError ? + ": create thread error " : ": other error"); + return res; + } + return pink::kSuccess; +} + +int PikaProxy::Stop() { + cli_manager_ptr_->Stop(); + return pink::kSuccess; +} + +void PikaProxy::ForwardToBackend(void* arg) { + ProxyTask* task = reinterpret_cast(arg); + Status s = g_pika_proxy->cli_manager()->ChooseForwardToBackend(task); + if (!s.ok()) { + delete task; + task = NULL; + LOG(WARNING) << "Forward to backend" << s.ToString(); + } +} + +// just one therad invoke this, no lock guard +void PikaProxy::WritebackToCliConn(void* arg) { + ProxyTask* task = reinterpret_cast(arg); + std::shared_ptr conn_ptr = task->conn_ptr; + // TODO(AZ) build redis resp + // for (auto& resp : conn_ptr->resp_array) { + // conn_ptr->WriteResp(std::move(*resp) + "\r\n"); + // } + conn_ptr->resp_array.clear(); + conn_ptr->NotifyEpoll(true); + delete task; +} + +void PikaProxy::MayScheduleWritebackToCliConn( + std::shared_ptr conn_ptr, + std::shared_ptr cli, const std::string res) { + bool write_back = false; + ProxyTask* task = NULL; + Status s = cli->WritebackUpdate(conn_ptr->ip_port(), res, &write_back, &task); + if (!s.ok()) { + LOG(WARNING) << "WritebaclUpdate failed " + s.ToString(); + return; + } + if (!write_back) { + return; + } + g_pika_server->ScheduleClientPool(&PikaProxy::WritebackToCliConn, task); +} + +void PikaProxy::ScheduleForwardToBackend( + const std::shared_ptr& conn_ptr, + const std::vector& redis_cmds, + const std::vector& dst) { + ProxyTask* arg = new ProxyTask(); + arg->conn_ptr = conn_ptr; + arg->redis_cmds = std::move(redis_cmds); + arg->redis_cmds_forward_dst = std::move(dst); + // choose ip and update + g_pika_server->ScheduleClientPool(&PikaProxy::ForwardToBackend, arg); +} diff --git a/src/pika_proxy_cli.cc b/src/pika_proxy_cli.cc new file mode 100644 index 0000000000..84f3a6fc39 --- /dev/null +++ b/src/pika_proxy_cli.cc @@ -0,0 +1,155 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_proxy_cli.h" + +#include + +#include + +/* ProxyFactory */ + +ProxyFactory::ProxyFactory(std::shared_ptr proxy_cli) + : proxy_cli_(proxy_cli) { +} + +/* ProxyHandle */ + +void ProxyHandle::FdClosedHandle(int fd, const std::string& ip_port) const { + proxy_cli_->LostConn(ip_port); +} + +/* ProxyCli */ + +ProxyCli::ProxyCli(int cron_interval, int keepalive_timeout) + : cron_interval_(cron_interval), + keepalive_timeout_(keepalive_timeout) { +} + +int ProxyCli::Start() { + ProxyFactory* proxy_factory_ = new ProxyFactory(shared_from_this()); + ProxyHandle* proxy_handle_ = new ProxyHandle(shared_from_this()); + client_ptr_ = std::make_shared( + proxy_factory_, cron_interval_, + keepalive_timeout_, proxy_handle_, nullptr); + + int res = client_ptr_->StartThread(); + if (res != pink::kSuccess) { + LOG(FATAL) << "Start Proxy ClientThread Error: " + << res << (res == pink::kCreateThreadError ? + ": create thread error " : ": other error"); + return res; + } + return pink::kSuccess; +} + +int ProxyCli::Stop() { + client_ptr_->StopThread(); + delete proxy_factory_; + delete proxy_handle_; + return pink::kSuccess; +} + +Status ProxyCli::WritebackUpdate( + const std::string& ip_port, + const std::string& res, + bool* write_back, + ProxyTask** res_task) { + slash::MutexLock l(&input_l_); + auto iter = backend_task_queue_.find(ip_port); + if (iter == backend_task_queue_.end()) { + return Status::NotFound(ip_port); + } + std::deque& queue = iter->second; + ProxyCliTask cli_task; + if (!queue.empty()) { + cli_task = queue.front(); + } else { + backend_task_queue_.erase(iter); + return Status::NotFound(ip_port); + } + queue.pop_front(); + if (queue.empty()) { + backend_task_queue_.erase(iter); + } + + cli_task.resp_ptr->append(res); + std::shared_ptr conn_ptr = cli_task.conn_ptr; + conn_ptr->resp_num--; + + if (conn_ptr->resp_num.load() == 0) { + *write_back = true; + const auto& iter = task_queue_.find(conn_ptr->ip_port()); + if (iter == task_queue_.end()) { + LOG(WARNING) << "find ip_port()" << conn_ptr->ip_port() << " not found"; + return Status::Corruption(conn_ptr->ip_port()); + } + *res_task = iter->second; + task_queue_.erase(iter); + } + + return Status::OK(); +} + +Status ProxyCli::ForwardToBackend(ProxyTask* task) { + std::shared_ptr conn_ptr = task->conn_ptr; + conn_ptr->resp_num.store(task->redis_cmds.size()); + + slash::MutexLock l(&input_l_); + size_t loopsize = + task->redis_cmds.size() == task->redis_cmds_forward_dst.size() + ? task->redis_cmds.size() : 0; + if (loopsize == 0) { + return Status::Corruption("cmd and calculated routing not match"); + } + for (size_t i = 0; i < loopsize; ++i) { + std::shared_ptr resp_ptr = std::make_shared(); + conn_ptr->resp_array.push_back(resp_ptr); + ProxyCliTask cli_task; + cli_task.conn_ptr = conn_ptr; + cli_task.resp_ptr = resp_ptr; + pink::RedisCmdArgsType& redis_cmd = task->redis_cmds[i]; + + std::string redis_cmd_str; + // TODO(AZ) build more complex redis command + redis_cmd_str.append("*" + std::to_string(redis_cmd.size()) + "\r\n"); + for (auto& cmd_param : redis_cmd) { + redis_cmd_str.append( + "$" + std::to_string(cmd_param.size()) + "\r\n" + cmd_param + "\r\n"); + } + + Node& node = task->redis_cmds_forward_dst[i]; + Status s = client_ptr_->Write(node.Ip(), node.Port(), redis_cmd_str); + + std::string ip_port = node.Ip() + ":" + std::to_string(node.Port()); + backend_task_queue_[ip_port].push_back(cli_task); + } + std::string ip_port = conn_ptr->ip_port(); + if (task_queue_.find(ip_port) != task_queue_.end()) { + ProxyTask* tmp_task = task_queue_[ip_port]; + if (tmp_task) { + delete tmp_task; + } + } + task_queue_[ip_port] = task; + return Status::OK(); +} + +void ProxyCli::LostConn(const std::string& ip_port) { + slash::MutexLock l(&input_l_); + auto iter = backend_task_queue_.find(ip_port); + if (iter == backend_task_queue_.end()) { + return; + } + std::deque& queue = iter->second; + // all client whole cmd which sheduled to this ip_port will timeout + for (auto& cli_task : queue) { + std::shared_ptr conn_ptr = cli_task.conn_ptr; + auto iter = task_queue_.find(conn_ptr->ip_port()); + ProxyTask* proxy_task = iter->second; + task_queue_.erase(iter); + delete proxy_task; + } +} diff --git a/src/pika_proxy_conn.cc b/src/pika_proxy_conn.cc new file mode 100644 index 0000000000..8eeee002df --- /dev/null +++ b/src/pika_proxy_conn.cc @@ -0,0 +1,36 @@ +// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "include/pika_proxy_conn.h" + +#include "pink/include/redis_cli.h" + +#include "include/pika_proxy.h" + +extern PikaProxy* g_pika_proxy; + +PikaProxyConn::PikaProxyConn(int fd, std::string ip_port, + pink::Thread* thread, + pink::PinkEpoll* pink_epoll, + std::shared_ptr proxy_cli) + : RedisConn(fd, ip_port, thread, pink_epoll, + pink::HandleType::kSynchronous, PIKA_MAX_CONN_RBUF_HB), + proxy_cli_(proxy_cli) { +} + + +int PikaProxyConn::DealMessage( + const pink::RedisCmdArgsType& argv, std::string* response) { + std::string res; + for (auto& arg : argv) { + res += arg; + } + g_pika_proxy->MayScheduleWritebackToCliConn( + std::dynamic_pointer_cast(shared_from_this()), + proxy_cli_, res); + return 0; +} + + diff --git a/src/pika_server.cc b/src/pika_server.cc index ff65b1ffe8..8c03e8f440 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1680,6 +1680,13 @@ blackwidow::Status PikaServer::RewriteBlackwidowOptions(const blackwidow::Option return s; } +Status PikaServer::GetCmdRouting(std::vector& redis_cmds, std::vector* dst, bool* all_local) { + UNUSED(redis_cmds); + UNUSED(dst); + *all_local = true; + return Status::OK(); +} + void PikaServer::ServerStatus(std::string* info) { std::stringstream tmp_stream; size_t q_size = ClientProcessorThreadPoolCurQueueSize();