Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
ThreadModle done.
TODO redis response parser, redis command builder.
  • Loading branch information
whoiami authored and kernelai committed Nov 5, 2020
1 parent 9e74c8c commit 6519e86
Show file tree
Hide file tree
Showing 10 changed files with 541 additions and 1 deletion.
54 changes: 54 additions & 0 deletions include/pika_proxy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PROXY_H_
#define PIKA_PROXY_H_

#include "include/pika_proxy_conn.h"
#include "include/pika_proxy_cli.h"
#include "include/pika_client_conn.h"

class ProxyCliManager {
public:
ProxyCliManager(int conn_every_backend, int keepalive_time);
~ProxyCliManager();
int Start();
int Stop();
// no need mutex lock here
Status ChooseForwardToBackend(ProxyTask* task);

private:
Status ForwardNextAvailableConn(ProxyTask* task);
std::vector<std::shared_ptr<ProxyCli>> clis_;
std::atomic<uint64_t> rr_counter_;
int conn_every_backend_;
};

class PikaProxy {
public:
PikaProxy();
~PikaProxy();
int Start();
int Stop();
// write to client_thread and put it into task_queue
static void ForwardToBackend(void* arg);
// grep task_queue and
static void WritebackToCliConn(void* arg);
// bypass to g_pika_server
void ScheduleForwardToBackend(
const std::shared_ptr<PikaClientConn>& conn_ptr,
const std::vector<pink::RedisCmdArgsType>& redis_cmds,
const std::vector<Node>& dst);
void MayScheduleWritebackToCliConn(std::shared_ptr<PikaProxyConn> conn_ptr,
std::shared_ptr<ProxyCli> cli, const std::string res);
std::shared_ptr<ProxyCliManager> cli_manager() {
return cli_manager_ptr_;
}

private:
std::shared_ptr<ProxyCliManager> cli_manager_ptr_;
};

#endif // PIKA_PROXY_H_
92 changes: 92 additions & 0 deletions include/pika_proxy_cli.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PROXY_CLI_H_
#define PIKA_PROXY_CLI_H_

#include "pink/include/pink_conn.h"
#include "pink/include/client_thread.h"

#include "include/pika_proxy_conn.h"

using slash::Status;
class ProxyCli;
class ProxyFactory : public pink::ConnFactory {
public:
explicit ProxyFactory(std::shared_ptr<ProxyCli> proxy_cli);
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
pink::Thread *thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const override {
return std::static_pointer_cast<pink::PinkConn>
(std::make_shared<PikaProxyConn>(
connfd, ip_port, thread, pink_epoll, proxy_cli_));
}
private:
std::shared_ptr<ProxyCli> proxy_cli_;
};

class ProxyHandle : public pink::ClientHandle {
public:
explicit ProxyHandle(std::shared_ptr<ProxyCli> proxy_cli) : ClientHandle() {
proxy_cli_ = proxy_cli;
}
~ProxyHandle() {
}
void CronHandle() const override {
}
void FdTimeoutHandle(int fd, const std::string& ip_port) const override {
}
void FdClosedHandle(int fd, const std::string& ip_port) const override;
bool AccessHandle(std::string& ip) const override {
return true;
}
int CreateWorkerSpecificData(void** data) const override {
return 0;
}
int DeleteWorkerSpecificData(void* data) const override {
return 0;
}
void DestConnectFailedHandle(
std::string ip_port, std::string reason) const override {
}

private:
std::shared_ptr<ProxyCli> proxy_cli_;
};

class ProxyCli : public std::enable_shared_from_this<ProxyCli> {
public:
ProxyCli(int cron_interval, int keepalive_timeout);
int Start();
int Stop();
Status ForwardToBackend(ProxyTask* task);
Status WritebackUpdate(const std::string& ip_port,
const std::string& res, bool* write_back, ProxyTask** res_task);

struct ProxyCliTask {
std::shared_ptr<PikaClientConn> conn_ptr;
std::shared_ptr<std::string> resp_ptr;
};
void LostConn(const std::string& ip_port);

private:
int cron_interval_;
int keepalive_timeout_;
ProxyFactory* proxy_factory_;
ProxyHandle* proxy_handle_;

slash::Mutex input_l_;
std::shared_ptr<pink::ClientThread> client_ptr_;
// pair(backend conn ip + port, std::deque<ProxyCliTask>)
std::unordered_map<std::string, std::deque<ProxyCliTask>> backend_task_queue_;
// pair(client ip + port, ProxyTask*)
std::unordered_map<std::string, ProxyTask*> task_queue_;
};

