Skip to content

Commit

Permalink
Upgrade pink and slash
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Jun 15, 2017
1 parent 33072be commit 0487618
Show file tree
Hide file tree
Showing 64 changed files with 618 additions and 455 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
url = https://github.com/Qihoo360/pink.git
[submodule "third/slash"]
path = third/slash
url = https://github.com/baotiao/slash.git
url = https://github.com/Qihoo360/slash.git
21 changes: 10 additions & 11 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,13 @@ INCLUDE_PATH = -I./include/ \
-I$(THIRD_PATH)/nemo/output/include/ \
-I$(THIRD_PATH)/nemo/3rdparty/nemo-rocksdb/rocksdb/ \
-I$(THIRD_PATH)/nemo/3rdparty/nemo-rocksdb/rocksdb/include \
-I$(THIRD_PATH)/slash/output/include/ \
-I$(THIRD_PATH)/pink/output/include/ \
-I$(THIRD_PATH)/pink/output/
-I$(THIRD_PATH)/slash \
-I$(THIRD_PATH)/pink \

LIB_PATH = -L./ \
-L$(THIRD_PATH)/nemo/output/lib/ \
-L$(THIRD_PATH)/slash/output/lib/ \
-L$(THIRD_PATH)/pink/output/lib/ \
-L$(THIRD_PATH)/slash/slash/lib/ \
-L$(THIRD_PATH)/pink/pink/lib/ \
-L$(THIRD_PATH)/glog/.libs/


Expand All @@ -69,8 +68,8 @@ CXXFLAGS += $(TCMALLOC_EXTENSION_FLAGS)

NEMO = $(THIRD_PATH)/nemo/output/lib/libnemo.a
GLOG = $(SO_DIR)/libglog.so.0
PINK = $(THIRD_PATH)/pink/output/lib/libpink.a
SLASH = $(THIRD_PATH)/slash/output/lib/libslash.a
PINK = $(THIRD_PATH)/pink/lib/libpink.a
SLASH = $(THIRD_PATH)/slash/lib/libslash.a

.PHONY: all clean

Expand Down Expand Up @@ -114,10 +113,10 @@ $(NEMO):
make -C $(THIRD_PATH)/nemo/

$(SLASH):
make -C $(THIRD_PATH)/slash/
make -C $(THIRD_PATH)/slash/slash/

$(PINK):
make -C $(THIRD_PATH)/pink/
make -C $(THIRD_PATH)/pink/pink/ SLASH_PATH=$(THIRD_PATH)/slash

$(OBJS): %.o : %.cc
$(CXX) $(CXXFLAGS) -c $< -o $@ $(INCLUDE_PATH)
Expand All @@ -142,8 +141,8 @@ distclean:
rm -rf $(CURDIR)/make_config.mk
make distclean -C $(THIRD_PATH)/nemo/3rdparty/nemo-rocksdb/
make clean -C $(THIRD_PATH)/nemo/
make clean -C $(THIRD_PATH)/pink/
make clean -C $(THIRD_PATH)/slash/
make clean -C $(THIRD_PATH)/pink/pink
make clean -C $(THIRD_PATH)/slash/slash
make distclean -C $(THIRD_PATH)/glog/
make clean -C $(CURDIR)/tools/aof_to_pika
make clean -C $(CURDIR)/tools/pika_monitor
Expand Down
9 changes: 3 additions & 6 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@
# include <inttypes.h>
#endif

#include "env.h"
//#include "port.h"
#include "slash/include/slash_status.h"
#include "slash/include/slash_mutex.h"
#include "slash/include/env.h"
#include "pika_define.h"

#include "slash_status.h"
#include "slash_mutex.h"

using slash::Status;
using slash::Slice;


std::string NewFileName(const std::string name, const uint32_t current);

class Version;
Expand Down
14 changes: 8 additions & 6 deletions include/pika_binlog_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,26 @@
#ifndef PIKA_BINLOG_BGWORKER_H_
#define PIKA_BINLOG_BGWORKER_H_
#include "pika_command.h"
#include "bg_thread.h"
#include "pink/include/bg_thread.h"

