diff --git a/include/pika_binlog_receiver_thread.h b/include/pika_binlog_receiver_thread.h index 6d0b6a5629..aa04f711a1 100644 --- a/include/pika_binlog_receiver_thread.h +++ b/include/pika_binlog_receiver_thread.h @@ -44,7 +44,7 @@ class PikaBinlogReceiverThread { : binlog_receiver_(binlog_receiver) { } - virtual pink::PinkConn *NewPinkConn( + virtual std::shared_ptr NewPinkConn( int connfd, const std::string &ip_port, pink::ServerThread *thread, @@ -52,10 +52,10 @@ class PikaBinlogReceiverThread { pink::PinkEpoll* pink_epoll) const override { if (g_pika_conf->identify_binlog_type() == "old") { LOG(INFO) << "Master conn factory create pika master conn"; - return new PikaMasterConn(connfd, ip_port, binlog_receiver_); + return std::make_shared(connfd, ip_port, binlog_receiver_); } else { LOG(INFO) << "Master conn factory create pika new master conn"; - return new PikaNewMasterConn(connfd, ip_port, binlog_receiver_); + return std::make_shared(connfd, ip_port, binlog_receiver_); } } diff --git a/include/pika_client_conn.h b/include/pika_client_conn.h index 3228b67a60..5d2805c6f5 100644 --- a/include/pika_client_conn.h +++ b/include/pika_client_conn.h @@ -17,6 +17,10 @@ class PikaWorkerSpecificData; class PikaClientConn: public pink::AsynRedisConn { public: + struct BgTaskArg { + std::shared_ptr pcc; + }; + PikaClientConn(int fd, std::string ip_port, pink::ServerThread *server_thread, void* worker_specific_data, pink::PinkEpoll* pink_epoll); virtual ~PikaClientConn() {} @@ -58,7 +62,7 @@ struct ClientInfo { int fd; std::string ip_port; int64_t last_interaction; - PikaClientConn* conn; + std::shared_ptr conn; }; #endif diff --git a/include/pika_dispatch_thread.h b/include/pika_dispatch_thread.h index 73322b9aab..8cacfbc129 100644 --- a/include/pika_dispatch_thread.h +++ b/include/pika_dispatch_thread.h @@ -29,13 +29,13 @@ class PikaDispatchThread { private: class ClientConnFactory : public pink::ConnFactory { public: - virtual pink::PinkConn *NewPinkConn( + virtual std::shared_ptr NewPinkConn( int connfd, const std::string &ip_port, pink::ServerThread *server_thread, void* worker_specific_data, pink::PinkEpoll* pink_epoll) const { - return new PikaClientConn(connfd, ip_port, server_thread, worker_specific_data, pink_epoll); + return std::make_shared(connfd, ip_port, server_thread, worker_specific_data, pink_epoll); } }; diff --git a/include/pika_heartbeat_thread.h b/include/pika_heartbeat_thread.h index 1c0c231900..6602531f29 100644 --- a/include/pika_heartbeat_thread.h +++ b/include/pika_heartbeat_thread.h @@ -22,13 +22,13 @@ class PikaHeartbeatThread { private: class HeartbeatConnFactory : public pink::ConnFactory { public: - virtual pink::PinkConn *NewPinkConn( + virtual std::shared_ptr NewPinkConn( int connfd, const std::string &ip_port, pink::ServerThread *thread, void* worker_specific_data, pink::PinkEpoll* pink_epoll) const override { - return new PikaHeartbeatConn(connfd, ip_port); + return std::make_shared(connfd, ip_port); } }; diff --git a/include/pika_monitor_thread.h b/include/pika_monitor_thread.h index f5af0946c6..d7e26cb4d9 100644 --- a/include/pika_monitor_thread.h +++ b/include/pika_monitor_thread.h @@ -22,7 +22,7 @@ class PikaMonitorThread : public pink::Thread { PikaMonitorThread(); virtual ~PikaMonitorThread(); - void AddMonitorClient(PikaClientConn* client_ptr); + void AddMonitorClient(std::shared_ptr client_ptr); void AddMonitorMessage(const std::string &monitor_message); int32_t ThreadClientList(std::vector* client = NULL); bool ThreadClientKill(const std::string& ip_port = "all"); diff --git a/include/pika_server.h b/include/pika_server.h index 0f9a29685f..352f206198 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -342,12 +342,12 @@ class PikaServer { * PubSub used */ int Publish(const std::string& channel, const std::string& msg); - void Subscribe(pink::PinkConn* conn, + void Subscribe(std::shared_ptr conn, const std::vector& channels, const bool pattern, std::vector>* result); - int UnSubscribe(pink::PinkConn* conn, + int UnSubscribe(std::shared_ptr conn, const std::vector& channels, const bool pattern, std::vector>* result); @@ -363,7 +363,7 @@ class PikaServer { /* * Monitor used */ - void AddMonitorClient(PikaClientConn* client_ptr); + void AddMonitorClient(std::shared_ptr client_ptr); void AddMonitorMessage(const std::string &monitor_message); bool HasMonitorClients(); diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index 8b8afc3c8b..70db071079 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -107,16 +107,16 @@ std::string PikaClientConn::DoCmd( // Monitor if (opt == kCmdNameMonitor) { - pink::PinkConn* conn = server_thread_->MoveConnOut(fd()); - assert(conn == this); - g_pika_server->AddMonitorClient(static_cast(conn)); + std::shared_ptr conn = server_thread_->MoveConnOut(fd()); + assert(conn.get() == this); + g_pika_server->AddMonitorClient(std::dynamic_pointer_cast(conn)); g_pika_server->AddMonitorMessage("OK"); return ""; // Monitor thread will return "OK" } //PubSub if (opt == kCmdNamePSubscribe || opt == kCmdNameSubscribe) { // PSubscribe or Subscribe - pink::PinkConn* conn = this; + std::shared_ptr conn = std::dynamic_pointer_cast(shared_from_this()); if (!this->IsPubSub()) { conn = server_thread_->MoveConnOut(fd()); } @@ -134,7 +134,8 @@ std::string PikaClientConn::DoCmd( channels.push_back(slash::StringToLower(argv[i])); } std::vector> result; - int subscribed = g_pika_server->UnSubscribe(this, channels, opt == kCmdNamePUnSubscribe, &result); + std::shared_ptr conn = std::dynamic_pointer_cast(shared_from_this()); + int subscribed = g_pika_server->UnSubscribe(conn, channels, opt == kCmdNamePUnSubscribe, &result); if (subscribed == 0 && this->IsPubSub()) { /* * if the number of client subscribed is zero, @@ -235,7 +236,9 @@ std::string PikaClientConn::DoCmd( } void PikaClientConn::AsynProcessRedisCmd() { - g_pika_server->Schedule(&DoBackgroundTask, this); + BgTaskArg* arg = new BgTaskArg(); + arg->pcc = std::dynamic_pointer_cast(shared_from_this()); + g_pika_server->Schedule(&DoBackgroundTask, arg); } void PikaClientConn::BatchExecRedisCmd() { @@ -267,8 +270,9 @@ int PikaClientConn::DealMessage(PikaCmdArgsType& argv) { } void PikaClientConn::DoBackgroundTask(void* arg) { - PikaClientConn* conn = static_cast(arg); - conn->BatchExecRedisCmd(); + BgTaskArg* bg_arg = reinterpret_cast(arg); + bg_arg->pcc->BatchExecRedisCmd(); + delete bg_arg; } // Initial permission status diff --git a/src/pika_monitor_thread.cc b/src/pika_monitor_thread.cc index 5e79678392..b281fd5fc6 100644 --- a/src/pika_monitor_thread.cc +++ b/src/pika_monitor_thread.cc @@ -35,7 +35,7 @@ PikaMonitorThread::~PikaMonitorThread() { LOG(INFO) << " PikaMonitorThread " << pthread_self() << " exit!!!"; } -void PikaMonitorThread::AddMonitorClient(PikaClientConn* client_ptr) { +void PikaMonitorThread::AddMonitorClient(std::shared_ptr client_ptr) { StartThread(); slash::MutexLock lm(&monitor_mutex_protector_); monitor_clients_.push_back(ClientInfo{client_ptr->fd(), client_ptr->ip_port(), 0, client_ptr}); @@ -46,12 +46,10 @@ void PikaMonitorThread::RemoveMonitorClient(const std::string& ip_port) { for (; iter != monitor_clients_.end(); ++iter) { if (ip_port == "all") { close(iter->fd); - delete iter->conn; continue; } if (iter->ip_port == ip_port) { close(iter->fd); - delete iter->conn; break; } } diff --git a/src/pika_server.cc b/src/pika_server.cc index b8c239aa6d..26af436808 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -1485,12 +1485,18 @@ int PikaServer::Publish(const std::string& channel, const std::string& msg) { } // Subscribe/PSubscribe -void PikaServer::Subscribe(pink::PinkConn* conn, const std::vector& channels, bool pattern, std::vector>* result) { +void PikaServer::Subscribe(std::shared_ptr conn, + const std::vector& channels, + bool pattern, + std::vector>* result) { pika_pubsub_thread_->Subscribe(conn, channels, pattern, result); } // UnSubscribe/PUnSubscribe -int PikaServer::UnSubscribe(pink::PinkConn* conn, const std::vector& channels, bool pattern, std::vector>* result) { +int PikaServer::UnSubscribe(std::shared_ptr conn, + const std::vector& channels, + bool pattern, + std::vector>* result) { int subscribed = pika_pubsub_thread_->UnSubscribe(conn, channels, pattern, result); return subscribed; } @@ -1511,7 +1517,7 @@ int PikaServer::PubSubNumPat() { return pika_pubsub_thread_->PubSubNumPat(); } -void PikaServer::AddMonitorClient(PikaClientConn* client_ptr) { +void PikaServer::AddMonitorClient(std::shared_ptr client_ptr) { monitor_thread_->AddMonitorClient(client_ptr); } diff --git a/third/pink b/third/pink index 4c8d083798..9eedb94646 160000 --- a/third/pink +++ b/third/pink @@ -1 +1 @@ -Subproject commit 4c8d0837980d6af367c8876b400ae49657261404 +Subproject commit 9eedb94646c8d565a06a2f855ae90cfb9e107814