Skip to content

Commit

Permalink
fix db_test error with scribe logger turned on
Browse files Browse the repository at this point in the history
Summary: as subject

Test Plan: db_test

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D4929
  • Loading branch information
heyongqiang committed Aug 28, 2012
1 parent fc20273 commit d3759ca
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 38 deletions.
5 changes: 4 additions & 1 deletion db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
log_(NULL),
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
bg_logstats_scheduled_(false),
manual_compaction_(NULL),
logger_(NULL) {
mem_->Ref();
Expand Down Expand Up @@ -168,7 +169,7 @@ DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_) {
while (bg_compaction_scheduled_ || bg_logstats_scheduled_) {
bg_cv_.Wait();
}
mutex_.Unlock();
Expand All @@ -192,6 +193,8 @@ DBImpl::~DBImpl() {
if (owns_cache_) {
delete options_.block_cache;
}

delete logger_;
}

Status DBImpl::NewDB() {
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class DBImpl : public DB {
Status WaitForCompactMemTable();

void MaybeScheduleLogDBDeployStats();
static void LogDBDeployStats(void* db);
static void BGLogDBDeployStats(void* db);
void LogDBDeployStats();

void MaybeScheduleCompaction();
static void BGWork(void* db);
Expand Down Expand Up @@ -169,6 +170,9 @@ class DBImpl : public DB {
// Has a background compaction been scheduled or is running?
bool bg_compaction_scheduled_;

// Has a background stats log thread scheduled?
bool bg_logstats_scheduled_;

// Information for a manual compaction
struct ManualCompaction {
int level;
Expand Down
43 changes: 31 additions & 12 deletions db/db_stats_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include "db/version_set.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/mutexlock.h"

namespace leveldb {

Expand All @@ -18,7 +20,8 @@ void DBImpl::MaybeScheduleLogDBDeployStats() {
|| host_name_.empty()) {
return;
}
if (shutting_down_.Acquire_Load()) {

if(bg_logstats_scheduled_ || shutting_down_.Acquire_Load()) {
// Already scheduled
} else {
int64_t current_ts = 0;
Expand All @@ -30,43 +33,59 @@ void DBImpl::MaybeScheduleLogDBDeployStats() {
return;
}
last_log_ts = current_ts;
env_->Schedule(&DBImpl::LogDBDeployStats, this);
bg_logstats_scheduled_ = true;
env_->Schedule(&DBImpl::BGLogDBDeployStats, this);
}
}

void DBImpl::LogDBDeployStats(void* db) {
void DBImpl::BGLogDBDeployStats(void* db) {
DBImpl* db_inst = reinterpret_cast<DBImpl*>(db);
db_inst->LogDBDeployStats();
}

void DBImpl::LogDBDeployStats() {
mutex_.Lock();

if (db_inst->shutting_down_.Acquire_Load()) {
if (shutting_down_.Acquire_Load()) {
bg_logstats_scheduled_ = false;
bg_cv_.SignalAll();
mutex_.Unlock();
return;
}

mutex_.Unlock();

std::string version_info;
version_info += boost::lexical_cast<std::string>(kMajorVersion);
version_info += ".";
version_info += boost::lexical_cast<std::string>(kMinorVersion);
std::string data_dir;
db_inst->env_->GetAbsolutePath(db_inst->dbname_, &data_dir);
env_->GetAbsolutePath(dbname_, &data_dir);

uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
for (int i = 0; i < db_inst->versions_->NumberLevels(); i++) {
file_total_num += db_inst->versions_->NumLevelFiles(i);
file_total_size += db_inst->versions_->NumLevelBytes(i);
for (int i = 0; i < versions_->NumberLevels(); i++) {
file_total_num += versions_->NumLevelFiles(i);
file_total_size += versions_->NumLevelBytes(i);
}

VersionSet::LevelSummaryStorage scratch;
const char* file_num_summary = db_inst->versions_->LevelSummary(&scratch);
const char* file_num_summary = versions_->LevelSummary(&scratch);
std::string file_num_per_level(file_num_summary);
const char* file_size_summary = db_inst->versions_->LevelDataSizeSummary(
const char* file_size_summary = versions_->LevelDataSizeSummary(
&scratch);
std::string data_size_per_level(file_num_summary);
int64_t unix_ts;
db_inst->env_->GetCurrentTime(&unix_ts);
env_->GetCurrentTime(&unix_ts);

db_inst->logger_->Log_Deploy_Stats(version_info, db_inst->host_name_,
logger_->Log_Deploy_Stats(version_info, host_name_,
data_dir, file_total_size, file_total_num, file_num_per_level,
data_size_per_level, unix_ts);

mutex_.Lock();
bg_logstats_scheduled_ = false;
bg_cv_.SignalAll();
mutex_.Unlock();
}

}
7 changes: 5 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ namespace leveldb {

static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0;
for (size_t i = 0; i < files.size(); i++) {
for (size_t i = 0; i < files.size() && files[i]; i++) {
sum += files[i]->file_size;
}
return sum;
Expand Down Expand Up @@ -1223,7 +1223,10 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return TotalFileSize(current_->files_[level]);
if(current_ && level < current_->files_->size())
return TotalFileSize(current_->files_[level]);
else
return 0;
}

int64_t VersionSet::MaxNextLevelOverlappingBytes() {
Expand Down
30 changes: 11 additions & 19 deletions scribe/scribe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ const std::string ScribeLogger::COL_SEPERATOR = "\x1";
const std::string ScribeLogger::DEPLOY_STATS_CATEGORY = "leveldb_deploy_stats";

ScribeLogger::ScribeLogger(const std::string& host, int port,
int retry_times, uint32_t retry_intervals, int batch_size)
int retry_times, uint32_t retry_intervals)
: host_(host),
port_(port),
retry_times_(retry_times),
retry_intervals_ (retry_intervals),
batch_size_ (batch_size) {
retry_intervals_ (retry_intervals) {
shared_ptr<TSocket> socket(new TSocket(host_, port_));
shared_ptr<TFramedTransport> framedTransport(new TFramedTransport(socket));
framedTransport->open();
Expand All @@ -25,23 +24,16 @@ void ScribeLogger::Log(const std::string& category,
entry.category = category;
entry.message = message;

logger_mutex_.Lock();
logs_.push_back(entry);

if (logs_.size() >= batch_size_) {
ResultCode ret = scribe_client_->Log(logs_);
int retries_left = retry_times_;
while (ret == TRY_LATER && retries_left > 0) {
Env::Default()->SleepForMicroseconds(retry_intervals_);
ret = scribe_client_->Log(logs_);
retries_left--;
}
std::vector<LogEntry> logs;
logs.push_back(entry);

// Clear the local messages if either successfully write out
// or has failed in the last 10 calls.
if (ret == OK || logs_.size() > batch_size_ * 5) {
logs_.clear();
}
logger_mutex_.Lock();
ResultCode ret = scribe_client_->Log(logs);
int retries_left = retry_times_;
while (ret == TRY_LATER && retries_left > 0) {
Env::Default()->SleepForMicroseconds(retry_intervals_);
ret = scribe_client_->Log(logs);
retries_left--;
}

logger_mutex_.Unlock();
Expand Down
4 changes: 1 addition & 3 deletions scribe/scribe_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class ScribeLogger : public StatsLogger{
int batch_size_;

scribeClient* scribe_client_;
std::vector<LogEntry> logs_;
port::Mutex logger_mutex_;

int retry_times_;
Expand All @@ -48,8 +47,7 @@ class ScribeLogger : public StatsLogger{
static const std::string DEPLOY_STATS_CATEGORY;

ScribeLogger(const std::string& host, int port,
int retry_times=3, uint32_t retry_intervals=1000000,
int batch_size=1);
int retry_times=3, uint32_t retry_intervals=1000000);
virtual ~ScribeLogger();

virtual void Log(const std::string& category, const std::string& message);
Expand Down

0 comments on commit d3759ca

Please sign in to comment.