Skip to content

Commit

Permalink
Merge pull request PikaLabs#23 from Tapeete/master
Browse files Browse the repository at this point in the history
optimize code, move subpub related out from base class
  • Loading branch information
Gao Dunqiao authored Dec 28, 2017
2 parents 7b72292 + 8baceb0 commit 1dbf150
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 57 deletions.
14 changes: 1 addition & 13 deletions pink/include/pink_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,7 @@ class PinkConn {

virtual ReadStatus GetRequest() = 0;
virtual WriteStatus SendReply() = 0;

virtual int ConstructPublishResp(
const std::string& subscribe_channel,
const std::string& channel,
const std::string& msg,
const bool pattern) {
return 0;
}
virtual std::string ConstructPubSubResp(
const std::string& cmd,
const std::vector<std::pair<std::string, int>>& result) {
return nullptr;
}
virtual void WriteResp(const std::string& resp) { }

virtual void TryResizeBuffer() {}

Expand Down
7 changes: 1 addition & 6 deletions pink/include/redis_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,7 @@ class RedisConn: public PinkConn {

virtual ReadStatus GetRequest();
virtual WriteStatus SendReply();
virtual int ConstructPublishResp(const std::string& subscribe_channel,
const std::string& channel,
const std::string& msg,
const bool pattern);
virtual std::string ConstructPubSubResp(const std::string& cmd,
const std::vector<std::pair<std::string, int>>& result);
virtual void WriteResp(const std::string& resp);

void TryResizeBuffer() override;

Expand Down
27 changes: 25 additions & 2 deletions pink/src/pink_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <vector>
#include <algorithm>
#include <sstream>

#include "pink/src/worker_thread.h"

Expand All @@ -15,6 +16,26 @@

namespace pink {

static std::string ConstructPublishResp(const std::string& subscribe_channel,
const std::string& publish_channel,
const std::string& msg,
const bool pattern) {
std::stringstream resp;
std::string common_msg = "message";
std::string pattern_msg = "pmessage";
if (pattern) {
resp << "*4\r\n" << "$" << pattern_msg.length() << "\r\n" << pattern_msg << "\r\n" <<
"$" << subscribe_channel.length() << "\r\n" << subscribe_channel << "\r\n" <<
"$" << publish_channel.length() << "\r\n" << publish_channel << "\r\n" <<
"$" << msg.length() << "\r\n" << msg << "\r\n";
} else {
resp << "*3\r\n" << "$" << common_msg.length() << "\r\n" << common_msg << "\r\n" <<
"$" << publish_channel.length() << "\r\n" << publish_channel << "\r\n" <<
"$" << msg.length() << "\r\n" << msg << "\r\n";
}
return resp.str();
}

void CloseFd(PinkConn* conn) {
close(conn->fd());
}
Expand Down Expand Up @@ -355,7 +376,8 @@ void *PubSubThread::ThreadMain() {
for (auto it = pubsub_channel_.begin(); it != pubsub_channel_.end(); it++) {
if (channel == it->first) {
for (size_t i = 0; i < it->second.size(); i++) {
it->second[i]->ConstructPublishResp(it->first, channel, msg, false);
std::string resp = ConstructPublishResp(it->first, channel, msg, false);
it->second[i]->WriteResp(resp);
WriteStatus write_status = it->second[i]->SendReply();
if (write_status == kWriteHalf) {
pink_epoll_->PinkModEvent(it->second[i]->fd(),
Expand All @@ -380,7 +402,8 @@ void *PubSubThread::ThreadMain() {
if (slash::stringmatchlen(it->first.c_str(), it->first.size(),
channel.c_str(), channel.size(), 0)) {
for (size_t i = 0; i < it->second.size(); i++) {
it->second[i]->ConstructPublishResp(it->first, channel, msg, true);
std::string resp = ConstructPublishResp(it->first, channel, msg, true);
it->second[i]->WriteResp(resp);
WriteStatus write_status = it->second[i]->SendReply();
if (write_status == kWriteHalf) {
pink_epoll_->PinkModEvent(it->second[i]->fd(),
Expand Down
38 changes: 2 additions & 36 deletions pink/src/redis_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -351,43 +351,9 @@ WriteStatus RedisConn::SendReply() {
}
}

int RedisConn::ConstructPublishResp(const std::string& subscribe_channel,
const std::string& publish_channel,
const std::string& msg,
const bool pattern) {
std::stringstream resp;
std::string common_msg = "message";
std::string pattern_msg = "pmessage";
if (pattern) {
resp << "*4\r\n" << "$" << pattern_msg.length() << "\r\n" << pattern_msg << "\r\n" <<
"$" << subscribe_channel.length() << "\r\n" << subscribe_channel << "\r\n" <<
"$" << publish_channel.length() << "\r\n" << publish_channel << "\r\n" <<
"$" << msg.length() << "\r\n" << msg << "\r\n";
} else {
resp << "*3\r\n" << "$" << common_msg.length() << "\r\n" << common_msg << "\r\n" <<
"$" << publish_channel.length() << "\r\n" << publish_channel << "\r\n" <<
"$" << msg.length() << "\r\n" << msg << "\r\n";
}

response_.append(resp.str());
void RedisConn::WriteResp(const std::string& resp) {
response_.append(resp);
set_is_reply(true);
return 0;
}

std::string RedisConn::ConstructPubSubResp(
const std::string& cmd,
const std::vector<std::pair<std::string, int>>& result) {
std::stringstream resp;
if (result.size() == 0) {
resp << "*3\r\n" << "$" << cmd.length() << "\r\n" << cmd << "\r\n" <<
"$" << -1 << "\r\n" << ":" << 0 << "\r\n";
}
for (auto it = result.begin(); it != result.end(); it++) {
resp << "*3\r\n" << "$" << cmd.length() << "\r\n" << cmd << "\r\n" <<
"$" << it->first.length() << "\r\n" << it->first << "\r\n" <<
":" << it->second << "\r\n";
}
return resp.str();
}

void RedisConn::TryResizeBuffer() {
Expand Down

0 comments on commit 1dbf150

Please sign in to comment.