Skip to content

Commit

Permalink
use shared_ptr to hold the PikaClientConn
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed Jan 17, 2019
1 parent bd185b9 commit 6ba7f06
Show file tree
Hide file tree
Showing 10 changed files with 39 additions and 27 deletions.
6 changes: 3 additions & 3 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,18 @@ class PikaBinlogReceiverThread {
: binlog_receiver_(binlog_receiver) {
}

virtual pink::PinkConn *NewPinkConn(
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
pink::ServerThread *thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const override {
if (g_pika_conf->identify_binlog_type() == "old") {
LOG(INFO) << "Master conn factory create pika master conn";
return new PikaMasterConn(connfd, ip_port, binlog_receiver_);
return std::make_shared<PikaMasterConn>(connfd, ip_port, binlog_receiver_);
} else {
LOG(INFO) << "Master conn factory create pika new master conn";
return new PikaNewMasterConn(connfd, ip_port, binlog_receiver_);
return std::make_shared<PikaNewMasterConn>(connfd, ip_port, binlog_receiver_);
}
}

Expand Down
6 changes: 5 additions & 1 deletion include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class PikaWorkerSpecificData;

class PikaClientConn: public pink::AsynRedisConn {
public:
struct BgTaskArg {
std::shared_ptr<PikaClientConn> pcc;
};

PikaClientConn(int fd, std::string ip_port, pink::ServerThread *server_thread,
void* worker_specific_data, pink::PinkEpoll* pink_epoll);
virtual ~PikaClientConn() {}
Expand Down Expand Up @@ -58,7 +62,7 @@ struct ClientInfo {
int fd;
std::string ip_port;
int64_t last_interaction;
PikaClientConn* conn;
std::shared_ptr<PikaClientConn> conn;
};

#endif
4 changes: 2 additions & 2 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ class PikaDispatchThread {
private:
class ClientConnFactory : public pink::ConnFactory {
public:
virtual pink::PinkConn *NewPinkConn(
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
pink::ServerThread *server_thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const {
return new PikaClientConn(connfd, ip_port, server_thread, worker_specific_data, pink_epoll);
return std::make_shared<PikaClientConn>(connfd, ip_port, server_thread, worker_specific_data, pink_epoll);
}
};

Expand Down
4 changes: 2 additions & 2 deletions include/pika_heartbeat_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ class PikaHeartbeatThread {
private:
class HeartbeatConnFactory : public pink::ConnFactory {
public:
virtual pink::PinkConn *NewPinkConn(
virtual std::shared_ptr<pink::PinkConn> NewPinkConn(
int connfd,
const std::string &ip_port,
pink::ServerThread *thread,
void* worker_specific_data,
pink::PinkEpoll* pink_epoll) const override {
return new PikaHeartbeatConn(connfd, ip_port);
return std::make_shared<PikaHeartbeatConn>(connfd, ip_port);
}
};

Expand Down
2 changes: 1 addition & 1 deletion include/pika_monitor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class PikaMonitorThread : public pink::Thread {
PikaMonitorThread();
virtual ~PikaMonitorThread();

void AddMonitorClient(PikaClientConn* client_ptr);
void AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr);
void AddMonitorMessage(const std::string &monitor_message);
int32_t ThreadClientList(std::vector<ClientInfo>* client = NULL);
bool ThreadClientKill(const std::string& ip_port = "all");
Expand Down
6 changes: 3 additions & 3 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,12 @@ class PikaServer {
* PubSub used
*/
int Publish(const std::string& channel, const std::string& msg);
void Subscribe(pink::PinkConn* conn,
void Subscribe(std::shared_ptr<pink::PinkConn> conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);

int UnSubscribe(pink::PinkConn* conn,
int UnSubscribe(std::shared_ptr<pink::PinkConn> conn,
const std::vector<std::string>& channels,
const bool pattern,
std::vector<std::pair<std::string, int>>* result);
Expand All @@ -363,7 +363,7 @@ class PikaServer {
/*
* Monitor used
*/
void AddMonitorClient(PikaClientConn* client_ptr);
void AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr);
void AddMonitorMessage(const std::string &monitor_message);
bool HasMonitorClients();

Expand Down
20 changes: 12 additions & 8 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ std::string PikaClientConn::DoCmd(

// Monitor
if (opt == kCmdNameMonitor) {
pink::PinkConn* conn = server_thread_->MoveConnOut(fd());
assert(conn == this);
g_pika_server->AddMonitorClient(static_cast<PikaClientConn*>(conn));
std::shared_ptr<PinkConn> conn = server_thread_->MoveConnOut(fd());
assert(conn.get() == this);
g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn));
g_pika_server->AddMonitorMessage("OK");
return ""; // Monitor thread will return "OK"
}

//PubSub
if (opt == kCmdNamePSubscribe || opt == kCmdNameSubscribe) { // PSubscribe or Subscribe
pink::PinkConn* conn = this;
std::shared_ptr<PinkConn> conn = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
if (!this->IsPubSub()) {
conn = server_thread_->MoveConnOut(fd());
}
Expand All @@ -134,7 +134,8 @@ std::string PikaClientConn::DoCmd(
channels.push_back(slash::StringToLower(argv[i]));
}
std::vector<std::pair<std::string, int>> result;
int subscribed = g_pika_server->UnSubscribe(this, channels, opt == kCmdNamePUnSubscribe, &result);
std::shared_ptr<PinkConn> conn = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
int subscribed = g_pika_server->UnSubscribe(conn, channels, opt == kCmdNamePUnSubscribe, &result);
if (subscribed == 0 && this->IsPubSub()) {
/*
* if the number of client subscribed is zero,
Expand Down Expand Up @@ -235,7 +236,9 @@ std::string PikaClientConn::DoCmd(
}

void PikaClientConn::AsynProcessRedisCmd() {
g_pika_server->Schedule(&DoBackgroundTask, this);
BgTaskArg* arg = new BgTaskArg();
arg->pcc = std::dynamic_pointer_cast<PikaClientConn>(shared_from_this());
g_pika_server->Schedule(&DoBackgroundTask, arg);
}

void PikaClientConn::BatchExecRedisCmd() {
Expand Down Expand Up @@ -267,8 +270,9 @@ int PikaClientConn::DealMessage(PikaCmdArgsType& argv) {
}

void PikaClientConn::DoBackgroundTask(void* arg) {
PikaClientConn* conn = static_cast<PikaClientConn*>(arg);
conn->BatchExecRedisCmd();
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
bg_arg->pcc->BatchExecRedisCmd();
delete bg_arg;
}

// Initial permission status
Expand Down
4 changes: 1 addition & 3 deletions src/pika_monitor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ PikaMonitorThread::~PikaMonitorThread() {
LOG(INFO) << " PikaMonitorThread " << pthread_self() << " exit!!!";
}

void PikaMonitorThread::AddMonitorClient(PikaClientConn* client_ptr) {
void PikaMonitorThread::AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr) {
StartThread();
slash::MutexLock lm(&monitor_mutex_protector_);
monitor_clients_.push_back(ClientInfo{client_ptr->fd(), client_ptr->ip_port(), 0, client_ptr});
Expand All @@ -46,12 +46,10 @@ void PikaMonitorThread::RemoveMonitorClient(const std::string& ip_port) {
for (; iter != monitor_clients_.end(); ++iter) {
if (ip_port == "all") {
close(iter->fd);
delete iter->conn;
continue;
}
if (iter->ip_port == ip_port) {
close(iter->fd);
delete iter->conn;
break;
}
}
Expand Down
12 changes: 9 additions & 3 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1485,12 +1485,18 @@ int PikaServer::Publish(const std::string& channel, const std::string& msg) {
}

// Subscribe/PSubscribe
void PikaServer::Subscribe(pink::PinkConn* conn, const std::vector<std::string>& channels, bool pattern, std::vector<std::pair<std::string, int>>* result) {
void PikaServer::Subscribe(std::shared_ptr<pink::PinkConn> conn,
const std::vector<std::string>& channels,
bool pattern,
std::vector<std::pair<std::string, int>>* result) {
pika_pubsub_thread_->Subscribe(conn, channels, pattern, result);
}

// UnSubscribe/PUnSubscribe
int PikaServer::UnSubscribe(pink::PinkConn* conn, const std::vector<std::string>& channels, bool pattern, std::vector<std::pair<std::string, int>>* result) {
int PikaServer::UnSubscribe(std::shared_ptr<pink::PinkConn> conn,
const std::vector<std::string>& channels,
bool pattern,
std::vector<std::pair<std::string, int>>* result) {
int subscribed = pika_pubsub_thread_->UnSubscribe(conn, channels, pattern, result);
return subscribed;
}
Expand All @@ -1511,7 +1517,7 @@ int PikaServer::PubSubNumPat() {
return pika_pubsub_thread_->PubSubNumPat();
}

void PikaServer::AddMonitorClient(PikaClientConn* client_ptr) {
void PikaServer::AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr) {
monitor_thread_->AddMonitorClient(client_ptr);
}

Expand Down

0 comments on commit 6ba7f06

Please sign in to comment.