Skip to content

Commit

Permalink
pika distinguish classic mode and sharding mode (OpenAtomFoundation#486)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 7666574 commit 06de839
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 47 deletions.
12 changes: 10 additions & 2 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@ masterauth :
userpass :
# User Blacklist
userblacklist :
# Pika Tables
table-struct : table1:1,table2:1
# if this option is set to 'classic', that means pika support multiple DB, in
# this mode, option db-list enable
# if this option is set to 'sharding', that means pika support multiple Table, you
# can specify partition num for each table, in this mode, option table-list enable
# Pika instance mode [classic | sharding]
instance-mode : classic
# Db list
db-list : db1,db2
# Table list
table-list : table1:1,table2:1
# Dump Prefix
dump-prefix :
# daemonize [yes | no]
Expand Down
2 changes: 2 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class PikaConf : public slash::BaseConf {
const std::vector<std::string>& vuser_blacklist() {
RWLock l(&rwlock_, false); return user_blacklist_;
}
bool classic_mode() { RWLock l(&rwlock_, false); return classic_mode_;}
const std::vector<TableStruct>& table_structs() {
RWLock l(&rwlock_, false);
return table_structs_;
Expand Down Expand Up @@ -262,6 +263,7 @@ class PikaConf : public slash::BaseConf {
std::string masterauth_;
std::string userpass_;
std::vector<std::string> user_blacklist_;
bool classic_mode_;
std::vector<TableStruct> table_structs_;
std::string default_table_;
std::string bgsave_path_;
Expand Down
1 change: 1 addition & 0 deletions include/pika_partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Partition : public std::enable_shared_from_this<Partition> {

std::string db_path_;
std::string log_path_;
std::string bgsave_sub_path_;
std::string partition_name_;

std::shared_ptr<Binlog> logger_;
Expand Down
12 changes: 10 additions & 2 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@ using slash::Slice;
static std::unordered_set<std::string> CurrentNotSupportCommands {kCmdNameSlaveof,
kCmdNamePurgelogsto };

static std::set<std::string> ShardingModeNotSupportCommands {kCmdNameDel,
kCmdNameMget, kCmdNameKeys, kCmdNameMset,
kCmdNameMsetnx, kCmdNameScan, kCmdNameScanx,
kCmdNamePKScanRange, kCmdNamePKRScanRange, kCmdNameRPopLPush,
kCmdNameZUnionstore, kCmdNameZInterstore, kCmdNameSUnion,
kCmdNameSUnionstore, kCmdNameSInter, kCmdNameSInterstore,
kCmdNameSDiff, kCmdNameSDiffstore, kCmdNameSMove,
kCmdNamePfCount, kCmdNamePfMerge};

extern PikaConf *g_pika_conf;

enum TaskType {
Expand Down Expand Up @@ -159,8 +168,7 @@ class PikaServer {
void PartitionRecordUnLock(const std::string& table_name,
const std::string& key);
bool IsTableExist(const std::string& table_name);
bool IsTableSupportCommand(const std::string& table_name,
const std::string& command);
bool IsCommandSupport(const std::string& command);
uint32_t GetPartitionNumByTable(const std::string& table_name);
std::shared_ptr<Partition> GetTablePartitionById(
const std::string& table_name,
Expand Down
9 changes: 0 additions & 9 deletions include/pika_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,6 @@
#include "include/pika_command.h"
#include "include/pika_partition.h"

static std::set<std::string> TableMayNotSupportCommands {kCmdNameDel,
kCmdNameMget, kCmdNameKeys, kCmdNameMset,
kCmdNameMsetnx, kCmdNameScan, kCmdNameScanx,
kCmdNamePKScanRange, kCmdNamePKRScanRange, kCmdNameRPopLPush,
kCmdNameZUnionstore, kCmdNameZInterstore, kCmdNameSUnion,
kCmdNameSUnionstore, kCmdNameSInter, kCmdNameSInterstore,
kCmdNameSDiff, kCmdNameSDiffstore, kCmdNameSMove,
kCmdNamePfCount, kCmdNamePfMerge};
/*
*Keyscan used
*/
Expand Down Expand Up @@ -51,7 +43,6 @@ class Table : public std::enable_shared_from_this<Table>{
void CompactTable(const blackwidow::DataType& type);
bool FlushAllTable();
bool FlushDbTable(const std::string& db_name);
bool IsCommandSupport(const std::string& cmd) const;
bool IsBinlogIoError();
uint32_t PartitionNum();

Expand Down
6 changes: 3 additions & 3 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ std::string PikaClientConn::DoCmd(const PikaCmdArgsType& argv,
}

if (!g_pika_server->IsCommandCurrentSupport(opt)) {
return "-ERR Command current not support\r\n";
return "-ERR This command current not support\r\n";
}

if (!g_pika_server->IsTableSupportCommand(current_table_, opt)) {
return "-ERR CROSS PARTITION Keys in this request command may don't hash to the same partition\r\n";
if (!g_pika_server->IsCommandSupport(opt)) {
return "-ERR This command only support in classic mode\r\n";
}

// TODO: Consider special commands, like flushall, flushdb?
Expand Down
40 changes: 29 additions & 11 deletions src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "include/pika_conf.h"

#include <algorithm>
#include <strings.h>

PikaConf::PikaConf(const std::string& path):
slash::BaseConf(path), conf_path_(path)
Expand Down Expand Up @@ -69,17 +70,34 @@ int PikaConf::Load()
slash::StringToLower(item);
}

int32_t partition_num;
std::vector<std::string> elems;
std::vector<std::string> table_struct_strs;
std::unordered_set<std::string> unique;
GetConfStrVec("table-struct", &table_struct_strs);
for (const auto& item : table_struct_strs) {
slash::StringSplit(item, ':', elems);
if (elems.size() == 2 && unique.find(elems[0]) == unique.end()) {
partition_num = atoi(elems[1].data());
table_structs_.push_back({elems[0], partition_num > 0 ? static_cast<uint32_t>(partition_num) : 1});
unique.insert(elems[0]);
std::string instance_mode;
GetConfStr("instance-mode", &instance_mode);
classic_mode_ = !strcasecmp(instance_mode.data(), "classic");

if (classic_mode_) {
std::vector<std::string> db_list;
std::unordered_set<std::string> unique;
GetConfStrVec("db-list", &db_list);
for (const auto& db : db_list) {
if (!unique.count(db)) {
table_structs_.push_back({db, 1});
unique.insert(db);
}
}
} else {
int32_t partition_num;
std::vector<std::string> elems;
std::vector<std::string> table_strs;
std::unordered_set<std::string> unique;
GetConfStrVec("table-list", &table_strs);
for (const auto& item : table_strs) {
slash::StringSplit(item, ':', elems);
if (elems.size() == 2 && !unique.count(elems[0])) {
partition_num = atoi(elems[1].data());
table_structs_.push_back({elems[0], partition_num > 0
? static_cast<uint32_t>(partition_num) : 1});
unique.insert(elems[0]);
}
}
}
if (table_structs_.empty()) {
Expand Down
21 changes: 17 additions & 4 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ std::string PartitionName(const std::string& table_name,
return std::string(buf);
}

std::string BgsaveSubPath(const std::string& table_name,
uint32_t partition_id) {
char buf[256];
std::string partition_id_str = std::to_string(partition_id);
snprintf(buf, sizeof(buf), "%s/%s", table_name.data(), partition_id_str.data());
return std::string(buf);
}

Partition::Partition(const std::string& table_name,
uint32_t partition_id,
const std::string& table_db_path,
Expand All @@ -37,9 +45,14 @@ Partition::Partition(const std::string& table_name,
bgsave_engine_(NULL),
purging_(false) {

db_path_ = PartitionPath(table_db_path, partition_id_);
log_path_ = PartitionPath(table_log_path, partition_id_);
partition_name_ = PartitionName(table_name_, partition_id_);
db_path_ = g_pika_conf->classic_mode() ?
table_db_path : PartitionPath(table_db_path, partition_id_);
log_path_ = g_pika_conf->classic_mode() ?
table_log_path : PartitionPath(table_log_path, partition_id_);
bgsave_sub_path_ = g_pika_conf->classic_mode() ?
table_name : BgsaveSubPath(table_name_, partition_id_);
partition_name_ = g_pika_conf->classic_mode() ?
table_name : PartitionName(table_name_, partition_id_);

pthread_rwlock_init(&db_rwlock_, NULL);

Expand Down Expand Up @@ -244,7 +257,7 @@ bool Partition::InitBgsaveEnv() {
int len = strftime(s_time, sizeof(s_time), "%Y%m%d%H%M%S", localtime(&bgsave_info_.start_time));
bgsave_info_.s_start_time.assign(s_time, len);
std::string time_sub_path = g_pika_conf->bgsave_prefix() + std::string(s_time, 8);
bgsave_info_.path = g_pika_conf->bgsave_path() + time_sub_path + "/" + table_name_ + "/" + std::to_string(partition_id_);
bgsave_info_.path = g_pika_conf->bgsave_path() + time_sub_path + "/" + bgsave_sub_path_;
if (!slash::DeleteDirIfExist(bgsave_info_.path)) {
LOG(WARNING) << partition_name_ << " remove exist bgsave dir failed";
return false;
Expand Down
11 changes: 6 additions & 5 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,13 +386,14 @@ bool PikaServer::IsTableExist(const std::string& table_name) {
return GetTable(table_name) ? true : false;
}

bool PikaServer::IsTableSupportCommand(const std::string& table_name,
const std::string& command) {
std::shared_ptr<Table> table = GetTable(table_name);
if (table && table->IsCommandSupport(command)) {
bool PikaServer::IsCommandSupport(const std::string& command) {
if (g_pika_conf->classic_mode()) {
return true;
} else {
std::string cmd = command;
slash::StringToLower(cmd);
return !ShardingModeNotSupportCommands.count(cmd);
}
return false;
}

uint32_t PikaServer::GetPartitionNumByTable(const std::string& table_name) {
Expand Down
11 changes: 0 additions & 11 deletions src/pika_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,6 @@ bool Table::FlushDbTable(const std::string& db_name) {
return true;
}

bool Table::IsCommandSupport(const std::string& cmd) const {
if (partition_num_ == 1) {
return true;
} else {
std::string command = cmd;
slash::StringToLower(command);
auto iter = TableMayNotSupportCommands.find(command);
return (iter == TableMayNotSupportCommands.end()) ? true : false;
}
}

bool Table::IsBinlogIoError() {
slash::RWLock l(&partitions_rw_, false);
for (const auto& item : partitions_) {
Expand Down

0 comments on commit 06de839

Please sign in to comment.