Skip to content

Commit

Permalink
Merge branch 'pika2.0' of github.com:baotiao/pika into pika2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
flabby committed Apr 13, 2016
2 parents d2a5207 + a156b2c commit 7888efd
Show file tree
Hide file tree
Showing 52 changed files with 8,436 additions and 1,028 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ CXX = g++

ifeq ($(__REL), 1)
#CXXFLAGS = -Wall -W -DDEBUG -g -O0 -D__XDEBUG__ -fPIC -Wno-unused-function -std=c++11
CXXFLAGS = -O2 -g -pipe -fPIC -W -DNDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
CXXFLAGS = -O2 -g -pipe -fPIC -W -DNDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
else
CXXFLAGS = -O0 -g -pg -pipe -fPIC -W -DDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -D_GNU_SOURCE -std=c++11 -D__STDC_FORMAT_MACROS -std=c++11 -gdwarf-2 -Wno-redundant-decls
CXXFLAGS = -O0 -g -pg -pipe -fPIC -W -DDEBUG -Wwrite-strings -Wpointer-arith -Wreorder -Wswitch -Wsign-promo -Wredundant-decls -Wformat -Wall -Wno-unused-parameter -D_GNU_SOURCE -std=c++11 -D__STDC_FORMAT_MACROS -std=c++11 -Wno-redundant-decls
endif

OBJECT = pika
Expand Down
8 changes: 5 additions & 3 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@
port : 9221
# Thread Number
thread_num : 1
# Slave Thread Number
slave_thread_num : 1
# Pika log path
log_path : ./log/
# Pika glog level
log_level : 1
log_level : 0
# Pika db path
db_path : ./db/
# Pika write_buffer_size
Expand All @@ -16,6 +14,10 @@ write_buffer_size : 268435456
timeout : 60
# Requirepass
requirepass :
# Userpass
userpass :
# User Blacklist
userblacklist :
# Dump Prefix
dump_prefix :
# daemonize [yes | no]
Expand Down
196 changes: 196 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
#ifndef __PIKA_ADMIN_H__
#define __PIKA_ADMIN_H__
#include "pika_command.h"

/*
* Admin
*/
class SlaveofCmd : public Cmd {
public:
SlaveofCmd() : is_noone_(false), have_offset_(false),
filenum_(0), pro_offset_(0) {
}
virtual void Do();
private:
std::string master_ip_;
int64_t master_port_;
bool is_noone_;
bool have_offset_;
int64_t filenum_;
int64_t pro_offset_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
is_noone_ = false;
have_offset_ = false;
}
};