#endif // PIKA_PROXY_CLI_H_
#
40 changes: 40 additions & 0 deletions include/pika_proxy_conn.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PROXY_CONN_H_
#define PIKA_PROXY_CONN_H_

#include "pink/include/redis_conn.h"

#include "include/pika_client_conn.h"

class ProxyCli;
class PikaProxyConn;

struct ProxyTask {
ProxyTask() {
}
std::shared_ptr<PikaClientConn> conn_ptr;
std::vector<pink::RedisCmdArgsType> redis_cmds;
std::vector<Node> redis_cmds_forward_dst;
};

class PikaProxyConn: public pink::RedisConn {
public:
PikaProxyConn(int fd, std::string ip_port,
pink::Thread *server_thread,
pink::PinkEpoll* pink_epoll,
std::shared_ptr<ProxyCli> proxy_cli);
virtual ~PikaProxyConn() {}

private:
int DealMessage(
const pink::RedisCmdArgsType& argv,
std::string* response) override;

std::shared_ptr<ProxyCli> proxy_cli_;
};

#endif // PIKA_PROXY_CONN_H_
3 changes: 3 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ class PikaServer {
void PubSubNumSub(const std::vector<std::string>& channels,
std::vector<std::pair<std::string, int>>* result);

Status GetCmdRouting(std::vector<pink::RedisCmdArgsType>& redis_cmds,
std::vector<Node>* dst, bool* all_local);

// info debug use
void ServerStatus(std::string* info);

Expand Down
7 changes: 7 additions & 0 deletions src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "slash/include/env.h"
#include "include/pika_rm.h"
#include "include/pika_proxy.h"
#include "include/pika_server.h"
#include "include/pika_command.h"
#include "include/pika_conf.h"
Expand All @@ -24,6 +25,7 @@
PikaConf* g_pika_conf;
PikaServer* g_pika_server;
PikaReplicaManager* g_pika_rm;
PikaProxy* g_pika_proxy;

PikaCmdTableManager* g_pika_cmd_table_manager;

Expand Down Expand Up @@ -192,11 +194,13 @@ int main(int argc, char *argv[]) {
g_pika_cmd_table_manager = new PikaCmdTableManager();
g_pika_server = new PikaServer();
g_pika_rm = new PikaReplicaManager();
g_pika_proxy = new PikaProxy();

if (g_pika_conf->daemonize()) {
close_std();
}

g_pika_proxy->Start();
g_pika_rm->Start();
g_pika_server->Start();

Expand All @@ -208,8 +212,11 @@ int main(int argc, char *argv[]) {
// may references to dead PikaServer
g_pika_rm->Stop();

g_pika_proxy->Stop();

delete g_pika_server;
delete g_pika_rm;
delete g_pika_proxy;
delete g_pika_cmd_table_manager;
::google::ShutdownGoogleLogging();
delete g_pika_conf;
Expand Down
17 changes: 16 additions & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
#include "include/pika_cmd_table_manager.h"
#include "include/pika_admin.h"
#include "include/pika_rm.h"
#include "include/pika_proxy.h"

extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;
extern PikaReplicaManager* g_pika_rm;
extern PikaCmdTableManager* g_pika_cmd_table_manager;
extern PikaProxy* g_pika_proxy;

PikaClientConn::PikaClientConn(int fd, std::string ip_port,
pink::Thread* thread,
Expand Down Expand Up @@ -201,7 +203,20 @@ void PikaClientConn::DoBackgroundTask(void* arg) {
return;
}
}
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
std::vector<Node> dst;
bool all_local = true;
Status s = g_pika_server->GetCmdRouting(
bg_arg->redis_cmds, &dst, &all_local);
if (!s.ok()) {
delete bg_arg;
conn_ptr->NotifyEpoll(false);
return;
}
if (!all_local) {
g_pika_proxy->ScheduleForwardToBackend(conn_ptr, bg_arg->redis_cmds, dst);
} else {
conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
}
delete bg_arg;
}

Expand Down
Loading

0 comments on commit 6519e86

Please sign in to comment.