From 19281767f022fcd208b70d895a096d400a005e62 Mon Sep 17 00:00:00 2001 From: vinchen Date: Tue, 23 Feb 2021 14:39:58 +0800 Subject: [PATCH] 1. add binlog_lag for info replication 2. add scanpiont for info indexmanager 3. add rocksdb_bg_error_count for info rocksdbbgerror --- src/tendisplus/commands/debug.cpp | 9 +++- src/tendisplus/replication/repl_manager.cpp | 2 + src/tendisplus/server/index_manager.cpp | 50 +++++++++++++++------ src/tendisplus/server/index_manager.h | 2 + 4 files changed, 48 insertions(+), 15 deletions(-) diff --git a/src/tendisplus/commands/debug.cpp b/src/tendisplus/commands/debug.cpp index 6d1e0261..e1c4f825 100644 --- a/src/tendisplus/commands/debug.cpp +++ b/src/tendisplus/commands/debug.cpp @@ -2415,7 +2415,8 @@ class InfoCommand : public Command { if (allsections || defsections || section == "rocksdbbgerror") { auto server = sess->getServerEntry(); - result << "# RocksdbBgError\r\n"; + uint64_t bgerrcnt = 0; + std::stringstream ss; for (uint64_t i = 0; i < server->getKVStoreCount(); ++i) { auto expdb = server->getSegmentMgr()->getDb( sess, i, mgl::LockMode::LOCK_IS, false, 0); @@ -2426,9 +2427,13 @@ class InfoCommand : public Command { auto store = expdb.value().store; auto ret = store->getBgError(); if (ret != "") { - result << "rocksdb" << store->dbId() << ":" << ret << "\r\n"; + ss << "rocksdb" << store->dbId() << ":" << ret << "\r\n"; + bgerrcnt++; } } + result << "# RocksdbBgError\r\n"; + result << "rocksdb_bg_error_count:" << bgerrcnt << "\r\n"; + result << ss.str(); result << "\r\n"; } } diff --git a/src/tendisplus/replication/repl_manager.cpp b/src/tendisplus/replication/repl_manager.cpp index b62ea429..29d4f42f 100644 --- a/src/tendisplus/replication/repl_manager.cpp +++ b/src/tendisplus/replication/repl_manager.cpp @@ -1417,6 +1417,8 @@ void ReplManager::getReplInfoSimple(std::stringstream& ss) const { << ",state=" << iter.second.state << ",offset=" << iter.second.binlogpos << ",lag=" << (msSinceEpoch() - iter.second.lastBinlogTs) / 1000 + << ",binlog_lag=" + << (int64_t)master_repl_offset - (int64_t)iter.second.binlogpos << "\r\n"; } } diff --git a/src/tendisplus/server/index_manager.cpp b/src/tendisplus/server/index_manager.cpp index 3145f571..209a4f68 100644 --- a/src/tendisplus/server/index_manager.cpp +++ b/src/tendisplus/server/index_manager.cpp @@ -31,8 +31,10 @@ IndexManager::IndexManager(std::shared_ptr svr, _deleterMatrix(std::make_shared()), _totalDequeue(0), _totalEnqueue(0) { + _scanPonitsTtl.resize(svr->getKVStoreCount()); for (size_t storeId = 0; storeId < svr->getKVStoreCount(); ++storeId) { _scanPoints[storeId] = std::move(std::string()); + _scanPonitsTtl[storeId] = -1; _scanJobStatus[storeId] = {false}; _delJobStatus[storeId] = {false}; _disableStatus[storeId] = {false}; @@ -108,26 +110,38 @@ std::string IndexManager::getInfoString() { ss << "deleting_expire_keys:" << _totalEnqueue - _totalDequeue << "\r\n"; ss << "scanner_matrix:" << _scannerMatrix->getInfoString() << "\r\n"; ss << "deleter_matrix:" << _deleterMatrix->getInfoString() << "\r\n"; + uint64_t minttl = -1; + + auto ttlStr = [](uint64_t ttl){ + if (ttl == (uint64_t)- 1) { + return std::to_string(-1); + } else { + return msEpochToDatetime(ttl); + } + }; + + for (uint32_t i = 0; i < _svr->getKVStoreCount(); i++) { - if (_expiredKeys.find(i) != _expiredKeys.end() && - _expiredKeys[i].size() > 0) { - auto index = _expiredKeys[i].front(); - ss << "scanpoint_" << i << ":" << msEpochToDatetime(index.getTTL()) - << "\r\n"; + ss << "scanpoint_" << i << ":" << ttlStr(_scanPonitsTtl[i]) + << "\r\n"; + if (_scanPonitsTtl[i] < minttl) { + minttl = _scanPonitsTtl[i]; } } + ss << "scanpoint" << ":" << ttlStr(minttl); return ss.str(); } Status IndexManager::scanExpiredKeysJob(uint32_t storeId) { bool expected = false; - if (!_scanJobStatus[storeId].compare_exchange_strong( - expected, true, std::memory_order_acq_rel)) { + + if (_disableStatus[storeId].load(std::memory_order_relaxed)) { return {ErrorCodes::ERR_OK, ""}; } - if (_disableStatus[storeId].load(std::memory_order_relaxed)) { + if (!_scanJobStatus[storeId].compare_exchange_strong( + expected, true, std::memory_order_acq_rel)) { return {ErrorCodes::ERR_OK, ""}; } @@ -136,12 +150,21 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) { _scanJobStatus[storeId].store(false, std::memory_order_release); }); + _scanJobCnt[storeId]++; + + auto scanBatch = _cfg->scanCntIndexMgr; bool clusterEnabled = _svr->getParams()->clusterEnabled; if (clusterEnabled && _svr->getMigrateManager()->existMigrateTask()) { return {ErrorCodes::ERR_OK, ""}; } - _scanJobCnt[storeId]++; + { + std::lock_guard lk(_mutex); + if (_expiredKeys[storeId].size() >= scanBatch) { + return {ErrorCodes::ERR_OK, ""}; + } + } + LocalSessionGuard sg(_svr.get()); auto expd = _svr->getSegmentMgr()->getDb( sg.getSession(), storeId, mgl::LockMode::LOCK_IS, true); @@ -188,7 +211,6 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) { // defalut colum_family } - auto scanBatch = _cfg->scanCntIndexMgr; // TODO(takenliu) _scanPoints has error, _expiredKeys[storeId] will be // pushed back twice while (true) { @@ -211,6 +233,7 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) { std::lock_guard lk(_mutex); _scanPoints[storeId].assign(record.value().encode()); _expiredKeys[storeId].push_back(std::move(record.value())); + _scanPonitsTtl[storeId] = record.value().getTTL(); _totalEnqueue++; if (_expiredKeys[storeId].size() >= scanBatch) { break; @@ -239,12 +262,13 @@ Status IndexManager::stopStore(uint32_t storeId) { int IndexManager::tryDelExpiredKeysJob(uint32_t storeId) { bool expect = false; - if (!_delJobStatus[storeId].compare_exchange_strong( - expect, true, std::memory_order_acq_rel)) { + + if (_disableStatus[storeId].load(std::memory_order_relaxed)) { return 0; } - if (_disableStatus[storeId].load(std::memory_order_relaxed)) { + if (!_delJobStatus[storeId].compare_exchange_strong( + expect, true, std::memory_order_acq_rel)) { return 0; } diff --git a/src/tendisplus/server/index_manager.h b/src/tendisplus/server/index_manager.h index d81543ec..4edc579e 100644 --- a/src/tendisplus/server/index_manager.h +++ b/src/tendisplus/server/index_manager.h @@ -10,6 +10,7 @@ #include #include #include +#include #include "tendisplus/server/server_entry.h" #include "tendisplus/network/worker_pool.h" @@ -40,6 +41,7 @@ class IndexManager { std::unique_ptr _keyDeleter; std::unordered_map> _expiredKeys; std::unordered_map _scanPoints; + std::vector _scanPonitsTtl; JobStatus _scanJobStatus; JobStatus _delJobStatus; // when destroystore, _disableStatus[storeId] = true