Skip to content

Commit

Permalink
fix: PkPatternMatchDel inconsistent between rediscache and db (OpenAt…
Browse files Browse the repository at this point in the history
…omFoundation#2777)

* update PkPatternMatchDel with rediscache

* fix bug

* fix bug

* feat:add count argument with pkpatternmatchdel

* fix bug

* remove unused function

* fix bug
  • Loading branch information
haiyang426 authored Jul 29, 2024
1 parent 394f517 commit 66af31e
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 71 deletions.
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ ExternalProject_Add(rediscache
set(REDISCACHE_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(REDISCACHE_LIBRARY ${INSTALL_LIBDIR}/librediscache.a)


option(USE_PIKA_TOOLS "compile pika-tools" OFF)
if (USE_PIKA_TOOLS)
ExternalProject_Add(hiredis
Expand Down
7 changes: 6 additions & 1 deletion include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -475,13 +475,18 @@ class PKPatternMatchDelCmd : public Cmd {
PKPatternMatchDelCmd(const std::string& name, int arity, uint32_t flag)
: Cmd(name, arity, flag, static_cast<uint32_t>(AclCategory::ADMIN)) {}
void Do() override;
void DoThroughDB() override;
void DoUpdateCache() override;
void Split(const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new PKPatternMatchDelCmd(*this); }
void DoBinlog() override;

private:
storage::DataType type_ = storage::DataType::kAll;
storage::DataType type_;
std::vector<std::string> remove_keys_;
std::string pattern_;
int64_t max_count_;
void DoInitial() override;
};

Expand Down
47 changes: 42 additions & 5 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3181,16 +3181,53 @@ void PKPatternMatchDelCmd::DoInitial() {
return;
}
pattern_ = argv_[1];
max_count_ = storage::BATCH_DELETE_LIMIT;
if (argv_.size() > 2) {
if (pstd::string2int(argv_[2].data(), argv_[2].size(), &max_count_) == 0 || max_count_ < 1 || max_count_ > storage::BATCH_DELETE_LIMIT) {
res_.SetRes(CmdRes::kInvalidInt);
return;
}
}
}

//TODO: may lead to inconsistent between rediscache and db, because currently it only cleans db
void PKPatternMatchDelCmd::Do() {
int ret = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDel(type_, pattern_, &ret);
if (s.ok()) {
res_.AppendInteger(ret);
int64_t count = 0;
rocksdb::Status s = db_->storage()->PKPatternMatchDelWithRemoveKeys(pattern_, &count, &remove_keys_, max_count_);

if(s.ok()) {
res_.AppendInteger(count);
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
} else {
res_.SetRes(CmdRes::kErrOther, s.ToString());
if (count >= 0) {
s_ = rocksdb::Status::OK();
for (const auto& key : remove_keys_) {
RemSlotKey(key, db_);
}
}
}
}

void PKPatternMatchDelCmd::DoThroughDB() {
Do();
}

void PKPatternMatchDelCmd::DoUpdateCache() {
if(s_.ok()) {
db_->cache()->Del(remove_keys_);
}
}

void PKPatternMatchDelCmd::DoBinlog() {
std::string opt = "del";
for(auto& key: remove_keys_) {
argv_.clear();
argv_.emplace_back(opt);
argv_.emplace_back(key);
Cmd::DoBinlog();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_command.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ void InitCmdTable(CmdTable* cmd_table) {
cmd_table->insert(std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePadding, std::move(paddingptr)));

std::unique_ptr<Cmd> pkpatternmatchdelptr =
std::make_unique<PKPatternMatchDelCmd>(kCmdNamePKPatternMatchDel, 2, kCmdFlagsWrite | kCmdFlagsAdmin);
std::make_unique<PKPatternMatchDelCmd>(kCmdNamePKPatternMatchDel, -2, kCmdFlagsWrite | kCmdFlagsAdmin);
cmd_table->insert(
std::pair<std::string, std::unique_ptr<Cmd>>(kCmdNamePKPatternMatchDel, std::move(pkpatternmatchdelptr)));
std::unique_ptr<Cmd> dummyptr = std::make_unique<DummyCmd>(kCmdDummy, 0, kCmdFlagsWrite);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/include/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ class Storage {

// Traverses the database of the specified type, removing the Key that matches
// the pattern
Status PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);

// Iterate over a collection of elements
// return next_key that the user need to use as the start_key argument
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/redis.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class Redis {
Status Expireat(const Slice& key, int64_t timestamp);
Status Persist(const Slice& key);
Status TTL(const Slice& key, int64_t* timestamp);
Status PKPatternMatchDel(const std::string& pattern, int32_t* ret);
Status PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count);

Status GetType(const Slice& key, enum DataType& type);
Status IsExist(const Slice& key);
Expand Down
29 changes: 12 additions & 17 deletions src/storage/src/redis_strings.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1689,7 +1689,7 @@ rocksdb::Status Redis::IsExist(const storage::Slice& key) {
/*
* Example Delete the specified prefix key
*/
rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* ret) {
rocksdb::Status Redis::PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret, std::vector<std::string>* remove_keys, const int64_t& max_count) {
rocksdb::ReadOptions iterator_options;
const rocksdb::Snapshot* snapshot;
ScopeSnapshot ss(db_, &snapshot);
Expand All @@ -1698,12 +1698,12 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re

std::string key;
std::string meta_value;
int32_t total_delete = 0;
int64_t total_delete = 0;
rocksdb::Status s;
rocksdb::WriteBatch batch;
rocksdb::Iterator* iter = db_->NewIterator(iterator_options, handles_[kMetaCF]);
iter->SeekToFirst();
while (iter->Valid()) {
while (iter->Valid() && static_cast<int64_t>(batch.Count()) < max_count) {
auto meta_type = static_cast<enum DataType>(static_cast<uint8_t>(iter->value()[0]));
ParsedBaseMetaKey parsed_meta_key(iter->key().ToString());
key = iter->key().ToString();
Expand All @@ -1714,6 +1714,7 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
if (!parsed_strings_value.IsStale() &&
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
batch.Delete(key);
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else if (meta_type == DataType::kLists) {
ParsedListsMetaValue parsed_lists_meta_value(&meta_value);
Expand All @@ -1722,6 +1723,7 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
0)) {
parsed_lists_meta_value.InitialMetaValue();
batch.Put(handles_[kMetaCF], iter->key(), meta_value);
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else if (meta_type == DataType::kStreams) {
StreamMetaValue stream_meta_value;
Expand All @@ -1730,6 +1732,7 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
(StringMatch(pattern.data(), pattern.size(), parsed_meta_key.Key().data(), parsed_meta_key.Key().size(), 0) != 0)) {
stream_meta_value.InitMetaValue();
batch.Put(handles_[kMetaCF], key, stream_meta_value.value());
remove_keys->push_back(parsed_meta_key.Key().data());
}
} else {
ParsedBaseMetaValue parsed_meta_value(&meta_value);
Expand All @@ -1738,32 +1741,24 @@ rocksdb::Status Redis::PKPatternMatchDel(const std::string& pattern, int32_t* re
0)) {
parsed_meta_value.InitialMetaValue();
batch.Put(handles_[kMetaCF], iter->key(), meta_value);
}
}

if (static_cast<size_t>(batch.Count()) >= BATCH_DELETE_LIMIT) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
batch.Clear();
} else {
*ret = total_delete;
delete iter;
return s;
remove_keys->push_back(parsed_meta_key.Key().data());
}
}
iter->Next();
}
if (batch.Count() != 0U) {
s = db_->Write(default_write_options_, &batch);
if (s.ok()) {
total_delete += static_cast<int32_t>(batch.Count());
total_delete += static_cast<int64_t>(batch.Count());
batch.Clear();
} else {
remove_keys->erase(remove_keys->end() - batch.Count(), remove_keys->end());
}
}

delete iter;
*ret = total_delete;
delete iter;
return s;
}

} // namespace storage
12 changes: 8 additions & 4 deletions src/storage/src/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1399,16 +1399,20 @@ Status Storage::PKRScanRange(const DataType& data_type, const Slice& key_start,
return Status::OK();
}

Status Storage::PKPatternMatchDel(const DataType& data_type, const std::string& pattern, int32_t* ret) {
Status Storage::PKPatternMatchDelWithRemoveKeys(const std::string& pattern, int64_t* ret,
std::vector<std::string>* remove_keys, const int64_t& max_count) {
Status s;
*ret = 0;
for (const auto& inst : insts_) {
int32_t tmp_ret = 0;
s = inst->PKPatternMatchDel(pattern, &tmp_ret);
int64_t tmp_ret = 0;
s = inst->PKPatternMatchDelWithRemoveKeys(pattern, &tmp_ret, remove_keys, max_count - *ret);
if (!s.ok()) {
return s;
}
}
*ret += tmp_ret;
if (*ret == max_count) {
return s;
}
}
return s;
}
Expand Down
Loading

0 comments on commit 66af31e

Please sign in to comment.