Skip to content

Commit

Permalink
Merge
Browse files Browse the repository at this point in the history
  • Loading branch information
CatKang committed Apr 14, 2016
2 parents 6e32516 + 7888efd commit e04d88b
Show file tree
Hide file tree
Showing 19 changed files with 249 additions and 237 deletions.
24 changes: 17 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@
RPATH = /usr/local/pika11/lib/
LFLAGS = -Wl,-rpath=$(RPATH)


UNAME := $(shell if [ -f "/etc/redhat-release" ]; then echo "CentOS"; else echo "Ubuntu"; fi)

OSVERSION := $(shell cat /etc/redhat-release | cut -d "." -f 1 | awk '{print $$NF}')
ifeq ($(OSVERSION), 5)
SO_DIR = $(CURDIR)/lib/5.4
TOOLS_DIR = $(CURDIR)/tools/5.4

ifeq ($(UNAME), Ubuntu)
SO_DIR = $(CURDIR)/lib/ubuntu
TOOLS_DIR = $(CURDIR)/tools/ubuntu
else ifeq ($(OSVERSION), 5)
SO_DIR = $(CURDIR)/lib/5.4
TOOLS_DIR = $(CURDIR)/tools/5.4
else
SO_DIR = $(CURDIR)/lib/6.2
TOOLS_DIR = $(CURDIR)/tools/6.2
SO_DIR = $(CURDIR)/lib/6.2
TOOLS_DIR = $(CURDIR)/tools/6.2
endif

CXX = g++

ifeq ($(__REL), 1)
#CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ -fPIC -Wno-unused-function -std=c++11
CXXFLAGS = -O2 -g -pipe -fPIC -W -DNDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
CXXFLAGS = -O2 -g -pipe -fPIC -W -DNDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
else
CXXFLAGS = -O0 -g -pg -pipe -fPIC -W -DDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -std=c++11 -D__STDC_FORMAT_MACROS -std=c++11 -Wno-redundant-decls
CXXFLAGS = -O0 -g -pg -pipe -fPIC -W -DDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -std=c++11 -D__STDC_FORMAT_MACROS -std=c++11 -Wno-redundant-decls
endif

OBJECT = pika
Expand Down Expand Up @@ -65,6 +72,9 @@ OBJS = $(patsubst %.cc,%.o,$(BASE_OBJS))


all: $(OBJECT)
@echo "UNAME : $(UNAME)"
@echo "SO_DIR : $(SO_DIR)"
@echo "TOOLS_DIR: $(TOOLS_DIR)"
rm -rf $(OUTPUT)
mkdir $(OUTPUT)
mkdir $(OUTPUT)/bin
Expand Down
3 changes: 2 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class SlaveofCmd : public Cmd {
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
is_noone_ = false;
have_offset_ == false;
have_offset_ = false;
}
};

Expand Down Expand Up @@ -153,6 +153,7 @@ class InfoCmd : public Cmd {
InfoSection info_section_;
bool rescan_; //whether to rescan the keyspace

const static std::string kAllSection;
const static std::string kServerSection;
const static std::string kClientsSection;
const static std::string kStatsSection;
Expand Down
75 changes: 19 additions & 56 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,23 @@ class Binlog
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);


// Set the filenum and con_offset of the consumer which has the given ip and port;
// return NotFound when can not find the consumer with the given ip and port;
// return InvalidArgument when the filenum and con_offset are invalid;
Status SetConsumer(int fd, uint32_t filenum, uint64_t con_offset);
// no lock
Status GetConsumerStatus(int fd, uint32_t* filenum, uint64_t* con_offset);

static Status AppendBlank(slash::WritableFile *file, uint64_t len);

slash::WritableFile *queue() { return queue_; }
//slash::WritableFile *writefile() { return writefile_; }


uint64_t file_size() {
return file_size_;
}

std::string filename;
Version* version_;

private:

void InitLogFile();
Status EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset);


/*
* Produce
*/
Expand All @@ -77,13 +70,13 @@ class Binlog
uint32_t consumer_num_;
uint64_t item_num_;

Version* version_;
slash::WritableFile *queue_;
slash::RWFile *versionfile_;

slash::Mutex mutex_;

uint32_t pro_num_;
int32_t retry_;

int block_offset_;

Expand All @@ -92,59 +85,35 @@ class Binlog
const std::string binlog_path_;

uint64_t file_size_;

// Not use
//int32_t retry_;

// No copying allowed
Binlog(const Binlog&);
void operator=(const Binlog&);
};

