Skip to content

Commit

Permalink
1. add binlog_lag for info replication
Browse files Browse the repository at this point in the history
2. add scanpiont for info indexmanager
3. add rocksdb_bg_error_count for info rocksdbbgerror
  • Loading branch information
TendisDev committed Feb 23, 2021
1 parent 017561f commit 1928176
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 15 deletions.
9 changes: 7 additions & 2 deletions src/tendisplus/commands/debug.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2415,7 +2415,8 @@ class InfoCommand : public Command {
if (allsections || defsections || section == "rocksdbbgerror") {
auto server = sess->getServerEntry();

result << "# RocksdbBgError\r\n";
uint64_t bgerrcnt = 0;
std::stringstream ss;
for (uint64_t i = 0; i < server->getKVStoreCount(); ++i) {
auto expdb = server->getSegmentMgr()->getDb(
sess, i, mgl::LockMode::LOCK_IS, false, 0);
Expand All @@ -2426,9 +2427,13 @@ class InfoCommand : public Command {
auto store = expdb.value().store;
auto ret = store->getBgError();
if (ret != "") {
result << "rocksdb" << store->dbId() << ":" << ret << "\r\n";
ss << "rocksdb" << store->dbId() << ":" << ret << "\r\n";
bgerrcnt++;
}
}
result << "# RocksdbBgError\r\n";
result << "rocksdb_bg_error_count:" << bgerrcnt << "\r\n";
result << ss.str();
result << "\r\n";
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/tendisplus/replication/repl_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1417,6 +1417,8 @@ void ReplManager::getReplInfoSimple(std::stringstream& ss) const {
<< ",state=" << iter.second.state
<< ",offset=" << iter.second.binlogpos
<< ",lag=" << (msSinceEpoch() - iter.second.lastBinlogTs) / 1000
<< ",binlog_lag="
<< (int64_t)master_repl_offset - (int64_t)iter.second.binlogpos
<< "\r\n";
}
}
Expand Down
50 changes: 37 additions & 13 deletions src/tendisplus/server/index_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ IndexManager::IndexManager(std::shared_ptr<ServerEntry> svr,
_deleterMatrix(std::make_shared<PoolMatrix>()),
_totalDequeue(0),
_totalEnqueue(0) {
_scanPonitsTtl.resize(svr->getKVStoreCount());
for (size_t storeId = 0; storeId < svr->getKVStoreCount(); ++storeId) {
_scanPoints[storeId] = std::move(std::string());
_scanPonitsTtl[storeId] = -1;
_scanJobStatus[storeId] = {false};
_delJobStatus[storeId] = {false};
_disableStatus[storeId] = {false};
Expand Down Expand Up @@ -108,26 +110,38 @@ std::string IndexManager::getInfoString() {
ss << "deleting_expire_keys:" << _totalEnqueue - _totalDequeue << "\r\n";
ss << "scanner_matrix:" << _scannerMatrix->getInfoString() << "\r\n";
ss << "deleter_matrix:" << _deleterMatrix->getInfoString() << "\r\n";
uint64_t minttl = -1;

auto ttlStr = [](uint64_t ttl){
if (ttl == (uint64_t)- 1) {
return std::to_string(-1);
} else {
return msEpochToDatetime(ttl);
}
};


for (uint32_t i = 0; i < _svr->getKVStoreCount(); i++) {
if (_expiredKeys.find(i) != _expiredKeys.end() &&
_expiredKeys[i].size() > 0) {
auto index = _expiredKeys[i].front();
ss << "scanpoint_" << i << ":" << msEpochToDatetime(index.getTTL())
<< "\r\n";
ss << "scanpoint_" << i << ":" << ttlStr(_scanPonitsTtl[i])
<< "\r\n";
if (_scanPonitsTtl[i] < minttl) {
minttl = _scanPonitsTtl[i];
}
}
ss << "scanpoint" << ":" << ttlStr(minttl);

return ss.str();
}

Status IndexManager::scanExpiredKeysJob(uint32_t storeId) {
bool expected = false;
if (!_scanJobStatus[storeId].compare_exchange_strong(
expected, true, std::memory_order_acq_rel)) {

if (_disableStatus[storeId].load(std::memory_order_relaxed)) {
return {ErrorCodes::ERR_OK, ""};
}

if (_disableStatus[storeId].load(std::memory_order_relaxed)) {
if (!_scanJobStatus[storeId].compare_exchange_strong(
expected, true, std::memory_order_acq_rel)) {
return {ErrorCodes::ERR_OK, ""};
}

Expand All @@ -136,12 +150,21 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) {
_scanJobStatus[storeId].store(false, std::memory_order_release);
});

_scanJobCnt[storeId]++;

auto scanBatch = _cfg->scanCntIndexMgr;
bool clusterEnabled = _svr->getParams()->clusterEnabled;
if (clusterEnabled && _svr->getMigrateManager()->existMigrateTask()) {
return {ErrorCodes::ERR_OK, ""};
}

_scanJobCnt[storeId]++;
{
std::lock_guard<std::mutex> lk(_mutex);
if (_expiredKeys[storeId].size() >= scanBatch) {
return {ErrorCodes::ERR_OK, ""};
}
}

LocalSessionGuard sg(_svr.get());
auto expd = _svr->getSegmentMgr()->getDb(
sg.getSession(), storeId, mgl::LockMode::LOCK_IS, true);
Expand Down Expand Up @@ -188,7 +211,6 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) {
// defalut colum_family
}

auto scanBatch = _cfg->scanCntIndexMgr;
// TODO(takenliu) _scanPoints has error, _expiredKeys[storeId] will be
// pushed back twice
while (true) {
Expand All @@ -211,6 +233,7 @@ Status IndexManager::scanExpiredKeysJob(uint32_t storeId) {
std::lock_guard<std::mutex> lk(_mutex);
_scanPoints[storeId].assign(record.value().encode());
_expiredKeys[storeId].push_back(std::move(record.value()));
_scanPonitsTtl[storeId] = record.value().getTTL();
_totalEnqueue++;
if (_expiredKeys[storeId].size() >= scanBatch) {
break;
Expand Down Expand Up @@ -239,12 +262,13 @@ Status IndexManager::stopStore(uint32_t storeId) {

int IndexManager::tryDelExpiredKeysJob(uint32_t storeId) {
bool expect = false;
if (!_delJobStatus[storeId].compare_exchange_strong(
expect, true, std::memory_order_acq_rel)) {

if (_disableStatus[storeId].load(std::memory_order_relaxed)) {
return 0;
}

if (_disableStatus[storeId].load(std::memory_order_relaxed)) {
if (!_delJobStatus[storeId].compare_exchange_strong(
expect, true, std::memory_order_acq_rel)) {
return 0;
}

Expand Down
2 changes: 2 additions & 0 deletions src/tendisplus/server/index_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <list>
#include <string>
#include <memory>
#include <vector>
#include "tendisplus/server/server_entry.h"
#include "tendisplus/network/worker_pool.h"

Expand Down Expand Up @@ -40,6 +41,7 @@ class IndexManager {
std::unique_ptr<WorkerPool> _keyDeleter;
std::unordered_map<std::size_t, std::list<TTLIndex>> _expiredKeys;
std::unordered_map<std::size_t, std::string> _scanPoints;
std::vector<uint64_t> _scanPonitsTtl;
JobStatus _scanJobStatus;
JobStatus _delJobStatus;
// when destroystore, _disableStatus[storeId] = true
Expand Down

0 comments on commit 1928176

Please sign in to comment.