Skip to content

Commit

Permalink
Merge support hub branch
Browse files Browse the repository at this point in the history
  • Loading branch information
gaodunqiao committed Nov 7, 2017
1 parent 1505d18 commit 4a70d00
Show file tree
Hide file tree
Showing 64 changed files with 2,029 additions and 384 deletions.
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,30 +87,30 @@ GLOG_PATH = $(THIRD_PATH)/glog
endif

ifeq ($(360), 1)
GLOG := $(GLOG_PATH)/.libs/libglog.so.0.0.0
GLOG := $(GLOG_PATH)/.libs/libglog.a
endif

INCLUDE_PATH = -I./include \
-I$(SLASH_PATH)/ \
-I$(PINK_PATH)/ \
INCLUDE_PATH = -I. \
-I$(SLASH_PATH) \
-I$(PINK_PATH) \
-I$(NEMO_PATH)/include \
-I$(NEMODB_PATH)/include \
-I$(ROCKSDB_PATH)/ \
-I$(ROCKSDB_PATH) \
-I$(ROCKSDB_PATH)/include

ifeq ($(360),1)
INCLUDE_PATH += -I$(GLOG_PATH)/src/
INCLUDE_PATH += -I$(GLOG_PATH)/src
endif

LIB_PATH = -L./ \
-L$(SLASH_PATH)/slash/lib/ \
-L$(PINK_PATH)/pink/lib/ \
-L$(NEMO_PATH)/lib/ \
-L$(SLASH_PATH)/slash/lib \
-L$(PINK_PATH)/pink/lib \
-L$(NEMO_PATH)/lib \
-L$(NEMODB_PATH)/lib \
-L$(ROCKSDB_PATH)/
-L$(ROCKSDB_PATH)

ifeq ($(360),1)
LIB_PATH += -L$(GLOG_PATH)/.libs/
LIB_PATH += -L$(GLOG_PATH)/.libs
endif

LDFLAGS += $(LIB_PATH) \
Expand Down Expand Up @@ -232,7 +232,7 @@ $(NEMO):
$(AM_V_at)make -C $(NEMO_PATH) NEMODB_PATH=$(NEMODB_PATH) ROCKSDB_PATH=$(ROCKSDB_PATH) DEBUG_LEVEL=$(DEBUG_LEVEL)

$(GLOG):
cd $(THIRD_PATH)/glog; if [ ! -f ./Makefile ]; then ./configure; fi; make; echo '*' > $(CURDIR)/third/glog/.gitignore;
cd $(THIRD_PATH)/glog; if [ ! -f ./Makefile ]; then ./configure --disable-shared; fi; make; echo '*' > $(CURDIR)/third/glog/.gitignore;

clean:
rm -rf $(OUTPUT)
Expand Down
8 changes: 8 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,20 @@ db-sync-speed : -1
#
# CronTask, format: start-end/ratio, like 02-04/60, pika will check to schedule compaction between 2 to 4 o'clock everyday
# if the freesize/disksize > 60%. NOTICE:if compact-interval is set, compact-cron will be mask and disable.
#
# server-id for hub
server-id : 1
# compact-cron :
#
# Compact-interval, format: interval/ratio, like 6/60, pika will check to schedule compaction every 6 hours,
# if the freesize/disksize > 60%. NOTICE:compact-interval is prior than compact-cron;
# compact-interval :

# The peer-master config
double-master-ip :
double-master-port :
double-master-server-id :

###################
## Critical Settings
###################
Expand Down
25 changes: 22 additions & 3 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#ifndef PIKA_ADMIN_H_
#define PIKA_ADMIN_H_
#include "pika_command.h"
#include "pika_client_conn.h"
#include "include/pika_command.h"
#include "include/pika_client_conn.h"

