Skip to content

Commit

Permalink
feat:cache layer for pika (OpenAtomFoundation#2171)
Browse files Browse the repository at this point in the history
* add dependences

* fix: add_ci (OpenAtomFoundation#229)

add_ci

Co-authored-by: wuxianrong <[email protected]>

* formate code style && fix centos build bug

* fix centos build bug

* format code style

* format

* rename classname

* format code

* use cache define

* upgrade rediscache version to v0.1.3

* change file name

* using DEFER

* fix github ci failure

* add_string_cache【暂时不修改格式及代码风格,会在最后一个PR统一做修改】 (OpenAtomFoundation#237)

add_string_cache

* add_slave_cache (OpenAtomFoundation#243)

Co-authored-by: wuxianrong <[email protected]>

* fix:cache string (OpenAtomFoundation#247)

fix_cache_string

* add_bitmap_cache (OpenAtomFoundation#255)

* add_bitmap_cache
Co-authored-by: chejinge <[email protected]>

* add string cache cmd (OpenAtomFoundation#252)

add_string_cache_cmd

Co-authored-by: wuxianrong <[email protected]>

* add list cache cmd (OpenAtomFoundation#250)

add_list_cache_cmd

Co-authored-by: wuxianrong <[email protected]>

* cache set command (OpenAtomFoundation#256)

* set_cache

Co-authored-by: chejinge <[email protected]>

* add hash cache cmd (OpenAtomFoundation#251)

add_hash_cache_cmd

Co-authored-by: wuxianrong <[email protected]>

* cache zset command (OpenAtomFoundation#257)

* cache zset command

Co-authored-by: chejinge <[email protected]>

* Update pika_zset.h

* add Pika Cache Status (OpenAtomFoundation#259)

Add Pika-Cache-Status

Co-authored-by: wuxianrong <[email protected]>

* cache index

* git revert

* 【fix】cache Asynchronous transmission (OpenAtomFoundation#265)

fix cache Asynchronous transmission

Co-authored-by: wuxianrong <[email protected]>

* add defer (OpenAtomFoundation#269)

add_defer

Co-authored-by: wuxianrong <[email protected]>

* cache index (OpenAtomFoundation#260)

* cache index

Co-authored-by: chejinge <[email protected]>
Co-authored-by: wuxianrong <[email protected]>

* Support for multiple databases   (OpenAtomFoundation#275)

* delete_pika_cache_manager

* Support for multiple databases

---------

Co-authored-by: wuxianrong <[email protected]>

* smart pointer and lock granularity && set don't update cache (OpenAtomFoundation#277)

Co-authored-by: chejinge <[email protected]>

* fix_lock (OpenAtomFoundation#278)

Co-authored-by: wuxianrong <[email protected]>

* fix_command_bug (OpenAtomFoundation#282)

* fix_command_bug

Co-authored-by: chejinge <[email protected]>

* Fix del mget (OpenAtomFoundation#291)

 fix_del_command&&HMSET HMGET

---------

Co-authored-by: chejinge <[email protected]>

* bugfix:ttl&&exits command (OpenAtomFoundation#300)

* fix_ttl_exits
---------

Co-authored-by: chejinge <[email protected]>

* fix: hash/list/set/zset test (OpenAtomFoundation#285)

* fix hash/list test

* fix set/zset test

---------

Co-authored-by: wuxianrong <[email protected]>

* fix bitpos strlen msetnx

* fix bitpos strlen msetnx

* Update pika_kv.cc

* Update pika_command.cc

* fix: hash/zset test cache (OpenAtomFoundation#302)

* fix hash/list test

Co-authored-by: wuxianrong <[email protected]>

* fix Master-slave test

* code format (OpenAtomFoundation#316)

Co-authored-by: wuxianrong <[email protected]>

* fix:cache code style

* fix:cache code style

* fix

* Supports multi-DB asynchronous data transfer   (OpenAtomFoundation#317)

* code format

* support dbs

---------

Co-authored-by: wuxianrong <[email protected]>

* fix conflict and change name

* fix conflict and change name

* fix conflict and change name

* Update pika_admin.cc

* Update pika_admin.cc

* Initialization variable   (OpenAtomFoundation#321)

add_comment

Co-authored-by: wuxianrong <[email protected]>

* fix_flushdb

* fix: Cache flushdb (OpenAtomFoundation#322)

fix_flushdb

Co-authored-by: wuxianrong <[email protected]>

* Fix cache mget (OpenAtomFoundation#323)

* fix_mget

* fix_lock_and_multi_caches

* fix_mem_leak

* fix_mem_leak

* fix

* add cacheclean command

* fix coreedump

* Fix:mem leak (OpenAtomFoundation#329)

fix:hdel

Co-authored-by: chejinge <[email protected]>

* Update pika_admin.cc

* Update pika.conf

* Update pika.conf

* fix_license

* remove nullpter

---------

Co-authored-by: chejinge <[email protected]>
Co-authored-by: Mixficsol <[email protected]>
Co-authored-by: wuxianrong <[email protected]>
Co-authored-by: Xin.Zh <[email protected]>
Co-authored-by: alexstocks <[email protected]>
  • Loading branch information
6 people authored Dec 1, 2023
1 parent 6048295 commit 9aa1495
Show file tree
Hide file tree
Showing 60 changed files with 8,061 additions and 586 deletions.
42 changes: 40 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ elseif(${BUILD_TYPE} STREQUAL RELWITHDEBINFO)
else()
set(LIB_BUILD_TYPE RELEASE)
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -g -DNDEBUG")

endif()

if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
Expand Down Expand Up @@ -629,6 +630,35 @@ ExternalProject_Add(rocksdb
make -j${CPU_CORE}
)

ExternalProject_Add(rediscache
URL
https://github.com/pikiwidb/rediscache/archive/refs/tags/v1.0.5.tar.gz
URL_HASH
MD5=99e4d0bde20811a6058a6aa482c18711
DOWNLOAD_NO_PROGRESS
1
UPDATE_COMMAND
""
LOG_BUILD
1
BUILD_IN_SOURCE
1
SOURCE_SUBDIR
""
CMAKE_ARGS
-DCMAKE_INSTALL_PREFIX=${STAGED_INSTALL_PREFIX}
-DCMAKE_INSTALL_INCLUDEDIR=${INSTALL_INCLUDEDIR}
-DCMAKE_INSTALL_LIBDIR=${INSTALL_LIBDIR}
-DCMAKE_BUILD_TYPE=${LIB_BUILD_TYPE}
BUILD_ALWAYS
1
BUILD_COMMAND
make -j${CPU_CORE}
)
set(REDISCACHE_INCLUDE_DIR ${INSTALL_INCLUDEDIR})
set(REDISCACHE_LIBRARY ${INSTALL_LIBDIR}/librediscache.a)


option(USE_PIKA_TOOLS "compile pika-tools" OFF)
if (USE_PIKA_TOOLS)
ExternalProject_Add(hiredis
Expand Down Expand Up @@ -700,6 +730,7 @@ set(ROCKSDB_SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/${EP_BASE_SUFFIX}/Source/rock
add_subdirectory(src/pstd)
add_subdirectory(src/net)
add_subdirectory(src/storage)
add_subdirectory(src/cache)
if (USE_PIKA_TOOLS)
add_subdirectory(tools)
endif()
Expand Down Expand Up @@ -734,7 +765,9 @@ message("pika GIT_DATE = ${PIKA_GIT_DATE}")
message("pika GIT_TAG = ${PIKA_GIT_TAG}")
message("pika BUILD_DATE = ${PIKA_BUILD_DATE}")

set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc)
set(PIKA_BUILD_VERSION_CC ${CMAKE_BINARY_DIR}/pika_build_version.cc
src/pika_cache_load_thread.cc
)
message("PIKA_BUILD_VERSION_CC : " ${PIKA_BUILD_VERSION_CC})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/src/build_version.cc.in ${PIKA_BUILD_VERSION_CC} @ONLY)

Expand Down Expand Up @@ -769,7 +802,10 @@ add_dependencies(${PROJECT_NAME}
protobuf
pstd
net
storage)
rediscache
storage
cache
)

target_include_directories(${PROJECT_NAME}
PUBLIC ${CMAKE_CURRENT_BINARY_DIR}
Expand All @@ -778,6 +814,7 @@ target_include_directories(${PROJECT_NAME}
)

target_link_libraries(${PROJECT_NAME}
cache
storage
net
pstd
Expand All @@ -790,6 +827,7 @@ target_link_libraries(${PROJECT_NAME}
libzstd.a
liblz4.a
libz.a
librediscache.a
${LIBUNWIND_LIBRARY}
${JEMALLOC_LIBRARY})

Expand Down
45 changes: 45 additions & 0 deletions conf/pika.conf
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,48 @@ max-rsync-parallel-num : 4
# The synchronization mode of Pika primary/secondary replication is determined by ReplicationID. ReplicationID in one replication_cluster are the same
# replication-id :

###################
## Cache Settings
###################
# the number of caches for every db
cache-num : 16

# cache-model 0:cache_none 1:cache_read
cache-model : 1
# cache-type: string, set, zset, list, hash, bit
cache-type: string, set, zset, list, hash

# Maximum number of keys in the zset redis cache
# On the disk DB, a zset field may have many fields. In the memory cache, we limit the maximum
# number of keys that can exist in a zset, which is zset-zset-cache-field-num-per-key, with a
# default value of 512.
zset-cache-field-num-per-key : 512

# If the number of elements in a zset in the DB exceeds zset-cache-field-num-per-key,
# we determine whether to cache the first 512[zset-cache-field-num-per-key] elements
# or the last 512[zset-cache-field-num-per-key] elements in the zset based on zset-cache-start-direction.
#
# If zset-cache-start-direction is 0, cache the first 512[zset-cache-field-num-per-key] elements from the header
# If zset-cache-start-direction is -1, cache the last 512[zset-cache-field-num-per-key] elements
zset-cache-start-direction : 0

# the cache maxmemory of every db, configuration 10G
cache-maxmemory : 10737418240

# cache-maxmemory-policy
# 0: volatile-lru -> Evict using approximated LRU among the keys with an expire set.
# 1: allkeys-lru -> Evict any key using approximated LRU.
# 2: volatile-lfu -> Evict using approximated LFU among the keys with an expire set.
# 3: allkeys-lfu -> Evict any key using approximated LFU.
# 4: volatile-random -> Remove a random key among the ones with an expire set.
# 5: allkeys-random -> Remove a random key, any key.
# 6: volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# 7: noeviction -> Don't evict anything, just return an error on write operations.
cache-maxmemory-policy : 1

# cache-maxmemory-samples
cache-maxmemory-samples: 5

# cache-lfu-decay-time
cache-lfu-decay-time: 1

52 changes: 46 additions & 6 deletions include/pika_admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,10 @@ class FlushallCmd : public Cmd {
public:
FlushallCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushallCmd(*this); }
void Execute() override;
void FlushAllWithoutLock();
Expand All @@ -195,8 +197,10 @@ class FlushdbCmd : public Cmd {
public:
FlushdbCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new FlushdbCmd(*this); }
void FlushAllSlotsWithoutLock(std::shared_ptr<DB> db);
void Execute() override;
Expand Down Expand Up @@ -241,7 +245,8 @@ class InfoCmd : public Cmd {
kInfo,
kInfoAll,
kInfoDebug,
kInfoCommandStats
kInfoCommandStats,
kInfoCache
};

InfoCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
Expand Down Expand Up @@ -270,6 +275,7 @@ class InfoCmd : public Cmd {
const static std::string kRocksDBSection;
const static std::string kDebugSection;
const static std::string kCommandStatsSection;
const static std::string kCacheSection;

void DoInitial() override;
void Clear() override {
Expand All @@ -290,6 +296,9 @@ class InfoCmd : public Cmd {
void InfoRocksDB(std::string& info);
void InfoDebug(std::string& info);
void InfoCommandStats(std::string& info);
void InfoCache(std::string& info, std::shared_ptr<Slot> slot);

std::string CacheStatusToString(int status);
};

class ShutdownCmd : public Cmd {
Expand Down Expand Up @@ -317,7 +326,7 @@ class ConfigCmd : public Cmd {
std::vector<std::string> config_args_v_;
void DoInitial() override;
void ConfigGet(std::string& ret);
void ConfigSet(std::string& ret);
void ConfigSet(std::string& ret, std::shared_ptr<Slot> slot);
void ConfigRewrite(std::string& ret);
void ConfigResetstat(std::string& ret);
void ConfigRewriteReplicationID(std::string& ret);
Expand Down Expand Up @@ -519,6 +528,37 @@ class DisableWalCmd : public Cmd {
void DoInitial() override;
};

class CacheCmd : public Cmd {
public:
enum CacheCondition {kCLEAR_DB, kCLEAR_HITRATIO, kDEL_KEYS, kRANDOM_KEY};
CacheCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new CacheCmd(*this); }

private:
CacheCondition condition_;
std::vector<std::string> keys_;
rocksdb::Status s_;
void DoInitial() override;
void Clear() override {
keys_.clear();
}
};

class ClearCacheCmd : public Cmd {
public:
ClearCacheCmd(const std::string& name, int arity, uint16_t flag) : Cmd(name, arity, flag) {}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new ClearCacheCmd(*this); }

private:
void DoInitial() override;
};

#ifdef WITH_COMMAND_DOCS
class CommandCmd : public Cmd {
public:
Expand Down
22 changes: 20 additions & 2 deletions include/pika_bit.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ class BitGetCmd : public Cmd {
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitGetCmd(*this); }

private:
std::string key_;
int64_t bit_offset_ = -1;
rocksdb::Status s_;
void Clear() override {
key_ = "";
bit_offset_ = -1;
Expand All @@ -47,6 +51,8 @@ class BitSetCmd : public Cmd {
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitSetCmd(*this); }
Expand All @@ -55,6 +61,7 @@ class BitSetCmd : public Cmd {
std::string key_;
int64_t bit_offset_;
int64_t on_;
rocksdb::Status s_;
void Clear() override {
key_ = "";
bit_offset_ = -1;
Expand All @@ -72,6 +79,9 @@ class BitCountCmd : public Cmd {
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitCountCmd(*this); }
Expand All @@ -81,6 +91,7 @@ class BitCountCmd : public Cmd {
bool count_all_;
int64_t start_offset_;
int64_t end_offset_;
rocksdb::Status s_;
void Clear() override {
key_ = "";
count_all_ = false;
Expand All @@ -99,6 +110,9 @@ class BitPosCmd : public Cmd {
return res;
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void ReadCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
Cmd* Clone() override { return new BitPosCmd(*this); }
Expand All @@ -110,6 +124,7 @@ class BitPosCmd : public Cmd {
int64_t bit_val_;
int64_t start_offset_;
int64_t end_offset_;
rocksdb::Status s_;
void Clear() override {
key_ = "";
pos_all_ = false;
Expand Down Expand Up @@ -139,13 +154,16 @@ class BitOpCmd : public Cmd {
return {dest_key_};
}
void Do(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override {};
void Merge() override {};
void DoUpdateCache(std::shared_ptr<Slot> slot = nullptr) override;
void DoThroughDB(std::shared_ptr<Slot> slot = nullptr) override;
void Split(std::shared_ptr<Slot> slot, const HintKeys& hint_keys) override{};
void Merge() override{};
Cmd* Clone() override { return new BitOpCmd(*this); }
void DoBinlog(const std::shared_ptr<SyncMasterSlot>& slot) override;

private:
std::string dest_key_;
rocksdb::Status s_;
std::vector<std::string> src_keys_;
storage::BitOpType op_;
void Clear() override {
Expand Down
Loading

0 comments on commit 9aa1495

Please sign in to comment.