Skip to content

Commit

Permalink
Implement multislots command "exists" and "del" in sharding mode. (Op…
Browse files Browse the repository at this point in the history
  • Loading branch information
whoiami committed May 21, 2020
1 parent 3ddf1bb commit ff36e83
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 6 deletions.
14 changes: 8 additions & 6 deletions include/pika_kv.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,20 @@ class GetCmd : public Cmd {
class DelCmd : public Cmd {
public:
DelCmd(const std::string& name , int arity, uint16_t flag)
: Cmd(name, arity, flag) {};
: Cmd(name, arity, flag), split_res_(0) {};
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual std::vector<std::string> current_key() const {
return keys_;
}
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys);
virtual void Merge();
virtual Cmd* Clone() override {
return new DelCmd(*this);
}

private:
std::vector<std::string> keys_;
int64_t split_res_;
virtual void DoInitial() override;
};

Expand Down Expand Up @@ -489,19 +490,20 @@ class StrlenCmd : public Cmd {
class ExistsCmd : public Cmd {
public:
ExistsCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
: Cmd(name, arity, flag), split_res_(0) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual std::vector<std::string> current_key() const {
return keys_;
}
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys);
virtual void Merge();
virtual Cmd* Clone() override {
return new ExistsCmd(*this);
}

private:
std::vector<std::string> keys_;
int64_t split_res_;
virtual void DoInitial() override;
};

Expand Down
3 changes: 3 additions & 0 deletions src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,9 @@ void Cmd::ProcessMultiPartitionCmd() {
for (auto& iter : process_map) {
ProcessArg& arg = iter.second;
ProcessCommand(arg.partition, arg.sync_partition, arg.hint_keys);
if (!res_.ok()) {
return;
}
}
Merge();
}
Expand Down
31 changes: 31 additions & 0 deletions src/pika_kv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,22 @@ void DelCmd::Do(std::shared_ptr<Partition> partition) {
return;
}

void DelCmd::Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {
std::map<blackwidow::DataType, blackwidow::Status> type_status;
int64_t count = partition->db()->Del(hint_keys.keys, &type_status);
if (count >= 0) {
split_res_ += count;
} else {
res_.SetRes(CmdRes::kErrOther, "delete error");
}
return;
}

void DelCmd::Merge() {
res_.AppendInteger(split_res_);
return;
}

void IncrCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameIncr);
Expand Down Expand Up @@ -843,6 +859,21 @@ void ExistsCmd::Do(std::shared_ptr<Partition> partition) {
return;
}

void ExistsCmd::Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {
std::map<blackwidow::DataType, rocksdb::Status> type_status;
int64_t res = partition->db()->Exists(hint_keys.keys, &type_status);
if (res != -1) {
split_res_ += res;
} else {
res_.SetRes(CmdRes::kErrOther, "exists internal error");
}
}

void ExistsCmd::Merge() {
res_.AppendInteger(split_res_);
return;
}

void ExpireCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameExpire);
Expand Down

0 comments on commit ff36e83

Please sign in to comment.