diff --git a/include/pika_cluster.h b/include/pika_cluster.h index bb34c37c31..11d820d101 100644 --- a/include/pika_cluster.h +++ b/include/pika_cluster.h @@ -49,7 +49,7 @@ class PkClusterInfoCmd : public Cmd { class SlotParentCmd : public Cmd { public: SlotParentCmd(const std::string& name, int arity, uint16_t flag) - : Cmd(name, arity, flag) {} + : Cmd(name, arity, flag) {} protected: std::set slots_; @@ -58,6 +58,7 @@ class SlotParentCmd : public Cmd { virtual void Clear() { slots_.clear(); p_infos_.clear(); + table_name_.clear(); } }; @@ -71,7 +72,7 @@ class PkClusterAddSlotsCmd : public SlotParentCmd { virtual void Do(std::shared_ptr partition = nullptr); private: virtual void DoInitial() override; - Status AddSlotsSanityCheck(const std::string& table_name); + Status AddSlotsSanityCheck(); }; class PkClusterDelSlotsCmd : public SlotParentCmd { @@ -84,7 +85,7 @@ class PkClusterDelSlotsCmd : public SlotParentCmd { } private: virtual void DoInitial() override; - Status RemoveSlotsSanityCheck(const std::string& table_name); + Status RemoveSlotsSanityCheck(); }; class PkClusterSlotsSlaveofCmd : public Cmd { @@ -108,7 +109,40 @@ class PkClusterSlotsSlaveofCmd : public Cmd { slots_.clear(); force_sync_ = false; is_noone_ = false; + table_name_.clear(); + } +}; + + +class PkClusterAddTableCmd : public Cmd { + public: + PkClusterAddTableCmd(const std::string& name, int arity, uint16_t flag) + : Cmd(name, arity, flag), slot_num_(0) {} + Cmd* Clone() override { + return new PkClusterAddTableCmd(*this); + } + virtual void Do(std::shared_ptr partition = nullptr); + private: + uint64_t slot_num_; + void DoInitial() override; + Status AddTableSanityCheck(); + void Clear() override { + slot_num_ = 0; + table_name_.clear(); } }; +class PkClusterDelTableCmd : public PkClusterDelSlotsCmd { + public: + PkClusterDelTableCmd(const std::string& name, int arity, uint16_t flag) + : PkClusterDelSlotsCmd(name, arity, flag) {} + Cmd* Clone() override { + return new PkClusterDelTableCmd(*this); + } + virtual void Do(std::shared_ptr partition = nullptr); + private: + void DoInitial() override; + Status DelTableSanityCheck(const std::string& table_name); +}; + #endif // PIKA_CLUSTER_H_ diff --git a/include/pika_command.h b/include/pika_command.h index b09a3023b5..15552c1be4 100644 --- a/include/pika_command.h +++ b/include/pika_command.h @@ -210,6 +210,8 @@ const std::string kCmdNamePkClusterInfo = "pkclusterinfo"; const std::string kCmdNamePkClusterAddSlots = "pkclusteraddslots"; const std::string kCmdNamePkClusterDelSlots = "pkclusterdelslots"; const std::string kCmdNamePkClusterSlotsSlaveof = "pkclusterslotsslaveof"; +const std::string kCmdNamePkClusterAddTable = "pkclusteraddtable"; +const std::string kCmdNamePkClusterDelTable = "pkclusterdeltable"; const std::string kClusterPrefix = "pkcluster"; typedef pink::RedisCmdArgsType PikaCmdArgsType; diff --git a/include/pika_conf.h b/include/pika_conf.h index be34fcf9d1..8d7fe33a2b 100644 --- a/include/pika_conf.h +++ b/include/pika_conf.h @@ -93,6 +93,7 @@ class PikaConf : public slash::BaseConf { bool daemonize() { return daemonize_; } std::string pidfile() { return pidfile_; } int binlog_file_size() { return binlog_file_size_; } + PikaMeta * local_meta() { return local_meta_; } // Setter void SetPort(const int value) { @@ -248,6 +249,10 @@ class PikaConf : public slash::BaseConf { const std::set& partition_ids); Status RemoveTablePartitions(const std::string& table_name, const std::set& partition_ids); + Status AddTable(const std::string &table_name, uint32_t slot_num); + Status AddTableSanityCheck(const std::string &table_name); + Status DelTable(const std::string &table_name); + Status DelTableSanityCheck(const std::string &table_name); int Load(); int ConfigRewrite(); diff --git a/include/pika_rm.h b/include/pika_rm.h index 0fad260059..bd970641b7 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -190,6 +190,8 @@ class PikaReplicaManager { Status RemoveSyncPartition(const std::set& p_infos); Status ActivateSyncSlavePartition(const RmNode& node, const ReplState& repl_state); Status DeactivateSyncSlavePartition(const PartitionInfo& p_info); + Status SyncTableSanityCheck(const std::string& table_name); + Status DelSyncTable(const std::string& table_name); // For Pika Repl Client Thread Status SendMetaSyncRequest(); diff --git a/include/pika_server.h b/include/pika_server.h index 6380906366..fbb8b9a8b4 100644 --- a/include/pika_server.h +++ b/include/pika_server.h @@ -133,6 +133,8 @@ class PikaServer { * Table use */ void InitTableStruct(); + Status AddTableStruct(std::string table_name, uint32_t num); + Status DelTableStruct(std::string table_name); std::shared_ptr GetTable(const std::string& table_name); std::set GetTablePartitionIds(const std::string& table_name); bool IsBgSaving(); @@ -305,6 +307,8 @@ class PikaServer { friend class InfoCmd; friend class PkClusterAddSlotsCmd; friend class PkClusterDelSlotsCmd; + friend class PkClusterAddTableCmd; + friend class PkClusterDelTableCmd; friend class PikaReplClientConn; friend class PkClusterInfoCmd; diff --git a/include/pika_table.h b/include/pika_table.h index adf6b62b6c..bc7c172397 100644 --- a/include/pika_table.h +++ b/include/pika_table.h @@ -31,6 +31,7 @@ class Table : public std::enable_shared_from_this
{ bool FlushPartitionSubDB(const std::string& db_name); bool IsBinlogIoError(); uint32_t PartitionNum(); + void GetAllPartitions(std::set& partition_ids); // Dynamic change partition Status AddPartitions(const std::set& partition_ids); @@ -52,6 +53,9 @@ class Table : public std::enable_shared_from_this
{ std::set GetPartitionIds(); std::shared_ptr GetPartitionById(uint32_t partition_id); std::shared_ptr GetPartitionByKey(const std::string& key); + bool TableIsEmpty(); + Status MovetoToTrash(const std::string& path); + Status Leave(); private: std::string table_name_; diff --git a/src/pika_admin.cc b/src/pika_admin.cc index e44cfb794d..215ff0a504 100644 --- a/src/pika_admin.cc +++ b/src/pika_admin.cc @@ -392,23 +392,18 @@ void SelectCmd::DoInitial() { res_.SetRes(CmdRes::kWrongNum, kCmdNameSelect); return; } + int index = atoi(argv_[1].data()); + if (std::to_string(index) != argv_[1]) { + res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect); + return; + } if (g_pika_conf->classic_mode()) { - int index = atoi(argv_[1].data()); - if (std::to_string(index) != argv_[1]) { - res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect); - return; - } else if (index < 0 || index >= g_pika_conf->databases()) { + if (index < 0 || index >= g_pika_conf->databases()) { res_.SetRes(CmdRes::kInvalidIndex, kCmdNameSelect + " DB index is out of range"); return; - } else { - table_name_ = "db" + argv_[1]; } - } else { - // only pika codis use sharding mode currently, but pika - // codis only support single db, so in sharding mode we - // do no thing in select command - table_name_ = g_pika_conf->default_table(); } + table_name_ = "db" + argv_[1]; if (!g_pika_server->IsTableExist(table_name_)) { res_.SetRes(CmdRes::kInvalidTable, kCmdNameSelect); return; diff --git a/src/pika_cluster.cc b/src/pika_cluster.cc index 403e5200c8..a71350c3e8 100644 --- a/src/pika_cluster.cc +++ b/src/pika_cluster.cc @@ -196,10 +196,22 @@ void SlotParentCmd::DoInitial() { if (!s.ok()) { res_.SetRes(CmdRes::kErrOther, s.ToString()); } - - std::string table_name = g_pika_conf->default_table(); + if (argv_.size() == 3) { + table_name_ = g_pika_conf->default_table(); + } else if (argv_.size() == 4) { + uint64_t table_id; + if (!slash::string2ul(argv_[3].data(), argv_[3].size(), &table_id)) { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } + table_name_ = "db"; + table_name_ += std::to_string(table_id); + } else { + res_.SetRes(CmdRes::kSyntaxErr, "too many argument"); + return; + } for (const auto& slot_id : slots_) { - p_infos_.insert(PartitionInfo(table_name, slot_id)); + p_infos_.insert(PartitionInfo(table_name_, slot_id)); } } @@ -216,8 +228,7 @@ void PkClusterAddSlotsCmd::DoInitial() { } void PkClusterAddSlotsCmd::Do(std::shared_ptr partition) { - std::string table_name = g_pika_conf->default_table(); - std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name); + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); if (!table_ptr) { res_.SetRes(CmdRes::kErrOther, "Internal error: table not found!"); return; @@ -232,13 +243,13 @@ void PkClusterAddSlotsCmd::Do(std::shared_ptr partition) { } bool pre_success = true; - Status s = AddSlotsSanityCheck(table_name); + Status s = AddSlotsSanityCheck(); if (!s.ok()) { LOG(WARNING) << "Addslots sanity check failed: " << s.ToString(); pre_success = false; } if (pre_success) { - s = g_pika_conf->AddTablePartitions(table_name, slots_); + s = g_pika_conf->AddTablePartitions(table_name_, slots_); if (!s.ok()) { LOG(WARNING) << "Addslots add to pika conf failed: " << s.ToString(); pre_success = false; @@ -270,13 +281,13 @@ void PkClusterAddSlotsCmd::Do(std::shared_ptr partition) { LOG(INFO) << "Pika meta file overwrite success"; } -Status PkClusterAddSlotsCmd::AddSlotsSanityCheck(const std::string& table_name) { - Status s = g_pika_conf->TablePartitionsSanityCheck(table_name, slots_, true); +Status PkClusterAddSlotsCmd::AddSlotsSanityCheck() { + Status s = g_pika_conf->TablePartitionsSanityCheck(table_name_, slots_, true); if (!s.ok()) { return s; } - std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name); + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); if (!table_ptr) { return Status::NotFound("table not found!"); } @@ -306,8 +317,7 @@ void PkClusterDelSlotsCmd::DoInitial() { } void PkClusterDelSlotsCmd::Do(std::shared_ptr partition) { - std::string table_name = g_pika_conf->default_table(); - std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name); + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); if (!table_ptr) { res_.SetRes(CmdRes::kErrOther, "Internal error: default table not found!"); return; @@ -322,14 +332,14 @@ void PkClusterDelSlotsCmd::Do(std::shared_ptr partition) { } bool pre_success = true; - Status s = RemoveSlotsSanityCheck(table_name); + Status s = RemoveSlotsSanityCheck(); if (!s.ok()) { LOG(WARNING) << "Removeslots sanity check failed: " << s.ToString(); pre_success = false; } // remove order maters if (pre_success) { - s = g_pika_conf->RemoveTablePartitions(table_name, slots_); + s = g_pika_conf->RemoveTablePartitions(table_name_, slots_); if (!s.ok()) { LOG(WARNING) << "Removeslots remove from pika conf failed: " << s.ToString(); pre_success = false; @@ -360,13 +370,13 @@ void PkClusterDelSlotsCmd::Do(std::shared_ptr partition) { LOG(INFO) << "Pika meta file overwrite success"; } -Status PkClusterDelSlotsCmd::RemoveSlotsSanityCheck(const std::string& table_name) { - Status s = g_pika_conf->TablePartitionsSanityCheck(table_name, slots_, false); +Status PkClusterDelSlotsCmd::RemoveSlotsSanityCheck() { + Status s = g_pika_conf->TablePartitionsSanityCheck(table_name_, slots_, false); if (!s.ok()) { return s; } - std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name); + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); if (!table_ptr) { return Status::NotFound("table not found"); } @@ -384,9 +394,10 @@ Status PkClusterDelSlotsCmd::RemoveSlotsSanityCheck(const std::string& table_nam return Status::OK(); } -/* pkcluster slotsslaveof no one [0-3,8-11 | all] - * pkcluster slotsslaveof ip port [0-3,8,9,10,11 | all] +/* pkcluster slotsslaveof no one [0-3,8-11 | all] [table_id] + * pkcluster slotsslaveof ip port [0-3,8,9,10,11 | all] [table_id] * pkcluster slotsslaveof ip port [0,2,4,6,7,8,9 | all] force + * pkcluster slotsslaveof ip port [0,2,4,6,7,8,9 | all] force [table_id] */ void PkClusterSlotsSlaveofCmd::DoInitial() { if (!CheckArg(argv_.size())) { @@ -431,23 +442,49 @@ void PkClusterSlotsSlaveofCmd::DoInitial() { res_.SetRes(CmdRes::kErrOther, "Slots set empty"); } - if (argv_.size() == 5) { - // do nothing - } else if (argv_.size() == 6 - && !strcasecmp(argv_[5].data(), "force")) { - force_sync_ = true; - } else { - res_.SetRes(CmdRes::kSyntaxErr); + uint64_t table_id; + switch (argv_.size()) { + case 5: + table_name_ = g_pika_conf->default_table(); + break; + case 6: + if (!strcasecmp(argv_[5].data(), "force")) { + } else if (slash::string2ul(argv_[5].data(), argv_[5].size(), &table_id)) { + table_name_ = "db"; + table_name_ += std::to_string(table_id); + } else { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } + break; + case 7: + if (strcasecmp(argv_[5].data(), "force") != 0 + && slash::string2ul(argv_[6].data(), argv_[6].size(), &table_id)) { + force_sync_ = true; + table_name_ = "db"; + table_name_ += std::to_string(table_id); + } else { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } + break; + default: + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; } + if (!g_pika_server->IsTableExist(table_name_)) { + res_.SetRes(CmdRes::kInvalidTable); + return; + } + } void PkClusterSlotsSlaveofCmd::Do(std::shared_ptr partition) { - std::string table_name = g_pika_conf->default_table(); std::vector to_del_slots; for (const auto& slot : slots_) { std::shared_ptr slave_partition = g_pika_rm->GetSyncSlavePartitionByName( - PartitionInfo(table_name, slot)); + PartitionInfo(table_name_, slot)); if (!slave_partition) { res_.SetRes(CmdRes::kErrOther, "Slot " + std::to_string(slot) + " not found!"); return; @@ -470,9 +507,9 @@ void PkClusterSlotsSlaveofCmd::Do(std::shared_ptr partition) { for (const auto& slot : slots_) { std::shared_ptr slave_partition = g_pika_rm->GetSyncSlavePartitionByName( - PartitionInfo(table_name, slot)); + PartitionInfo(table_name_, slot)); if (slave_partition->State() == ReplState::kConnected) { - s = g_pika_rm->SendRemoveSlaveNodeRequest(table_name, slot); + s = g_pika_rm->SendRemoveSlaveNodeRequest(table_name_, slot); } if (!s.ok()) { break; @@ -484,7 +521,7 @@ void PkClusterSlotsSlaveofCmd::Do(std::shared_ptr partition) { if (is_noone_) { } else { s = g_pika_rm->ActivateSyncSlavePartition( - RmNode(ip_, port_, table_name, slot), state); + RmNode(ip_, port_, table_name_, slot), state); if (!s.ok()) { break; } @@ -498,3 +535,188 @@ void PkClusterSlotsSlaveofCmd::Do(std::shared_ptr partition) { } } +/* + * pkcluster addtable table_id slot_num + * pkcluster addtable 1 1024 + */ +void PkClusterAddTableCmd::DoInitial() { + if (!CheckArg(argv_.size())) { + res_.SetRes(CmdRes::kSyntaxErr); + return; + } + if (g_pika_conf->classic_mode()) { + res_.SetRes(CmdRes::kErrOther, "PkClusterTable Cmd only support on sharding mode"); + return; + } + uint64_t table_id; + if (!slash::string2ul(argv_[2].data(), argv_[2].size(), &table_id)) { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } + table_name_ = "db"; + table_name_ += std::to_string(table_id); + if (!slash::string2ul(argv_[3].data(), argv_[3].size(), &slot_num_) + || slot_num_ == 0) { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } +} + +void PkClusterAddTableCmd::Do(std::shared_ptr partition) { + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); + if (table_ptr) { + res_.SetRes(CmdRes::kErrOther, "Internal error: table already exist!"); + return; + } + + SlotState expected = INFREE; + if (!std::atomic_compare_exchange_strong(&g_pika_server->slot_state_, + &expected, INBUSY)) { + res_.SetRes(CmdRes::kErrOther, + "Table/Slot in syncing or a change operation is under way, retry later"); + return; + } + + bool pre_success = true; + Status s = AddTableSanityCheck(); + if (!s.ok()) { + LOG(WARNING) << "AddTable sanity check failed: " << s.ToString(); + pre_success = false; + } + if (pre_success) { + s = g_pika_conf->AddTable(table_name_,slot_num_); + if (!s.ok()) { + LOG(WARNING) << "Addslots add to pika conf failed: " << s.ToString(); + pre_success = false; + } + } + if (pre_success) { + s = g_pika_server->AddTableStruct(table_name_,slot_num_); + if (!s.ok()) { + LOG(WARNING) << "Addslots add to pika conf failed: " << s.ToString(); + pre_success = false; + } + } + + g_pika_server->slot_state_.store(INFREE); + + if (!pre_success) { + res_.SetRes(CmdRes::kErrOther, s.ToString()); + return; + } + res_.SetRes(CmdRes::kOk); + LOG(INFO) << "Pika meta file overwrite success"; +} + +Status PkClusterAddTableCmd::AddTableSanityCheck() { + Status s = g_pika_conf->AddTableSanityCheck(table_name_); + if (!s.ok()) { + return s; + } + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); + if (table_ptr) { + return Status::Corruption("table already exist!"); + } + s = g_pika_rm->SyncTableSanityCheck(table_name_); + return s; +} + +/* + * pkcluster deltable table_id + */ +void PkClusterDelTableCmd::DoInitial() { + if (!CheckArg(argv_.size())) { + res_.SetRes(CmdRes::kSyntaxErr); + return; + } + if (g_pika_conf->classic_mode()) { + res_.SetRes(CmdRes::kErrOther, "PkClusterTable Cmd only support on sharding mode"); + return; + } + uint64_t table_id; + if (!slash::string2ul(argv_[2].data(), argv_[2].size(), &table_id)) { + res_.SetRes(CmdRes::kErrOther, "syntax error"); + return; + } + table_name_ = "db"; + table_name_ += std::to_string(table_id); +} + + +void PkClusterDelTableCmd::Do(std::shared_ptr partition) { + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name_); + if (!table_ptr) { + res_.SetRes(CmdRes::kErrOther, "Internal error: table not found!"); + return; + } + + if (!table_ptr->TableIsEmpty()) { + table_ptr->GetAllPartitions(slots_); + for (const auto& slot_id : slots_) { + p_infos_.insert(PartitionInfo(table_name_, slot_id)); + } + PkClusterDelSlotsCmd::Do(); + } + + SlotState expected = INFREE; + if (!std::atomic_compare_exchange_strong(&g_pika_server->slot_state_, + &expected, INBUSY)) { + res_.SetRes(CmdRes::kErrOther, + "Table/Slot in syncing or a change operation is under way, retry later"); + return; + } + + bool pre_success = true; + Status s = DelTableSanityCheck(table_name_); + if (!s.ok()) { + LOG(WARNING) << "DelTable sanity check failed: " << s.ToString(); + pre_success = false; + } + // remove order maters + if (pre_success) { + s = g_pika_conf->DelTable(table_name_); + if (!s.ok()) { + LOG(WARNING) << "DelTable remove from pika conf failed: " << s.ToString(); + pre_success = false; + } + } + if (pre_success) { + s = g_pika_rm->DelSyncTable(table_name_); + if (!s.ok()) { + LOG(WARNING) << "DelTable remove from pika rm failed: " << s.ToString(); + pre_success = false; + } + } + if (pre_success) { + s = g_pika_server->DelTableStruct(table_name_); + if (!s.ok()) { + LOG(WARNING) << "DelTable remove from pika server failed: " << s.ToString(); + pre_success = false; + } + } + + g_pika_server->slot_state_.store(INFREE); + + if (!pre_success) { + res_.SetRes(CmdRes::kErrOther, s.ToString()); + return; + } + res_.SetRes(CmdRes::kOk); + LOG(INFO) << "Pika meta file overwrite success"; +} + +Status PkClusterDelTableCmd::DelTableSanityCheck(const std::string &table_name) { + Status s = g_pika_conf->DelTableSanityCheck(table_name); + if (!s.ok()) { + return s; + } + std::shared_ptr
table_ptr = g_pika_server->GetTable(table_name); + if (!table_ptr) { + return Status::Corruption("table not found!"); + } + if (!table_ptr->TableIsEmpty()) { + return Status::Corruption("table have slots!"); + } + s = g_pika_rm->SyncTableSanityCheck(table_name); + return s; +} diff --git a/src/pika_command.cc b/src/pika_command.cc index e93c704f5c..85e9f7d724 100644 --- a/src/pika_command.cc +++ b/src/pika_command.cc @@ -104,13 +104,16 @@ void InitCmdTable(std::unordered_map *cmd_table) { // Cluster related Cmd* pkclusterinfoptr = new PkClusterInfoCmd(kCmdNamePkClusterInfo, -3, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair(kCmdNamePkClusterInfo, pkclusterinfoptr)); - Cmd* pkclusteraddslotsptr = new PkClusterAddSlotsCmd(kCmdNamePkClusterAddSlots, 3, kCmdFlagsRead | kCmdFlagsAdmin); + Cmd* pkclusteraddslotsptr = new PkClusterAddSlotsCmd(kCmdNamePkClusterAddSlots, -3, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair(kCmdNamePkClusterAddSlots, pkclusteraddslotsptr)); - Cmd* pkclusterdelslotsptr = new PkClusterDelSlotsCmd(kCmdNamePkClusterDelSlots, 3, kCmdFlagsRead | kCmdFlagsAdmin); + Cmd* pkclusterdelslotsptr = new PkClusterDelSlotsCmd(kCmdNamePkClusterDelSlots, -3, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair(kCmdNamePkClusterDelSlots, pkclusterdelslotsptr)); Cmd* pkclusterslotsslaveofptr = new PkClusterSlotsSlaveofCmd(kCmdNamePkClusterSlotsSlaveof, -5, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair(kCmdNamePkClusterSlotsSlaveof, pkclusterslotsslaveofptr)); - + Cmd* pkclusteraddtableptr = new PkClusterAddTableCmd(kCmdNamePkClusterAddTable, 4, kCmdFlagsRead | kCmdFlagsAdmin); + cmd_table->insert(std::pair(kCmdNamePkClusterAddTable, pkclusteraddtableptr)); + Cmd* pkclusterdeltableptr = new PkClusterDelTableCmd(kCmdNamePkClusterDelTable, 3, kCmdFlagsRead | kCmdFlagsAdmin); + cmd_table->insert(std::pair(kCmdNamePkClusterDelTable, pkclusterdeltableptr)); #ifdef TCMALLOC_EXTENSION Cmd* tcmallocptr = new TcmallocCmd(kCmdNameTcmalloc, -2, kCmdFlagsRead | kCmdFlagsAdmin); cmd_table->insert(std::pair(kCmdNameTcmalloc, tcmallocptr)); diff --git a/src/pika_conf.cc b/src/pika_conf.cc index 4476062cbc..7d245cb24d 100644 --- a/src/pika_conf.cc +++ b/src/pika_conf.cc @@ -27,7 +27,7 @@ PikaConf::~PikaConf() { Status PikaConf::InternalGetTargetTable(const std::string& table_name, uint32_t* const target) { int32_t table_index = -1; - for (size_t idx = 0; table_structs_.size(); ++idx) { + for (size_t idx = 0; idx < table_structs_.size(); ++idx) { if (table_structs_[idx].table_name == table_name) { table_index = idx; break; @@ -100,6 +100,48 @@ Status PikaConf::RemoveTablePartitions(const std::string& table_name, return s; } +Status PikaConf::AddTable(const std::string &table_name, const uint32_t slot_num) { + Status s = AddTableSanityCheck(table_name); + if (!s.ok()) { + return s; + } + RWLock l(&rwlock_, true); + table_structs_.push_back({table_name,slot_num,{}}); + s = local_meta_->StableSave(table_structs_); + return s; +} + +Status PikaConf::DelTable(const std::string &table_name) { + Status s = DelTableSanityCheck(table_name); + if (!s.ok()) { + return s; + } + RWLock l(&rwlock_, true); + for (auto iter = table_structs_.begin();iter != table_structs_.end();iter++) { + if (iter->table_name == table_name) { + table_structs_.erase(iter); + break; + } + } + return local_meta_->StableSave(table_structs_); +} + +Status PikaConf::AddTableSanityCheck(const std::string &table_name) { + RWLock l(&rwlock_, false); + uint32_t table_index = 0; + Status s = InternalGetTargetTable(table_name, &table_index); + if (!s.IsNotFound()) { + return Status::Corruption("table: " + table_name + " already exist"); + } + return Status::OK(); +} + +Status PikaConf::DelTableSanityCheck(const std::string &table_name) { + RWLock l(&rwlock_, false); + uint32_t table_index = 0; + return InternalGetTargetTable(table_name, &table_index); +} + int PikaConf::Load() { int ret = LoadConf(); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index 8ddba79b8a..890574b77b 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -1260,6 +1260,41 @@ Status PikaReplicaManager::RemoveSyncPartition( return Status::OK(); } +Status PikaReplicaManager::SyncTableSanityCheck(const std::string& table_name) { + slash::RWLock l(&partitions_rw_, false); + for (const auto& master_partition : sync_master_partitions_) { + if (master_partition.first.table_name_ == table_name) { + LOG(WARNING) << "sync partition: " << master_partition.first.ToString() << " exist"; + return Status::Corruption("sync partition " + master_partition.first.ToString() + + " exist"); + } + } + for (const auto& slave_partition : sync_slave_partitions_) { + if (slave_partition.first.table_name_ == table_name) { + LOG(WARNING) << "sync partition: " << slave_partition.first.ToString() << " exist"; + return Status::Corruption("sync partition " + slave_partition.first.ToString() + + " exist"); + } + } + return Status::OK(); +} + +Status PikaReplicaManager::DelSyncTable(const std::string& table_name) { + Status s = SyncTableSanityCheck(table_name); + if (!s.ok()) { + return s; + } + std::string table_log_path = g_pika_conf->log_path() + "log_" + table_name ; + std::string table_log_path_tmp = table_log_path + "_deleting/"; + if (slash::RenameFile(table_log_path, table_log_path_tmp)) { + LOG(WARNING) << "Failed to move log to trash, error: " << strerror(errno); + return Status::Corruption("Failed to move log to trash"); + } + g_pika_server->PurgeDir(table_log_path_tmp); + LOG(WARNING) << "Partition StableLog: " << table_name << " move to trash success"; + return Status::OK(); +} + void PikaReplicaManager::FindCompleteReplica(std::vector* replica) { std::unordered_map replica_slotnum; slash::RWLock l(&partitions_rw_, false); diff --git a/src/pika_server.cc b/src/pika_server.cc index 46ef8e3d1a..19f803c9ca 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -426,6 +426,36 @@ void PikaServer::InitTableStruct() { } } +Status PikaServer::AddTableStruct(std::string table_name, uint32_t num) { + std::shared_ptr
table = g_pika_server->GetTable(table_name); + if (table) { + return Status::Corruption("table already exist"); + } + std::string db_path = g_pika_conf->db_path(); + std::string log_path = g_pika_conf->log_path(); + std::shared_ptr
table_ptr = std::make_shared
( + table_name, num, db_path, log_path); + slash::RWLock rwl(&tables_rw_, true); + tables_.emplace(table_name, table_ptr); + return Status::OK(); +} + +Status PikaServer::DelTableStruct(std::string table_name) { + std::shared_ptr
table = g_pika_server->GetTable(table_name); + if (!table) { + return Status::Corruption("table not found"); + } + if (!table->TableIsEmpty()) { + return Status::Corruption("table have partitions"); + } + Status s = table->Leave(); + if (!s.ok()) { + return s; + } + tables_.erase(table_name); + return Status::OK(); +} + std::shared_ptr
PikaServer::GetTable(const std::string &table_name) { slash::RWLock l(&tables_rw_, false); auto iter = tables_.find(table_name); diff --git a/src/pika_table.cc b/src/pika_table.cc index c56959b832..359bd7cec2 100644 --- a/src/pika_table.cc +++ b/src/pika_table.cc @@ -135,6 +135,13 @@ Status Table::RemovePartitions(const std::set& partition_ids) { return Status::OK(); } +void Table::GetAllPartitions(std::set& partition_ids) { + slash::RWLock l(&partitions_rw_, false); + for (const auto& iter : partitions_) { + partition_ids.insert(iter.first); + } +} + void Table::KeyScan() { slash::MutexLock ml(&key_scan_protector_); if (key_scan_info_.key_scaning_) { @@ -264,3 +271,31 @@ std::shared_ptr Table::GetPartitionByKey(const std::string& key) { auto iter = partitions_.find(index); return (iter == partitions_.end()) ? NULL : iter->second; } + +bool Table::TableIsEmpty() { + slash::RWLock rwl(&partitions_rw_, false); + return partitions_.empty(); +} + +Status Table::Leave() { + if (!TableIsEmpty()) { + return Status::Corruption("Table have partitions!"); + } + return MovetoToTrash(db_path_); +} + +Status Table::MovetoToTrash(const std::string& path) { + + std::string path_tmp = path; + if (path_tmp[path_tmp.length() - 1] == '/') { + path_tmp.erase(path_tmp.length() - 1); + } + path_tmp += "_deleting/"; + if (slash::RenameFile(path, path_tmp)) { + LOG(WARNING) << "Failed to move " << path <<" to trash, error: " << strerror(errno); + return Status::Corruption("Failed to move %s to trash", path); + } + g_pika_server->PurgeDir(path_tmp); + LOG(WARNING) << path << " move to trash success"; + return Status::OK(); +}