Skip to content

Commit

Permalink
To process multislots command, add Func Split and Merge for Cmd class (
Browse files Browse the repository at this point in the history
  • Loading branch information
whoiami committed May 21, 2020
1 parent 1fdd10e commit 3ddf1bb
Show file tree
Hide file tree
Showing 17 changed files with 508 additions and 19 deletions.
50 changes: 50 additions & 0 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class SlaveofCmd : public Cmd {
SlaveofCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), is_noone_(false) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new SlaveofCmd(*this);
}
Expand All @@ -44,6 +46,8 @@ class DbSlaveofCmd : public Cmd {
DbSlaveofCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new DbSlaveofCmd(*this);
}
Expand All @@ -69,6 +73,8 @@ class AuthCmd : public Cmd {
AuthCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new AuthCmd(*this);
}
Expand All @@ -83,6 +89,8 @@ class BgsaveCmd : public Cmd {
BgsaveCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BgsaveCmd(*this);
}
Expand All @@ -100,6 +108,8 @@ class CompactCmd : public Cmd {
CompactCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new CompactCmd(*this);
}
Expand All @@ -119,6 +129,8 @@ class PurgelogstoCmd : public Cmd {
PurgelogstoCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), num_(0) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PurgelogstoCmd(*this);
}
Expand All @@ -134,6 +146,8 @@ class PingCmd : public Cmd {
PingCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PingCmd(*this);
}
Expand All @@ -147,6 +161,8 @@ class SelectCmd : public Cmd {
SelectCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new SelectCmd(*this);
}
Expand All @@ -164,6 +180,8 @@ class FlushallCmd : public Cmd {
FlushallCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new FlushallCmd(*this);
}
Expand All @@ -183,6 +201,8 @@ class FlushdbCmd : public Cmd {
FlushdbCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new FlushdbCmd(*this);
}
Expand All @@ -202,6 +222,8 @@ class ClientCmd : public Cmd {
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
const static std::string CLIENT_LIST_S;
const static std::string CLIENT_KILL_S;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new ClientCmd(*this);
}
Expand Down Expand Up @@ -232,6 +254,8 @@ class InfoCmd : public Cmd {
InfoCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), rescan_(false), off_(false) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new InfoCmd(*this);
}
Expand Down Expand Up @@ -278,6 +302,8 @@ class ShutdownCmd : public Cmd {
ShutdownCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new ShutdownCmd(*this);
}
Expand All @@ -291,6 +317,8 @@ class ConfigCmd : public Cmd {
ConfigCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new ConfigCmd(*this);
}
Expand All @@ -309,6 +337,8 @@ class MonitorCmd : public Cmd {
MonitorCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new MonitorCmd(*this);
}
Expand All @@ -322,6 +352,8 @@ class DbsizeCmd : public Cmd {
DbsizeCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new DbsizeCmd(*this);
}
Expand All @@ -335,6 +367,8 @@ class TimeCmd : public Cmd {
TimeCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new TimeCmd(*this);
}
Expand All @@ -348,6 +382,8 @@ class DelbackupCmd : public Cmd {
DelbackupCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new DelbackupCmd(*this);
}
Expand All @@ -361,6 +397,8 @@ class EchoCmd : public Cmd {
EchoCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new EchoCmd(*this);
}
Expand All @@ -375,6 +413,8 @@ class ScandbCmd : public Cmd {
ScandbCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), type_(blackwidow::kAll) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new ScandbCmd(*this);
}
Expand All @@ -393,6 +433,8 @@ class SlowlogCmd : public Cmd {
SlowlogCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), condition_(kGET) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new SlowlogCmd(*this);
}
Expand All @@ -411,6 +453,8 @@ class PaddingCmd : public Cmd {
PaddingCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PaddingCmd(*this);
}
Expand All @@ -431,6 +475,8 @@ class TcmallocCmd : public Cmd {
TcmallocCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new TcmallocCmd(*this);
}
Expand All @@ -447,6 +493,8 @@ class PKPatternMatchDelCmd : public Cmd {
PKPatternMatchDelCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PKPatternMatchDelCmd(*this);
}
Expand All @@ -463,6 +511,8 @@ class DummyCmd : public Cmd {
DummyCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new DummyCmd(*this);
}
Expand Down
10 changes: 10 additions & 0 deletions include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class BitGetCmd : public Cmd {
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr) override;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BitGetCmd(*this);
}
Expand All @@ -47,6 +49,8 @@ class BitSetCmd : public Cmd {
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr) override;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BitSetCmd(*this);
}
Expand All @@ -72,6 +76,8 @@ class BitCountCmd : public Cmd {
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr) override;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BitCountCmd(*this);
}
Expand Down Expand Up @@ -99,6 +105,8 @@ class BitPosCmd : public Cmd {
return res;
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr) override;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BitPosCmd(*this);
}
Expand All @@ -125,6 +133,8 @@ class BitOpCmd : public Cmd {
BitOpCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {};
virtual void Do(std::shared_ptr<Partition> partition = nullptr) override;
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new BitOpCmd(*this);
}
Expand Down
12 changes: 12 additions & 0 deletions include/pika_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class PkClusterInfoCmd : public Cmd {
: Cmd(name, arity, flag),
info_section_(kInfoErr), info_range_(kAll) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PkClusterInfoCmd(*this);
}
Expand Down Expand Up @@ -78,6 +80,8 @@ class PkClusterAddSlotsCmd : public SlotParentCmd {
public:
PkClusterAddSlotsCmd(const std::string& name, int arity, uint16_t flag)
: SlotParentCmd(name, arity, flag) {}
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PkClusterAddSlotsCmd(*this);
}
Expand All @@ -92,6 +96,8 @@ class PkClusterDelSlotsCmd : public SlotParentCmd {
PkClusterDelSlotsCmd(const std::string& name, int32_t arity, uint16_t flag)
: SlotParentCmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PkClusterDelSlotsCmd(*this);
}
Expand All @@ -105,6 +111,8 @@ class PkClusterSlotsSlaveofCmd : public Cmd {
PkClusterSlotsSlaveofCmd(const std::string& name , int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
virtual Cmd* Clone() override {
return new PkClusterSlotsSlaveofCmd(*this);
}
Expand All @@ -130,6 +138,8 @@ class PkClusterAddTableCmd : public Cmd {
public:
PkClusterAddTableCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), slot_num_(0) {}
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
Cmd* Clone() override {
return new PkClusterAddTableCmd(*this);
}
Expand All @@ -148,6 +158,8 @@ class PkClusterDelTableCmd : public PkClusterDelSlotsCmd {
public:
PkClusterDelTableCmd(const std::string& name, int arity, uint16_t flag)
: PkClusterDelSlotsCmd(name, arity, flag) {}
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) {};
virtual void Merge() {};
Cmd* Clone() override {
return new PkClusterDelTableCmd(*this);
}
Expand Down
31 changes: 28 additions & 3 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,28 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
kBinlogStage,
kExecuteStage
};
struct HintKeys {
HintKeys() {}
void Push(const std::string& key, int hint) {
keys.push_back(key);
hints.push_back(hint);
}
bool empty() const {
return keys.empty() && hints.empty();
}
std::vector<std::string> keys;
std::vector<int> hints;
};
struct ProcessArg {
ProcessArg() {}
ProcessArg(std::shared_ptr<Partition> _partition,
std::shared_ptr<SyncMasterPartition> _sync_partition,
HintKeys _hint_keys) : partition(_partition),
sync_partition(_sync_partition), hint_keys(_hint_keys) {}
std::shared_ptr<Partition> partition;
std::shared_ptr<SyncMasterPartition> sync_partition;
HintKeys hint_keys;
};
Cmd(const std::string& name, int arity, uint16_t flag)
: name_(name), arity_(arity), flag_(flag), stage_(kNone) {}
virtual ~Cmd() {}
Expand All @@ -423,6 +445,9 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
virtual void ProcessDoNotSpecifyPartitionCmd();
virtual void Do(std::shared_ptr<Partition> partition = nullptr) = 0;
virtual Cmd* Clone() = 0;
// used for execute multikey command into different slots
virtual void Split(std::shared_ptr<Partition> partition, const HintKeys& hint_keys) = 0;
virtual void Merge() = 0;

void Initial(const PikaCmdArgsType& argv,
const std::string& table_name);
Expand Down Expand Up @@ -456,10 +481,10 @@ class Cmd: public std::enable_shared_from_this<Cmd> {
// enable copy, used default copy
//Cmd(const Cmd&);
void ProcessCommand(std::shared_ptr<Partition> partition,
std::shared_ptr<SyncMasterPartition> sync_partition);
std::shared_ptr<SyncMasterPartition> sync_partition, const HintKeys& hint_key = HintKeys());
void InternalProcessCommand(std::shared_ptr<Partition> partition,
std::shared_ptr<SyncMasterPartition> sync_partition);
void DoCommand(std::shared_ptr<Partition> partition);
std::shared_ptr<SyncMasterPartition> sync_partition, const HintKeys& hint_key);
void DoCommand(std::shared_ptr<Partition> partition, const HintKeys& hint_key);
void DoBinlog(std::shared_ptr<SyncMasterPartition> partition);
bool CheckArg(int num) const;
void LogCommand() const;
Expand Down
Loading

0 comments on commit 3ddf1bb

Please sign in to comment.