diff --git a/include/pika_kv.h b/include/pika_kv.h index 827cce58d6..41962fc570 100644 --- a/include/pika_kv.h +++ b/include/pika_kv.h @@ -76,19 +76,20 @@ class GetCmd : public Cmd { class DelCmd : public Cmd { public: DelCmd(const std::string& name , int arity, uint16_t flag) - : Cmd(name, arity, flag) {}; + : Cmd(name, arity, flag), split_res_(0) {}; virtual void Do(std::shared_ptr partition = nullptr); virtual std::vector current_key() const { return keys_; } - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys) {}; - virtual void Merge() {}; + virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys); + virtual void Merge(); virtual Cmd* Clone() override { return new DelCmd(*this); } private: std::vector keys_; + int64_t split_res_; virtual void DoInitial() override; }; @@ -489,19 +490,20 @@ class StrlenCmd : public Cmd { class ExistsCmd : public Cmd { public: ExistsCmd(const std::string& name, int arity, uint16_t flag) - : Cmd(name, arity, flag) {} + : Cmd(name, arity, flag), split_res_(0) {} virtual void Do(std::shared_ptr partition = nullptr); virtual std::vector current_key() const { return keys_; } - virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys) {}; - virtual void Merge() {}; + virtual void Split(std::shared_ptr partition, const HintKeys& hint_keys); + virtual void Merge(); virtual Cmd* Clone() override { return new ExistsCmd(*this); } private: std::vector keys_; + int64_t split_res_; virtual void DoInitial() override; }; diff --git a/src/pika_command.cc b/src/pika_command.cc index 3a900c21f4..952b458eb1 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -760,6 +760,9 @@ void Cmd::ProcessMultiPartitionCmd() { for (auto& iter : process_map) { ProcessArg& arg = iter.second; ProcessCommand(arg.partition, arg.sync_partition, arg.hint_keys); + if (!res_.ok()) { + return; + } } Merge(); } diff --git a/src/pika_kv.cc b/src/pika_kv.cc index 956fa0b060..12a9bd4a24 100644 --- a/src/pika_kv.cc +++ b/src/pika_kv.cc @@ -185,6 +185,22 @@ void DelCmd::Do(std::shared_ptr partition) { return; } +void DelCmd::Split(std::shared_ptr partition, const HintKeys& hint_keys) { + std::map type_status; + int64_t count = partition->db()->Del(hint_keys.keys, &type_status); + if (count >= 0) { + split_res_ += count; + } else { + res_.SetRes(CmdRes::kErrOther, "delete error"); + } + return; +} + +void DelCmd::Merge() { + res_.AppendInteger(split_res_); + return; +} + void IncrCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameIncr); @@ -843,6 +859,21 @@ void ExistsCmd::Do(std::shared_ptr partition) { return; } +void ExistsCmd::Split(std::shared_ptr partition, const HintKeys& hint_keys) { + std::map type_status; + int64_t res = partition->db()->Exists(hint_keys.keys, &type_status); + if (res != -1) { + split_res_ += res; + } else { + res_.SetRes(CmdRes::kErrOther, "exists internal error"); + } +} + +void ExistsCmd::Merge() { + res_.AppendInteger(split_res_); + return; +} + void ExpireCmd::DoInitial() { if (!CheckArg(argv_.size())) { res_.SetRes(CmdRes::kWrongNum, kCmdNameExpire);