class TrysyncCmd : public Cmd {
public:
TrysyncCmd() {
}
virtual void Do();
private:
std::string slave_ip_;
int64_t slave_port_;
int64_t filenum_;
int64_t pro_offset_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class AuthCmd : public Cmd {
public:
AuthCmd() {
}
virtual void Do();
private:
std::string pwd_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class BgsaveCmd : public Cmd {
public:
BgsaveCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class BgsaveoffCmd : public Cmd {
public:
BgsaveoffCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class CompactCmd : public Cmd {
public:
CompactCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class PurgelogstoCmd : public Cmd {
public:
PurgelogstoCmd() : num_(0){
}
virtual void Do();
private:
uint32_t num_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class PingCmd : public Cmd {
public:
PingCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class SelectCmd : public Cmd {
public:
SelectCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class FlushallCmd : public Cmd {
public:
FlushallCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ReadonlyCmd : public Cmd {
public:
ReadonlyCmd() : is_open_(false) {
}
virtual void Do();
private:
bool is_open_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ClientCmd : public Cmd {
public:
ClientCmd() {
}
virtual void Do();
const static std::string CLIENT_LIST_S;
const static std::string CLIENT_KILL_S;
private:
std::string operation_, ip_port_;
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class InfoCmd : public Cmd {
public:
enum InfoSection {
kInfoErr = 0x0,
kInfoServer,
kInfoClients,
kInfoStats,
kInfoReplication,
kInfoKeyspace,
kInfoAll
};

InfoCmd() : rescan_(false) {
}
virtual void Do();
private:
InfoSection info_section_;
bool rescan_; //whether to rescan the keyspace

const static std::string kAllSection;
const static std::string kServerSection;
const static std::string kClientsSection;
const static std::string kStatsSection;
const static std::string kReplicationSection;
const static std::string kKeyspaceSection;

virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
virtual void Clear() {
rescan_ = false;
}

void InfoServer(std::string &info);
void InfoClients(std::string &info);
void InfoStats(std::string &info);
void InfoReplication(std::string &info);
void InfoKeyspace(std::string &info);
};

class ShutdownCmd : public Cmd {
public:
ShutdownCmd() {
}
virtual void Do();
private:
virtual void DoInitial(PikaCmdArgsType &argvs, const CmdInfo* const ptr_info);
};

class ConfigCmd : public Cmd {
public:
ConfigCmd() {
}
virtual void Do();
private:
std::vector<std::string> config_args_v_;
virtual void DoInitial(PikaCmdArgsType &argv, const CmdInfo* const ptr_info);
void ConfigGet(std::string &ret);
void ConfigSet(std::string &ret);
void ConfigRewrite(std::string &ret);
};
#endif
137 changes: 137 additions & 0 deletions include/pika_binlog.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
#ifndef PIKA_BINLOG_H_
#define PIKA_BINLOG_H_

#include <cstdio>
#include <list>
#include <string>
#include <deque>
#include <pthread.h>

#ifndef __STDC_FORMAT_MACROS
# define __STDC_FORMAT_MACROS
# include <inttypes.h>
#endif

#include "env.h"
//#include "port.h"
#include "pika_define.h"

#include "slash_status.h"
#include "slash_mutex.h"

using slash::Status;
using slash::Slice;


std::string NewFileName(const std::string name, const uint32_t current);

class Version;

class Binlog
{
public:
Binlog(const std::string& Binlog_path, const int file_size = 100 * 1024 * 1024);
~Binlog();

void Lock() { mutex_.Lock(); }
void Unlock() { mutex_.Unlock(); }

Status Put(const std::string &item);
Status Put(const char* item, int len);

Status GetProducerStatus(uint32_t* filenum, uint64_t* pro_offset);
/*
* Set Producer pro_num and pro_offset with lock
*/
Status SetProducerStatus(uint32_t filenum, uint64_t pro_offset);

static Status AppendBlank(slash::WritableFile *file, uint64_t len);

slash::WritableFile *queue() { return queue_; }


uint64_t file_size() {
return file_size_;
}

std::string filename;

private:

void InitLogFile();
Status EmitPhysicalRecord(RecordType t, const char *ptr, size_t n, int *temp_pro_offset);


/*
* Produce
*/
Status Produce(const Slice &item, int *pro_offset);

uint32_t consumer_num_;
uint64_t item_num_;

Version* version_;
slash::WritableFile *queue_;
slash::RWFile *versionfile_;

slash::Mutex mutex_;

uint32_t pro_num_;

int block_offset_;

char* pool_;
bool exit_all_consume_;
const std::string binlog_path_;

uint64_t file_size_;

// Not use
//int32_t retry_;

// No copying allowed
Binlog(const Binlog&);
void operator=(const Binlog&);
};

// We have to reserve the useless con_offset_, con_num_ and item_num,
// to be compatable with version 1.x .
class Version {
public:
Version(slash::RWFile *save);
~Version();

Status Init();

// RWLock should be held when access members.
Status StableSave();

uint32_t item_num() { return item_num_; }
void set_item_num(uint32_t item_num) { item_num_ = item_num; }
void plus_item_num() { item_num_++; }
void minus_item_num() { item_num_--; }

uint64_t pro_offset_;
uint32_t pro_num_;
pthread_rwlock_t rwlock_;

void debug() {
slash::RWLock(&rwlock_, false);
printf ("Current pro_num %u pro_offset %lu\n", pro_num_, pro_offset_);
}

private:

slash::RWFile *save_;

// Not used
uint64_t con_offset_;
uint32_t con_num_;
uint32_t item_num_;

// No copying allowed;
Version(const Version&);
void operator=(const Version&);
};

#endif
Loading

0 comments on commit 7888efd

Please sign in to comment.