Skip to content

Commit

Permalink
feature: support add/del table command (OpenAtomFoundation#866)
Browse files Browse the repository at this point in the history
pkcluster cmd add table option
del table with del slots
  • Loading branch information
kernelai authored and whoiami committed Mar 20, 2020
1 parent b5628c2 commit de21a23
Show file tree
Hide file tree
Showing 13 changed files with 463 additions and 50 deletions.
40 changes: 37 additions & 3 deletions include/pika_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class PkClusterInfoCmd : public Cmd {
class SlotParentCmd : public Cmd {
public:
SlotParentCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag) {}
: Cmd(name, arity, flag) {}

protected:
std::set<uint32_t> slots_;
Expand All @@ -58,6 +58,7 @@ class SlotParentCmd : public Cmd {
virtual void Clear() {
slots_.clear();
p_infos_.clear();
table_name_.clear();
}
};

Expand All @@ -71,7 +72,7 @@ class PkClusterAddSlotsCmd : public SlotParentCmd {
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
private:
virtual void DoInitial() override;
Status AddSlotsSanityCheck(const std::string& table_name);
Status AddSlotsSanityCheck();
};

class PkClusterDelSlotsCmd : public SlotParentCmd {
Expand All @@ -84,7 +85,7 @@ class PkClusterDelSlotsCmd : public SlotParentCmd {
}
private:
virtual void DoInitial() override;
Status RemoveSlotsSanityCheck(const std::string& table_name);
Status RemoveSlotsSanityCheck();
};

class PkClusterSlotsSlaveofCmd : public Cmd {
Expand All @@ -108,7 +109,40 @@ class PkClusterSlotsSlaveofCmd : public Cmd {
slots_.clear();
force_sync_ = false;
is_noone_ = false;
table_name_.clear();
}
};


class PkClusterAddTableCmd : public Cmd {
public:
PkClusterAddTableCmd(const std::string& name, int arity, uint16_t flag)
: Cmd(name, arity, flag), slot_num_(0) {}
Cmd* Clone() override {
return new PkClusterAddTableCmd(*this);
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
private:
uint64_t slot_num_;
void DoInitial() override;
Status AddTableSanityCheck();
void Clear() override {
slot_num_ = 0;
table_name_.clear();
}
};

class PkClusterDelTableCmd : public PkClusterDelSlotsCmd {
public:
PkClusterDelTableCmd(const std::string& name, int arity, uint16_t flag)
: PkClusterDelSlotsCmd(name, arity, flag) {}
Cmd* Clone() override {
return new PkClusterDelTableCmd(*this);
}
virtual void Do(std::shared_ptr<Partition> partition = nullptr);
private:
void DoInitial() override;
Status DelTableSanityCheck(const std::string& table_name);
};

#endif // PIKA_CLUSTER_H_
2 changes: 2 additions & 0 deletions include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ const std::string kCmdNamePkClusterInfo = "pkclusterinfo";
const std::string kCmdNamePkClusterAddSlots = "pkclusteraddslots";
const std::string kCmdNamePkClusterDelSlots = "pkclusterdelslots";
const std::string kCmdNamePkClusterSlotsSlaveof = "pkclusterslotsslaveof";
const std::string kCmdNamePkClusterAddTable = "pkclusteraddtable";
const std::string kCmdNamePkClusterDelTable = "pkclusterdeltable";

const std::string kClusterPrefix = "pkcluster";
typedef pink::RedisCmdArgsType PikaCmdArgsType;
Expand Down
5 changes: 5 additions & 0 deletions include/pika_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class PikaConf : public slash::BaseConf {
bool daemonize() { return daemonize_; }
std::string pidfile() { return pidfile_; }
int binlog_file_size() { return binlog_file_size_; }
PikaMeta * local_meta() { return local_meta_; }

// Setter
void SetPort(const int value) {
Expand Down Expand Up @@ -248,6 +249,10 @@ class PikaConf : public slash::BaseConf {
const std::set<uint32_t>& partition_ids);
Status RemoveTablePartitions(const std::string& table_name,
const std::set<uint32_t>& partition_ids);
Status AddTable(const std::string &table_name, uint32_t slot_num);
Status AddTableSanityCheck(const std::string &table_name);
Status DelTable(const std::string &table_name);
Status DelTableSanityCheck(const std::string &table_name);

int Load();
int ConfigRewrite();
Expand Down
2 changes: 2 additions & 0 deletions include/pika_rm.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ class PikaReplicaManager {
Status RemoveSyncPartition(const std::set<PartitionInfo>& p_infos);
Status ActivateSyncSlavePartition(const RmNode& node, const ReplState& repl_state);
Status DeactivateSyncSlavePartition(const PartitionInfo& p_info);
Status SyncTableSanityCheck(const std::string& table_name);
Status DelSyncTable(const std::string& table_name);

// For Pika Repl Client Thread
Status SendMetaSyncRequest();
Expand Down
4 changes: 4 additions & 0 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class PikaServer {
* Table use
*/
void InitTableStruct();
Status AddTableStruct(std::string table_name, uint32_t num);
Status DelTableStruct(std::string table_name);
std::shared_ptr<Table> GetTable(const std::string& table_name);
std::set<uint32_t> GetTablePartitionIds(const std::string& table_name);
bool IsBgSaving();
Expand Down Expand Up @@ -305,6 +307,8 @@ class PikaServer {
friend class InfoCmd;
friend class PkClusterAddSlotsCmd;
friend class PkClusterDelSlotsCmd;
friend class PkClusterAddTableCmd;
friend class PkClusterDelTableCmd;
friend class PikaReplClientConn;
friend class PkClusterInfoCmd;

Expand Down
4 changes: 4 additions & 0 deletions include/pika_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Table : public std::enable_shared_from_this<Table>{
bool FlushPartitionSubDB(const std::string& db_name);
bool IsBinlogIoError();
uint32_t PartitionNum();
void GetAllPartitions(std::set<uint32_t>& partition_ids);

// Dynamic change partition
Status AddPartitions(const std::set<uint32_t>& partition_ids);
Expand All @@ -52,6 +53,9 @@ class Table : public std::enable_shared_from_this<Table>{
std::set<uint32_t> GetPartitionIds();
std::shared_ptr<Partition> GetPartitionById(uint32_t partition_id);
std::shared_ptr<Partition> GetPartitionByKey(const std::string& key);
bool TableIsEmpty();
Status MovetoToTrash(const std::string& path);
Status Leave();

private:
std::string table_name_;
Expand Down
19 changes: 7 additions & 12 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,23 +392,18 @@ void SelectCmd::DoInitial() {
res_.SetRes(CmdRes::kWrongNum, kCmdNameSelect);
return;
}
int index = atoi(argv_[1].data());
if (std::to_string(index) != argv_[1]) {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect);
return;
}
if (g_pika_conf->classic_mode()) {
int index = atoi(argv_[1].data());
if (std::to_string(index) != argv_[1]) {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect);
return;
} else if (index < 0 || index >= g_pika_conf->databases()) {
if (index < 0 || index >= g_pika_conf->databases()) {
res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range");
return;
} else {
table_name_ = "db" + argv_[1];
}
} else {
// only pika codis use sharding mode currently, but pika
// codis only support single db, so in sharding mode we
// do no thing in select command
table_name_ = g_pika_conf->default_table();
}
table_name_ = "db" + argv_[1];
if (!g_pika_server->IsTableExist(table_name_)) {
res_.SetRes(CmdRes::kInvalidTable, kCmdNameSelect);
return;
Expand Down
Loading

0 comments on commit de21a23

Please sign in to comment.