Skip to content

Commit

Permalink
compact support multiple db (OpenAtomFoundation#598)
Browse files Browse the repository at this point in the history
  • Loading branch information
Axlgrep committed May 15, 2019
1 parent 35738da commit 4d30ea4
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 33 deletions.
2 changes: 1 addition & 1 deletion conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ instance-mode : classic
# Set the number of databases. The default database is DB 0, you can select
# a different one on a per-connection basis using SELECT <dbid> where
# dbid is a number between 0 and 'databases' - 1, limited in [1, 8]
databases : 1
databases : 8
# Table list
table-list : table1:1,table2:1
# Dump Prefix
Expand Down
4 changes: 3 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@ class CompactCmd : public Cmd {
virtual void Do(std::shared_ptr<Partition> partition = nullptr);

private:
std::string struct_type_;
virtual void DoInitial() override;
virtual void Clear() {
struct_type_.clear();
compact_tables_.clear();
}
std::string struct_type_;
std::set<std::string> compact_tables_;
};

class PurgelogstoCmd : public Cmd {
Expand Down
4 changes: 3 additions & 1 deletion include/pika_command.h
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,9 @@ class CmdRes {
result = "-ERR invalid DB index\r\n";
break;
case kInvalidDbType:
result = "-ERR invalid DB type\r\n";
result = "-ERR invalid DB for '";
result.append(message_);
result.append("'\r\n");
break;
case kInvalidTable:
result = "-ERR invalid Table for '";
Expand Down
3 changes: 3 additions & 0 deletions include/pika_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class Table : public std::enable_shared_from_this<Table>{
void ScanDatabase(const blackwidow::DataType& type);
KeyScanInfo GetKeyScanInfo();

// Compact use;
void Compact(const blackwidow::DataType& type);

void LeaveAllPartition();
std::shared_ptr<Partition> GetPartitionById(uint32_t partition_id);
std::shared_ptr<Partition> GetPartitionByKey(const std::string& key);
Expand Down
41 changes: 29 additions & 12 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ void BgsaveCmd::DoInitial() {
res_.SetRes(CmdRes::kWrongNum, kCmdNameBgsave);
return;
}
LogCommand();
if (argv_.size() == 2) {
std::vector<std::string> tables;
slash::StringSplit(argv_[1], COMMA, tables);
Expand All @@ -145,12 +144,13 @@ void BgsaveCmd::DoInitial() {

void BgsaveCmd::Do(std::shared_ptr<Partition> partition) {
g_pika_server->DoSameThingSpecificTable(TaskType::kBgSave, bgsave_tables_);
LogCommand();
res_.AppendContent("+Background saving started");
}

void CompactCmd::DoInitial() {
if (!CheckArg(argv_.size())
|| argv_.size() > 2) {
|| argv_.size() > 3) {
res_.SetRes(CmdRes::kWrongNum, kCmdNameCompact);
return;
}
Expand All @@ -160,28 +160,43 @@ void CompactCmd::DoInitial() {
return;
}

if (argv_.size() == 2) {
if (argv_.size() == 1) {
struct_type_ = "all";
} else if (argv_.size() == 2) {
struct_type_ = argv_[1];
} else if (argv_.size() == 3) {
std::vector<std::string> tables;
slash::StringSplit(argv_[1], COMMA, tables);
for (const auto& table : tables) {
if (!g_pika_server->IsTableExist(table)) {
res_.SetRes(CmdRes::kInvalidTable, table);
return;
} else {
compact_tables_.insert(table);
}
}
struct_type_ = argv_[2];
}
}

void CompactCmd::Do(std::shared_ptr<Partition> partition) {
if (struct_type_.empty()) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactAll);
if (!strcasecmp(struct_type_.data(), "all")) {
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactAll, compact_tables_);
} else if (!strcasecmp(struct_type_.data(), "string")) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactStrings);
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactStrings, compact_tables_);
} else if (!strcasecmp(struct_type_.data(), "hash")) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactHashes);
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactHashes, compact_tables_);
} else if (!strcasecmp(struct_type_.data(), "set")) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactSets);
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactSets, compact_tables_);
} else if (!strcasecmp(struct_type_.data(), "zset")) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactZSets);
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactZSets, compact_tables_);
} else if (!strcasecmp(struct_type_.data(), "list")) {
g_pika_server->DoSameThingEveryPartition(TaskType::kCompactList);
g_pika_server->DoSameThingSpecificTable(TaskType::kCompactList, compact_tables_);
} else {
res_.SetRes(CmdRes::kInvalidDbType);
res_.SetRes(CmdRes::kInvalidDbType, struct_type_);
return;
}
LogCommand();
res_.SetRes(CmdRes::kOk);
}