/*
* Admin
Expand Down Expand Up @@ -44,6 +44,19 @@ class TrysyncCmd : public Cmd {
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class InternalTrysyncCmd : public Cmd {
public:
InternalTrysyncCmd() {
}
virtual void Do();
private:
std::string hub_ip_;
int64_t hub_port_;
int64_t filenum_;
int64_t pro_offset_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class AuthCmd : public Cmd {
public:
AuthCmd() {
Expand Down Expand Up @@ -146,13 +159,15 @@ class InfoCmd : public Cmd {
kInfoErr = 0x0,
kInfoServer,
kInfoClients,
kInfoHub,
kInfoStats,
kInfoReplication,
kInfoKeyspace,
kInfoBgstats,
kInfoLog,
kInfoData,
kInfoAll
kInfoAll,
kInfoDoubleMaster
};

InfoCmd() : rescan_(false), off_(false) {
Expand All @@ -166,11 +181,13 @@ class InfoCmd : public Cmd {
const static std::string kAllSection;
const static std::string kServerSection;
const static std::string kClientsSection;
const static std::string kHubSection;
const static std::string kStatsSection;
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;
const static std::string kLogSection;
const static std::string kDataSection;
const static std::string kDoubleMaster;

virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
Expand All @@ -180,11 +197,13 @@ class InfoCmd : public Cmd {

void InfoServer(std::string &info);
void InfoClients(std::string &info);
void InfoHub(std::string &info);
void InfoStats(std::string &info);
void InfoReplication(std::string &info);
void InfoKeyspace(std::string &info);
void InfoLog(std::string &info);
void InfoData(std::string &info);
void InfoDoubleMaster(std::string &info);
};

class ShutdownCmd : public Cmd {
Expand Down
13 changes: 11 additions & 2 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "slash/include/slash_status.h"
#include "slash/include/slash_mutex.h"
#include "slash/include/env.h"
#include "pika_define.h"
#include "include/pika_define.h"

using slash::Status;
using slash::Slice;
Expand All @@ -46,11 +46,15 @@ class Binlog {
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);

// Double master used
Status GetDoubleRecvInfo(uint32_t* double_filenum, uint64_t* double_offset);

Status SetDoubleRecvInfo(uint32_t double_filenum, uint64_t double_offset);

static Status AppendBlank(slash::WritableFile *file, uint64_t len);

slash::WritableFile *queue() { return queue_; }


uint64_t file_size() {
return file_size_;
}
Expand Down Expand Up @@ -114,6 +118,11 @@ class Version {

uint64_t pro_offset_;
uint32_t pro_num_;

// Double master used
uint64_t double_master_recv_offset_;
uint32_t double_master_recv_num_;

pthread_rwlock_t rwlock_;

void debug() {
Expand Down
12 changes: 5 additions & 7 deletions include/pika_binlog_bgworker.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#ifndef PIKA_BINLOG_BGWORKER_H_
#define PIKA_BINLOG_BGWORKER_H_
#include "pika_command.h"
#include "include/pika_command.h"
#include "pink/include/bg_thread.h"

class BinlogBGWorker {
Expand All @@ -22,9 +22,8 @@ class BinlogBGWorker {
Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}
void Schedule(PikaCmdArgsType *argv, const std::string& raw_args,
uint64_t serial, bool readonly) {
BinlogBGArg *arg = new BinlogBGArg(argv, raw_args, serial, readonly, this);
void Schedule(PikaCmdArgsType *argv, uint64_t serial, bool readonly) {
BinlogBGArg *arg = new BinlogBGArg(argv, serial, readonly, this);
binlogbg_thread_.StartThread();
binlogbg_thread_.Schedule(&DoBinlogBG, static_cast<void*>(arg));
}
Expand All @@ -36,13 +35,12 @@ class BinlogBGWorker {

struct BinlogBGArg {
PikaCmdArgsType *argv;
std::string raw_args;
uint64_t serial;
bool readonly; // Server readonly status at the view of binlog dispatch thread
BinlogBGWorker *myself;
BinlogBGArg(PikaCmdArgsType* _argv, const std::string& _raw, uint64_t _s,
BinlogBGArg(PikaCmdArgsType* _argv, uint64_t _s,
bool _readonly, BinlogBGWorker* _my)
: argv(_argv), raw_args(_raw), serial(_s), readonly(_readonly), myself(_my) {
: argv(_argv), serial(_s), readonly(_readonly), myself(_my) {
}
};

Expand Down
12 changes: 9 additions & 3 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
#include "pink/include/server_thread.h"
#include "slash/include/slash_mutex.h"
#include "slash/include/env.h"
#include "pika_define.h"
#include "pika_master_conn.h"
#include "pika_command.h"
#include "include/pika_define.h"
#include "include/pika_master_conn.h"
#include "include/pika_command.h"

class PikaBinlogReceiverThread {
public:
Expand All @@ -29,6 +29,10 @@ class PikaBinlogReceiverThread {
return serial_++;
}

Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}

private:
class MasterConnFactory : public pink::ConnFactory {
public:
Expand Down Expand Up @@ -65,6 +69,8 @@ class PikaBinlogReceiverThread {
Handles handles_;
pink::ServerThread* thread_rep_;

CmdTable cmds_;

uint64_t serial_;
};
#endif
4 changes: 1 addition & 3 deletions include/pika_binlog_sender_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ class PikaBinlogSenderThread : public pink::Thread {
}

int trim();
uint64_t get_next(bool &is_error);


private:

uint64_t get_next(bool &is_error);
Status Parse(std::string &scratch);
Status Consume(std::string &scratch);
unsigned int ReadPhysicalRecord(slash::Slice *fragment);
Expand Down
2 changes: 1 addition & 1 deletion include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

#ifndef PIKA_BIT_H_
#define PIKA_BIT_H_
#include "pika_command.h"
#include "include/pika_command.h"
#include "nemo.h"


Expand Down
3 changes: 1 addition & 2 deletions include/pika_client_conn.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#include "pink/include/redis_conn.h"
#include "pink/include/pink_thread.h"
#include "pika_command.h"
#include "include/pika_command.h"

class PikaWorkerSpecificData;

Expand All @@ -27,7 +27,6 @@ class PikaClientConn: public pink::RedisConn {
CmdTable* const cmds_table_;

std::string DoCmd(const std::string& opt);
std::string RestoreArgs();

// Auth related
class AuthStat {
Expand Down
Loading

0 comments on commit 4a70d00

Please sign in to comment.