Skip to content

Commit

Permalink
pkscanrange and pkrscanrange support hashtag in non classic mode (Ope…
Browse files Browse the repository at this point in the history
…nAtomFoundation#962)

* feature: support hashtag in sharding mode

* feature: pkscanrange and pkrscanrange support hashtag in non classic
mode

* feature: rpoplpush support hashtag
  • Loading branch information
kernelai authored Oct 15, 2020
1 parent fd90670 commit 2929eaf
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 9 deletions.
5 changes: 5 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class CmdRes {
kInvalidIndex,
kInvalidDbType,
kInvalidTable,
kInconsistentHashTag,
kErrOther
};

Expand Down Expand Up @@ -355,6 +356,8 @@ class CmdRes {
result.append(message_);
result.append("'\r\n");
break;
case kInconsistentHashTag:
return "-ERR parameters hashtag is inconsistent\r\n";
case kInvalidTable:
result = "-ERR invalid Table for '";
result.append(message_);
Expand Down Expand Up @@ -458,6 +461,8 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
bool is_admin_require() const;
bool is_single_partition() const;
bool is_multi_partition() const;
bool is_classic_mode() const;
bool HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const;

std::string name() const;
CmdRes& res();
Expand Down
2 changes: 2 additions & 0 deletions include/pika_data_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ class Crc32 : public PikaDataDistribution {
uint32_t crc32tab[256];
};

std::string GetHashkey(const std::string& key);

#endif
12 changes: 11 additions & 1 deletion include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,11 @@ class PKScanRangeCmd : public Cmd {
public:
PKScanRangeCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), pattern_("*"), limit_(10), string_with_value(false) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_start_);
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
Expand All @@ -786,6 +791,11 @@ class PKRScanRangeCmd : public Cmd {
public:
PKRScanRangeCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), pattern_("*"), limit_(10), string_with_value(false) {}
std::vector<std::string> current_key() const override {
std::vector<std::string> res;
res.push_back(key_start_);
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
Expand All @@ -799,7 +809,7 @@ class PKRScanRangeCmd : public Cmd {
std::string pattern_;
int64_t limit_;
bool string_with_value;
virtual void DoInitial() override;
void DoInitial() override;
virtual void Clear() {
pattern_ = "*";
limit_ = 10;
Expand Down
5 changes: 5 additions & 0 deletions include/pika_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ class RPopLPushCmd : public Cmd {
public:
RPopLPushCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {};
std::vector<std::string> current_key() const override{
std::vector<std::string> res;
res.push_back(source_);
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
Expand Down
6 changes: 2 additions & 4 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ static std::set<std::string> MultiKvCommands {kCmdNameDel,

static std::set<std::string> ConsensusNotSupportCommands {
kCmdNameMsetnx, kCmdNameScan, kCmdNameKeys,
kCmdNameScanx, kCmdNamePKScanRange, kCmdNamePKRScanRange,
kCmdNameRPopLPush, kCmdNameZUnionstore, kCmdNameZInterstore,
kCmdNameSUnion, kCmdNameSUnionstore, kCmdNameSInter,
kCmdNameSInterstore, kCmdNameSDiff, kCmdNameSDiffstore,
Expand All @@ -59,12 +58,11 @@ static std::set<std::string> ConsensusNotSupportCommands {
kCmdNameGeoPos, kCmdNameGeoDist, kCmdNameGeoHash,
kCmdNameGeoRadius, kCmdNameGeoRadiusByMember, kCmdNamePKPatternMatchDel,
kCmdNameSlaveof, kCmdNameDbSlaveof, kCmdNameMset,
kCmdNameMget};
kCmdNameMget, kCmdNameScanx};

static std::set<std::string> ShardingModeNotSupportCommands {
kCmdNameMsetnx, kCmdNameScan, kCmdNameKeys,
kCmdNameScanx, kCmdNamePKScanRange, kCmdNamePKRScanRange,
kCmdNameRPopLPush, kCmdNameZUnionstore, kCmdNameZInterstore,
kCmdNameScanx, kCmdNameZUnionstore, kCmdNameZInterstore,
kCmdNameSUnion, kCmdNameSUnionstore, kCmdNameSInter,
kCmdNameSInterstore, kCmdNameSDiff, kCmdNameSDiffstore,
kCmdNameSMove, kCmdNameBitOp, kCmdNamePfAdd,
Expand Down
21 changes: 18 additions & 3 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,10 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
Cmd* pksetexatptr = new PKSetexAtCmd(kCmdNamePKSetexAt, 4, kCmdFlagsWrite | kCmdFlagsSinglePartition | kCmdFlagsKv);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePKSetexAt, pksetexatptr));
////PKScanRange
Cmd* pkscanrangeptr = new PKScanRangeCmd(kCmdNamePKScanRange, -4, kCmdFlagsRead | kCmdFlagsMultiPartition | kCmdFlagsKv);
Cmd* pkscanrangeptr = new PKScanRangeCmd(kCmdNamePKScanRange, -4, kCmdFlagsRead | kCmdFlagsSinglePartition | kCmdFlagsKv);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePKScanRange, pkscanrangeptr));
////PKRScanRange
Cmd* pkrscanrangeptr = new PKRScanRangeCmd(kCmdNamePKRScanRange, -4, kCmdFlagsRead | kCmdFlagsMultiPartition | kCmdFlagsKv);
Cmd* pkrscanrangeptr = new PKRScanRangeCmd(kCmdNamePKRScanRange, -4, kCmdFlagsRead | kCmdFlagsSinglePartition | kCmdFlagsKv);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNamePKRScanRange, pkrscanrangeptr));

//Hash
Expand Down Expand Up @@ -309,7 +309,7 @@ void InitCmdTable(std::unordered_map<std::string, Cmd*> *cmd_table) {
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameLTrim, ltrimptr));
Cmd* rpopptr = new RPopCmd(kCmdNameRPop, 2, kCmdFlagsWrite | kCmdFlagsSinglePartition | kCmdFlagsList);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameRPop, rpopptr));
Cmd* rpoplpushptr = new RPopLPushCmd(kCmdNameRPopLPush, 3, kCmdFlagsWrite | kCmdFlagsMultiPartition | kCmdFlagsList);
Cmd* rpoplpushptr = new RPopLPushCmd(kCmdNameRPopLPush, 3, kCmdFlagsWrite | kCmdFlagsSinglePartition | kCmdFlagsList);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameRPopLPush, rpoplpushptr));
Cmd* rpushptr = new RPushCmd(kCmdNameRPush, -3, kCmdFlagsWrite | kCmdFlagsSinglePartition | kCmdFlagsList);
cmd_table->insert(std::pair<std::string, Cmd*>(kCmdNameRPush, rpushptr));
Expand Down Expand Up @@ -733,6 +733,7 @@ void Cmd::ProcessMultiPartitionCmd() {
res_.SetRes(CmdRes::kErrOther, "Table not found");
}
for (auto& key : cur_key) {
LOG(INFO) << "process key: "<< key;
// in sharding mode we select partition by key
uint32_t partition_id = g_pika_cmd_table_manager->DistributeKey(key, table->PartitionNum());
std::unordered_map<uint32_t, ProcessArg>::iterator iter = process_map.find(partition_id);
Expand Down Expand Up @@ -792,6 +793,20 @@ bool Cmd::is_multi_partition() const {
return ((flag_ & kCmdFlagsMaskPartition) == kCmdFlagsMultiPartition);
}

bool Cmd::is_classic_mode() const {
return g_pika_conf->classic_mode();
}

bool Cmd::HashtagIsConsistent(const std::string& lhs, const std::string& rhs) const {
if (is_classic_mode() == false) {
if (GetHashkey(lhs) != GetHashkey(rhs)) {
return false;
}
}
return true;
}


std::string Cmd::name() const {
return name_;
}
Expand Down
19 changes: 18 additions & 1 deletion src/pika_data_distribution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

#include "include/pika_data_distribution.h"

const std::string kTagBegin = "{";
const std::string kTagEnd = "}";

void HashModulo::Init() {
}

Expand Down Expand Up @@ -32,7 +35,8 @@ void Crc32::Crc32TableInit(uint32_t poly) {
}

uint32_t Crc32::Distribute(const std::string &str, uint32_t partition_num) {
uint32_t crc = Crc32Update(0, str.data(), (int)str.size());
std::string key = GetHashkey(str);
uint32_t crc = Crc32Update(0, key.data(), (int)key.size());
assert(partition_num != 0);
return crc % partition_num;
}
Expand All @@ -45,3 +49,16 @@ uint32_t Crc32::Crc32Update(uint32_t crc, const char* buf, int len) {
}
return ~crc;
}