class BinlogBGWorker{
class BinlogBGWorker {
public:
BinlogBGWorker(int full) : binlogbg_thread_(full){
BinlogBGWorker(int full)
: binlogbg_thread_(full) {
cmds_.reserve(300);
InitCmdTable(&cmds_);
}
~BinlogBGWorker() {
binlogbg_thread_.Stop();
binlogbg_thread_.StopThread();
DestoryCmdTable(cmds_);
}
Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args, uint64_t serial, bool readonly) {
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args,
uint64_t serial, bool readonly) {
BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, readonly, this);
binlogbg_thread_.StartIfNeed();
binlogbg_thread_.StartThread();
binlogbg_thread_.Schedule(&DoBinlogBG, static_cast<void*>(arg));
}
static void DoBinlogBG(void* arg);
Expand Down
56 changes: 42 additions & 14 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,20 @@
#include <queue>
#include <set>

#include "holy_thread.h"
#include "slash_mutex.h"
#include "env.h"
#include "pink/include/server_thread.h"
#include "slash/include/slash_mutex.h"
#include "slash/include/env.h"
#include "pika_define.h"
#include "pika_master_conn.h"
#include "pika_command.h"

class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
{
public:
class PikaBinlogReceiverThread {
public:
PikaBinlogReceiverThread(std::string &ip, int port, int cron_interval = 0);
PikaBinlogReceiverThread(std::set<std::string> &ips, int port, int cron_interval = 0);
virtual ~PikaBinlogReceiverThread();
virtual void CronHandle();
virtual bool AccessHandle(std::string& ip);
~PikaBinlogReceiverThread();
void KillBinlogSender();
int StartThread();

uint64_t thread_querynum() {
slash::RWLock(&rwlock_, false);
Expand Down Expand Up @@ -60,21 +58,51 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
}

int32_t ThreadClientNum() {
slash::RWLock(&rwlock_, false);
int32_t num = conns_.size();
return num;
return thread_rep_->conn_num();
}

Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}

private:
slash::Mutex mutex_; // protect cron_task_
private:
class MasterConnFactory : public pink::ConnFactory {
public:
virtual pink::PinkConn *NewPinkConn(int connfd,
const std::string &ip_port,
pink::Thread *thread) const {
return new PikaMasterConn(connfd, ip_port, thread);
}
};

class PikaBinlogReceiverHandles : public pink::ServerHandle {
public:
explicit PikaBinlogReceiverHandles(PikaBinlogReceiverThread* binlog_receiver)
: binlog_receiver_(binlog_receiver) {
}
void CronHandle() {
binlog_receiver_->CronHandle();
}
void AccessHandle(std::string& ip) {
binlog_receiver_->AccessHandle(ip);
}

private:
PikaBinlogReceiverThread* binlog_receiver_;
};

MasterConnFactory* conn_factory_;
PikaBinlogReceiverHandles* handles_;
pink::ServerThread* thread_rep_;

bool AccessHandle(std::string& ip);
void CronHandle();
void AddCronTask(WorkerCronTask task);
void KillAll();
slash::Mutex mutex_; // protect cron_task_
std::queue<WorkerCronTask> cron_tasks_;

pthread_rwlock_t rwlock_;
std::unordered_map<std::string, Cmd*> cmds_;
uint64_t thread_querynum_;
uint64_t last_thread_querynum_;
Expand Down
31 changes: 15 additions & 16 deletions include/pika_binlog_sender_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,22 @@
#ifndef PIKA_BINLOG_SENDER_THREAD_H_
#define PIKA_BINLOG_SENDER_THREAD_H_

#include "pink_thread.h"
//#include "redis_cli.h"
#include "slice.h"
#include "status.h"
#include "pink/include/pink_thread.h"
#include "pink/include/pink_cli.h"
#include "slash/include/slash_slice.h"
#include "slash/include/slash_status.h"
#include "slash/include/env.h"
#include "slash/include/slash_mutex.h"

#include "env.h"
#include "slash_mutex.h"


namespace pink {
class RedisCli;
}
using slash::Status;
using slash::Slice;

class PikaBinlogSenderThread : public pink::Thread {
public:

PikaBinlogSenderThread(const std::string &ip, int port, slash::SequentialFile *queue, uint32_t filenum, uint64_t con_offset);
PikaBinlogSenderThread(const std::string &ip, int port,
slash::SequentialFile *queue,
uint32_t filenum, uint64_t con_offset);

virtual ~PikaBinlogSenderThread();

Expand All @@ -48,8 +47,8 @@ class PikaBinlogSenderThread : public pink::Thread {

private:

slash::Status Parse(std::string &scratch);
slash::Status Consume(std::string &scratch);
Status Parse(std::string &scratch);
Status Consume(std::string &scratch);
unsigned int ReadPhysicalRecord(slash::Slice *fragment);

uint64_t con_offset_;
Expand All @@ -61,7 +60,7 @@ class PikaBinlogSenderThread : public pink::Thread {

slash::SequentialFile* queue_;
char* const backing_store_;
slash::Slice buffer_;
Slice buffer_;


std::string ip_;
Expand All @@ -70,7 +69,7 @@ class PikaBinlogSenderThread : public pink::Thread {
pthread_rwlock_t rwlock_;

int timeout_ms_;
pink::RedisCli *cli_;
pink::PinkCli *cli_;

virtual void* ThreadMain();
};
Expand Down
4 changes: 2 additions & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <glog/logging.h>
#include <atomic>

#include "redis_conn.h"
#include "pink_thread.h"
#include "pink/include/redis_conn.h"
#include "pink/include/pink_thread.h"
#include "pika_command.h"


Expand Down
5 changes: 3 additions & 2 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
#include <deque>
#include <string>
#include <memory>
#include <redis_conn.h>
#include <unordered_map>
#include "slash_string.h"

#include "slash/include/slash_string.h"
#include "pink/include/redis_conn.h"


//Constant for command name
Expand Down
12 changes: 6 additions & 6 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,20 @@
#ifndef PIKA_CONF_H_
#define PIKA_CONF_H_
#include <pthread.h>
#include "stdlib.h"
#include <stdlib.h>
#include <string>
#include <vector>

#include "base_conf.h"
#include "slash_mutex.h"
#include "slash_string.h"
#include "slash/include/base_conf.h"
#include "slash/include/slash_mutex.h"
#include "slash/include/slash_string.h"
#include "slash/include/xdebug.h"
#include "pika_define.h"
#include "xdebug.h"

typedef slash::RWLock RWLock;

class PikaConf : public slash::BaseConf {
public:
public:
PikaConf(const std::string& path);
~PikaConf() { pthread_rwlock_destroy(&rwlock_); }

Expand Down
50 changes: 41 additions & 9 deletions include/pika_dispatch_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,56 @@
#ifndef PIKA_DISPATCH_THREAD_H_
#define PIKA_DISPATCH_THREAD_H_

#include "pink/include/server_thread.h"
#include "pika_worker_thread.h"
#include "dispatch_thread.h"
#include "pika_client_conn.h"

class PikaDispatchThread : public pink::DispatchThread<PikaClientConn>
{
public:
PikaDispatchThread(int port, int work_num, PikaWorkerThread** pika_worker_thread,
class PikaDispatchThread {
public:
PikaDispatchThread(int port, int work_num,
int cron_interval, int queue_limit);
PikaDispatchThread(std::string &ip, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit);
PikaDispatchThread(std::set<std::string> &ips, int port, int work_num,
PikaWorkerThread** pika_worker_thread,
int cron_interval, int queue_limit);
virtual ~PikaDispatchThread();
virtual bool AccessHandle(std::string& ip);
~PikaDispatchThread();
int StartThread();

PikaWorkerThread** pika_worker_threads() {
return pika_worker_threads_;
}
int ClientNum();

private:
class ClientConnFactory : public pink::ConnFactory {
public:
virtual pink::PinkConn *NewPinkConn(int connfd,
const std::string &ip_port,
pink::Thread *thread) const {
return new PikaClientConn(connfd, ip_port, thread);
}
};

class PikaDispatchHandles : public pink::ServerHandle {
public:
explicit PikaDispatchHandles(PikaDispatchThread* pika_disptcher)
: pika_disptcher_(pika_disptcher) {
}
void AccessHandle(std::string& ip) {
pika_disptcher_->AccessHandle(ip);
}

private:
PikaDispatchThread* pika_disptcher_;
};


ClientConnFactory* conn_factory_;
PikaDispatchHandles* handles_;
int work_num_;
PikaWorkerThread** pika_worker_threads_;
pink::ServerThread* thread_rep_;

bool AccessHandle(std::string& ip);
};
#endif
Loading

0 comments on commit 0487618

Please sign in to comment.