Expand All @@ -191,6 +206,7 @@ void PingCmd::DoInitial() {
return;
}
}

void PingCmd::Do(std::shared_ptr<Partition> partition) {
res_.SetRes(CmdRes::kPong);
}
Expand Down Expand Up @@ -376,7 +392,6 @@ void InfoCmd::DoInitial() {
} else if (!strcasecmp(argv_[1].data(), kReplicationSection.data())) {
info_section_ = kInfoReplication;
} else if (!strcasecmp(argv_[1].data(), kKeyspaceSection.data())) {
LogCommand();
info_section_ = kInfoKeyspace;
if (argc == 2) {
return;
Expand Down Expand Up @@ -637,6 +652,7 @@ void InfoCmd::InfoKeyspace(std::string& info) {
if (off_) {
g_pika_server->DoSameThingSpecificTable(TaskType::kStopKeyScan);
off_ = false;
LogCommand();
return;
}

Expand Down Expand Up @@ -671,6 +687,7 @@ void InfoCmd::InfoKeyspace(std::string& info) {
if (rescan_) {
g_pika_server->DoSameThingSpecificTable(TaskType::kStartKeyScan, keyspace_scan_tables);
}
LogCommand();
return;
}

Expand Down
36 changes: 18 additions & 18 deletions src/pika_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,24 @@ Status PikaServer::DoSameThingSpecificTable(const TaskType& type, const std::set
continue;
} else {
switch (type) {
case TaskType::kCompactAll:
table_item.second->Compact(blackwidow::DataType::kAll);
break;
case TaskType::kCompactStrings:
table_item.second->Compact(blackwidow::DataType::kStrings);
break;
case TaskType::kCompactHashes:
table_item.second->Compact(blackwidow::DataType::kHashes);
break;
case TaskType::kCompactSets:
table_item.second->Compact(blackwidow::DataType::kSets);
break;
case TaskType::kCompactZSets:
table_item.second->Compact(blackwidow::DataType::kZSets);
break;
case TaskType::kCompactList:
table_item.second->Compact(blackwidow::DataType::kLists);
break;
case TaskType::kStartKeyScan:
table_item.second->KeyScan();
break;
Expand Down Expand Up @@ -571,24 +589,6 @@ Status PikaServer::DoSameThingEveryPartition(const TaskType& type) {
for (const auto& table_item : tables_) {
for (const auto& partition_item : table_item.second->partitions_) {
switch (type) {
case TaskType::kCompactAll:
partition_item.second->Compact(blackwidow::DataType::kAll);
break;
case TaskType::kCompactStrings:
partition_item.second->Compact(blackwidow::DataType::kStrings);
break;
case TaskType::kCompactHashes:
partition_item.second->Compact(blackwidow::DataType::kHashes);
break;
case TaskType::kCompactSets:
partition_item.second->Compact(blackwidow::DataType::kSets);
break;
case TaskType::kCompactZSets:
partition_item.second->Compact(blackwidow::DataType::kZSets);
break;
case TaskType::kCompactList:
partition_item.second->Compact(blackwidow::DataType::kLists);
break;
case TaskType::kResetReplState:
partition_item.second->SetReplState(ReplState::kNoConnect);
break;
Expand Down
7 changes: 7 additions & 0 deletions src/pika_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ KeyScanInfo Table::GetKeyScanInfo() {
return key_scan_info_;
}

void Table::Compact(const blackwidow::DataType& type) {
slash::RWLock rwl(&partitions_rw_, true);
for (const auto& item : partitions_) {
item.second->Compact(type);
}
}

void Table::DoKeyScan(void *arg) {
BgTaskArg* bg_task_arg = reinterpret_cast<BgTaskArg*>(arg);
bg_task_arg->table->RunKeyScan();
Expand Down

0 comments on commit 4d30ea4

Please sign in to comment.