std::string GetHashkey(const std::string& key) {
auto beg = key.find_first_of(kTagBegin);
if (beg == std::string::npos) {
return key;
}
auto end = key.find_first_of(kTagEnd, beg + 1);
if (end == std::string::npos) {
return key;
} else {
return key.substr(beg + 1, end - beg - 1);
}
}
11 changes: 11 additions & 0 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "slash/include/slash_string.h"

#include "include/pika_conf.h"
#include "include/pika_data_distribution.h"
#include "include/pika_binlog_transverter.h"

extern PikaConf *g_pika_conf;
Expand Down Expand Up @@ -1396,6 +1397,11 @@ void PKScanRangeCmd::DoInitial() {

key_start_ = argv_[2];
key_end_ = argv_[3];
// start key and end key hash tag have to be same in non classic mode
if (!HashtagIsConsistent(key_start_, key_start_)) {
res_.SetRes(CmdRes::kInconsistentHashTag);
return;
}
size_t index = 4, argc = argv_.size();
while (index < argc) {
std::string opt = argv_[index];
Expand Down Expand Up @@ -1477,6 +1483,11 @@ void PKRScanRangeCmd::DoInitial() {

key_start_ = argv_[2];
key_end_ = argv_[3];
// start key and end key hash tag have to be same in non classic mode
if (!HashtagIsConsistent(key_start_, key_start_)) {
res_.SetRes(CmdRes::kInconsistentHashTag);
return;
}
size_t index = 4, argc = argv_.size();
while (index < argc) {
std::string opt = argv_[index];
Expand Down
4 changes: 4 additions & 0 deletions src/pika_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "include/pika_list.h"

#include "slash/include/slash_string.h"
#include "include/pika_data_distribution.h"

void LIndexCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -268,6 +269,9 @@ void RPopLPushCmd::DoInitial() {
}
source_ = argv_[1];
receiver_ = argv_[2];
if (!HashtagIsConsistent(source_, receiver_)) {
res_.SetRes(CmdRes::kInconsistentHashTag);
}
}
void RPopLPushCmd::Do(std::shared_ptr<Partition> partition) {
std::string value;
Expand Down

0 comments on commit 2929eaf

Please sign in to comment.