Skip to content

Commit

Permalink
Support Pub/Sub (OpenAtomFoundation#207)
Browse files Browse the repository at this point in the history
* add pubsub interface

* modify client connection flag

* bugfix

* bugfix

* bugfix

* add pubsub()

* type

* bugfix channels name

* format

* optimize code

* bugfix interface

* bugfix

* bugfix

* optimize code

* bugfix

* bugfix

* fix conflict

* fix conflict

* modify header file
  • Loading branch information
Leviathan1995 authored and KernelMaker committed Nov 16, 2017
1 parent 036b8fd commit a76dd54
Show file tree
Hide file tree
Showing 9 changed files with 566 additions and 173 deletions.
7 changes: 7 additions & 0 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,17 @@ class PikaClientConn: public pink::RedisConn {
void* worker_specific_data);
virtual ~PikaClientConn() {}
virtual int DealMessage();
bool IsPubSub() {
return is_pubsub_;
}
void SetIsPubSub(bool is_pubsub) {
is_pubsub_ = is_pubsub;
}

private:
pink::ServerThread* const server_thread_;
CmdTable* const cmds_table_;
bool is_pubsub_;

std::string DoCmd(const std::string& opt);

Expand Down
9 changes: 9 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ const std::string kCmdNameGeoHash = "geohash";
const std::string kCmdNameGeoRadius = "georadius";
const std::string kCmdNameGeoRadiusByMember = "georadiusbymember";

//Pub/Sub
const std::string kCmdNamePublish = "publish";
const std::string kCmdNameSubscribe = "subscribe";
const std::string kCmdNameUnSubscribe = "unsubscribe";
const std::string kCmdNamePubSub = "pubsub";
const std::string kCmdNamePSubscribe = "psubscribe";
const std::string kCmdNamePUnSubscribe = "punsubscribe";

typedef pink::RedisCmdArgsType PikaCmdArgsType;
static const int RAW_ARGS_LEN = 1024 * 1024;

Expand All @@ -201,6 +209,7 @@ enum CmdFlags {
kCmdFlagsBit = 12,
kCmdFlagsHyperLogLog = 14,
kCmdFlagsGeo = 16,
kCmdFlagsPubSub = 18,
kCmdFlagsNoLocal = 0, //default nolocal
kCmdFlagsLocal = 32,
kCmdFlagsNoSuspend = 0, //default nosuspend
Expand Down
68 changes: 68 additions & 0 deletions include/pika_pubsub.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright (c) 2015-present, Qihoo, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.

#ifndef PIKA_PUBSUB_H_
#define PIKA_PUBSUB_H_
#include "pika_command.h"

/*
* pubsub
*/
class PublishCmd : public Cmd {
public:
PublishCmd() {}
virtual void Do();
private:
std::string channel_;
std::string msg_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class SubscribeCmd : public Cmd {
public:
SubscribeCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class UnSubscribeCmd : public Cmd {
public:
UnSubscribeCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class PUnSubscribeCmd : public Cmd {
public:
PUnSubscribeCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class PSubscribeCmd : public Cmd {
public:
PSubscribeCmd() {}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
};

class PubSubCmd : public Cmd {
public:
PubSubCmd() {}
virtual void Do();
private:
std::string subcommand_;
std::vector<std::string > arguments_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
virtual void Clear() {
arguments_.clear();
}
};

#endif // INCLUDE_PIKA_PUBSUB_H_
28 changes: 28 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "slash/include/slash_status.h"
#include "slash/include/slash_mutex.h"
#include "pink/include/bg_thread.h"
#include "pink/include/pink_pubsub.h"
#include "nemo_backupable.h"

using slash::Status;
Expand Down Expand Up @@ -332,6 +333,28 @@ class PikaServer {
void RWLockReader();
void RWUnlock();

/*
* PubSub used
*/
int Publish(const std::string& channel, const std::string& msg);
void Subscribe(pink::PinkConn* conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);

int UnSubscribe(pink::PinkConn* conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);

void PubSubChannels(const std::string& pattern,
std::vector<std::string>* result);

void PubSubNumSub(const std::vector<std::string>& channels,
std::vector<std::pair<std::string, int>>* result);

int PubSubNumPat();

/*
* Monitor used
*/
Expand Down Expand Up @@ -463,6 +486,11 @@ class PikaServer {
*/
PikaMonitorThread* monitor_thread_;

/*
* Pubsub use
*/
pink::PubSubThread * pika_pubsub_thread_;

/*
* Binlog Receiver use
*/
Expand Down
84 changes: 79 additions & 5 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ PikaClientConn::PikaClientConn(int fd, std::string ip_port,
void* worker_specific_data)
: RedisConn(fd, ip_port, server_thread),
server_thread_(server_thread),
cmds_table_(reinterpret_cast<CmdTable*>(worker_specific_data)) {
cmds_table_(reinterpret_cast<CmdTable*>(worker_specific_data)),
is_pubsub_(false) {
auth_stat_.Init();
}

Expand All @@ -39,7 +40,7 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
if (!auth_stat_.IsAuthed(cinfo_ptr)) {
return "-ERR NOAUTH Authentication required.\r\n";
}

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
start_us = slash::NowMicros();
Expand Down Expand Up @@ -70,7 +71,19 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
if (!c_ptr->res().ok()) {
return c_ptr->res().message();
}

// PubSub connection
if (this->IsPubSub()) {
if (opt != kCmdNameSubscribe &&
opt != kCmdNameUnSubscribe &&
opt != kCmdNamePing &&
opt != kCmdNamePSubscribe &&
opt != kCmdNamePUnSubscribe) {
return "-ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context\r\n";
}
}

// Monitor
if (opt == kCmdNameMonitor) {
pink::PinkConn* conn = server_thread_->MoveConnOut(fd());
assert(conn == this);
Expand All @@ -79,6 +92,67 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
return ""; // Monitor thread will return "OK"
}

// PubSub
if (opt == kCmdNameSubscribe) { // Subscribe
pink::PinkConn* conn = this;
if (!this->IsPubSub()) {
conn = server_thread_->MoveConnOut(fd());
}
std::vector<std::string > channels;
for (size_t i = 1; i < argv_.size(); i++) {
channels.push_back(slash::StringToLower(argv_[i]));
}
std::vector<std::pair<std::string, int>> result;
g_pika_server->Subscribe(conn, channels, false, &result);
this->SetIsPubSub(true);
return this->ConstructPubSubResp(kCmdNameSubscribe, result);
} else if (opt == kCmdNameUnSubscribe) { // UnSubscribe
std::vector<std::string > channels;
for (size_t i = 1; i < argv_.size(); i++) {
channels.push_back(slash::StringToLower(argv_[i]));
}
std::vector<std::pair<std::string, int>> result;
int subscribed = g_pika_server->UnSubscribe(this, channels, false, &result);
if (subscribed == 0 && this->IsPubSub()) {
/*
* if the number of client subscribed is zero,
* the client will exit the Pub/Sub state
*/
server_thread_->HandleNewConn(fd(), ip_port());
this->SetIsPubSub(false);
}
return this->ConstructPubSubResp(kCmdNameUnSubscribe, result);
} else if (opt == kCmdNamePSubscribe) { // PSubscribe
pink::PinkConn* conn = this;
if (!this->IsPubSub()) {
conn = server_thread_->MoveConnOut(fd());
}
std::vector<std::string > channels;
for (size_t i = 1; i < argv_.size(); i++) {
channels.push_back(slash::StringToLower(argv_[i]));
}
std::vector<std::pair<std::string, int>> result;
g_pika_server->Subscribe(conn, channels, true, &result);
this->SetIsPubSub(true);
return this->ConstructPubSubResp(kCmdNamePSubscribe, result);
} else if (opt == kCmdNamePUnSubscribe) { // PUnSubscribe
std::vector<std::string > channels;
for (size_t i = 1; i < argv_.size(); i++) {
channels.push_back(slash::StringToLower(argv_[i]));
}
std::vector<std::pair<std::string, int>> result;
int subscribed = g_pika_server->UnSubscribe(this, channels, true, &result);
if (subscribed == 0 && this->IsPubSub()) {
/*
* if the number of client subscribed is zero,
* the client will exit the Pub/Sub state
*/
server_thread_->HandleNewConn(fd(), ip_port());
this->SetIsPubSub(false);
}
return this->ConstructPubSubResp(kCmdNamePUnSubscribe, result);
}

bool need_send_to_hub = false;
if (cinfo_ptr->is_write()) {
if (g_pika_server->BinlogIoError()) {
Expand Down Expand Up @@ -162,7 +236,7 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {
}

if (opt == kCmdNameAuth) {
if(!auth_stat_.ChecknUpdate(c_ptr->res().raw_message())) {
if (!auth_stat_.ChecknUpdate(c_ptr->res().raw_message())) {
// LOG(WARNING) << "(" << ip_port() << ")Wrong Password";
}
}
Expand All @@ -171,7 +245,7 @@ std::string PikaClientConn::DoCmd(const std::string& opt) {

int PikaClientConn::DealMessage() {
g_pika_server->PlusThreadQuerynum();

if (argv_.empty()) return -2;
std::string opt = argv_[0];
slash::StringToLower(opt);
Expand Down Expand Up @@ -233,7 +307,7 @@ bool PikaClientConn::AuthStat::ChecknUpdate(const std::string& message) {
// Situations to change auth status
if (message == "USER") {
stat_ = kLimitAuthed;
} else if (message == "ROOT"){
} else if (message == "ROOT") {
stat_ = kAdminAuthed;
} else {
return false;
Expand Down
41 changes: 41 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "include/pika_bit.h"
#include "include/pika_hyperloglog.h"
#include "include/pika_geo.h"
#include "include/pika_pubsub.h"

static std::unordered_map<std::string, CmdInfo*> cmd_infos(300); /* Table for CmdInfo */

Expand Down Expand Up @@ -417,6 +418,26 @@ void InitCmdInfoTable() {
////GeoRadiusByMember
CmdInfo* georadiusbymemberptr = new CmdInfo(kCmdNameGeoRadiusByMember, -5, kCmdFlagsRead | kCmdFlagsGeo);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameGeoRadiusByMember, georadiusbymemberptr));

//Pub/Sub
//Publish
CmdInfo* publishptr = new CmdInfo(kCmdNamePublish, 3, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePublish, publishptr));
//Subscribe
CmdInfo* subscribeptr = new CmdInfo(kCmdNameSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameSubscribe, subscribeptr));
//UnSubscribe
CmdInfo* unsubscribeptr = new CmdInfo(kCmdNameUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNameUnSubscribe, unsubscribeptr));
//PSubscribe
CmdInfo* psubscribeptr = new CmdInfo(kCmdNamePSubscribe, -2, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePSubscribe, psubscribeptr));
//PUnSubscribe
CmdInfo* punsubscribeptr = new CmdInfo(kCmdNamePUnSubscribe, -1, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePUnSubscribe, punsubscribeptr));
//PubSub
CmdInfo* pubsubptr = new CmdInfo(kCmdNamePubSub, -2, kCmdFlagsRead | kCmdFlagsPubSub);
cmd_infos.insert(std::pair<std::string, CmdInfo*>(kCmdNamePubSub, pubsubptr));
}

void DestoryCmdInfoTable() {
Expand Down Expand Up @@ -829,6 +850,26 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
////GeoRadiusByMember
Cmd * georadiusbymemberptr = new GeoRadiusByMemberCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameGeoRadiusByMember, georadiusbymemberptr));

//PubSub
////Publish
Cmd * publishptr = new PublishCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePublish, publishptr));
////Subscribe
Cmd * subscribeptr = new SubscribeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameSubscribe, subscribeptr));
////UnSubscribe
Cmd * unsubscribeptr = new UnSubscribeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameUnSubscribe, unsubscribeptr));
////PSubscribe
Cmd * psubscribeptr = new PSubscribeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePSubscribe, psubscribeptr));
////PUnSubscribe
Cmd * punsubscribeptr = new PUnSubscribeCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePUnSubscribe, punsubscribeptr));
////PubSub
Cmd * pubsubptr = new PubSubCmd();
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePubSub, pubsubptr));
}

Cmd* GetCmdFromTable(const std::string& opt, const CmdTable& cmd_table) {
Expand Down
Loading

0 comments on commit a76dd54

Please sign in to comment.