// We have to reserve the useless con_offset_, con_num_ and item_num,
// to be compatable with version 1.x .
class Version {
public:
Version(slash::RWFile *save);
~Version();

// Status Recovery(WritableFile *save);

Status StableSave();
Status Init();

uint64_t pro_offset() {
slash::RWLock(&rwlock_, false);
return pro_offset_;
}
void set_pro_offset(uint64_t pro_offset) {
slash::RWLock(&rwlock_, true);
pro_offset_ = pro_offset;
}
void rise_pro_offset(uint64_t r) {
slash::RWLock(&rwlock_, true);
pro_offset_ += r;
}
// RWLock should be held when access members.
Status StableSave();

uint32_t pro_num() {
slash::RWLock(&rwlock_, false);
return pro_num_;
}
void set_pro_num(uint32_t pro_num) {
slash::RWLock(&rwlock_, true);
pro_num_ = pro_num;
}
uint32_t item_num() { return item_num_; }
void set_item_num(uint32_t item_num) { item_num_ = item_num; }
void plus_item_num() { item_num_++; }
void minus_item_num() { item_num_--; }

uint32_t item_num() {
slash::RWLock(&rwlock_, false);
return item_num_;
}
void set_item_num(uint32_t item_num) {
slash::RWLock(&rwlock_, true);
item_num_ = item_num;
}
void plus_item_num() {
slash::RWLock(&rwlock_, true);
item_num_++;
}
void minus_item_num() {
slash::RWLock(&rwlock_, true);
item_num_--;
}
uint64_t pro_offset_;
uint32_t pro_num_;
pthread_rwlock_t rwlock_;

void debug() {
slash::RWLock(&rwlock_, false);
Expand All @@ -153,13 +122,7 @@ class Version {

private:

uint64_t pro_offset_;
uint32_t pro_num_;


slash::RWFile *save_;
pthread_rwlock_t rwlock_;
// port::Mutex mutex_;

// Not used
uint64_t con_offset_;
Expand Down
9 changes: 7 additions & 2 deletions include/pika_binlog_receiver_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "holy_thread.h"
#include "slash_mutex.h"
#include "env.h"
#include "pika_define.h"
#include "pika_master_conn.h"
#include "pika_command.h"
Expand All @@ -31,12 +32,14 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>
void PlusThreadQuerynum() {
slash::RWLock(&rwlock_, true);
thread_querynum_++;
last_sec_thread_querynum_++;
}

void ResetLastSecQuerynum() {
uint64_t cur_time_ms = slash::NowMicros();
slash::RWLock(&rwlock_, true);
last_sec_thread_querynum_ = 0;
last_sec_thread_querynum_ = (thread_querynum_ - last_thread_querynum_) * 1000000 / (cur_time_ms - last_time_us_+1);
last_time_us_ = cur_time_ms;
last_thread_querynum_ = thread_querynum_;
}

int32_t ThreadClientNum() {
Expand All @@ -57,6 +60,8 @@ class PikaBinlogReceiverThread : public pink::HolyThread<PikaMasterConn>

std::unordered_map<std::string, Cmd*> cmds_;
uint64_t thread_querynum_;
uint64_t last_thread_querynum_;
uint64_t last_time_us_;
uint64_t last_sec_thread_querynum_;
};
#endif
5 changes: 3 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class PikaServer
* Master use
*/
int64_t GenSid() {
slash::MutexLock l(&slave_mutex_);
// slash::MutexLock l(&slave_mutex_);
int64_t sid = sid_;
sid_++;
return sid;
Expand Down Expand Up @@ -218,7 +218,7 @@ class PikaServer
std::string s_start_time;
std::vector<uint64_t> key_nums_v; //the order is kv, hash, list, zset, set
bool key_scaning_;
KeyScanInfo() : start_time(0), key_nums_v({0, 0, 0, 0, 0}), key_scaning_(false) {
KeyScanInfo() : start_time(0), s_start_time("1970-01-01 08:00:00"), key_nums_v({0, 0, 0, 0, 0}), key_scaning_(false) {
}
};
bool key_scaning() {
Expand Down Expand Up @@ -251,6 +251,7 @@ class PikaServer
void incr_accumulative_connections() {
++accumulative_connections_;
}
slash::RecordMutex mutex_record_;

private:
std::atomic<bool> exit_;
Expand Down
12 changes: 8 additions & 4 deletions include/pika_worker_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "worker_thread.h"
#include "pika_define.h"
#include "slash_mutex.h"
#include "env.h"
#include "pika_client_conn.h"
#include "pika_command.h"

Expand Down Expand Up @@ -33,23 +34,26 @@ class PikaWorkerThread : public pink::WorkerThread<PikaClientConn>
void PlusThreadQuerynum() {
slash::RWLock(&rwlock_, true);
thread_querynum_++;
last_sec_thread_querynum_++;
}

void ResetLastSecQuerynum() {
slash::RWLock(&rwlock_, true);
last_sec_thread_querynum_ = 0;
uint64_t cur_time_us = slash::NowMicros();
slash::RWLock l(&rwlock_, true);
last_sec_thread_querynum_ = ((thread_querynum_ - last_thread_querynum_) * 1000000 / (cur_time_us - last_time_us_+1));
last_thread_querynum_ = thread_querynum_;
last_time_us_ = cur_time_us;
}

Cmd* GetCmd(const std::string& opt) {
return GetCmdFromTable(opt, cmds_);
}

private:
slash::Mutex mutex_; // protect cron_task_
std::queue<WorkerCronTask> cron_tasks_;

uint64_t thread_querynum_;
uint64_t last_thread_querynum_;
uint64_t last_time_us_;
uint64_t last_sec_thread_querynum_;

std::unordered_map<std::string, Cmd*> cmds_;
Expand Down
Binary file added lib/ubuntu/libglog.so.0
Binary file not shown.
Loading

0 comments on commit e04d88b

Please sign in to comment.