Skip to content

Commit

Permalink
Move functions from VersionSet to Version
Browse files Browse the repository at this point in the history
Summary:
There were some functions in VersionSet that had no reason to be there instead of Version. Moving them to Version will make column families implementation easier.

The functions moved are:
* NumLevelBytes
* LevelSummary
* LevelFileSummary
* MaxNextLevelOverlappingBytes
* AddLiveFiles (previously AddLiveFilesCurrentVersion())
* NeedSlowdownForNumLevel0Files

The diff continues on (and depends on) D15171

Test Plan: make check

Reviewers: dhruba, haobo, kailiu, sdong, emayanke

Reviewed By: sdong

CC: leveldb

Differential Revision: https://reviews.facebook.net/D15183
  • Loading branch information
igorcanadi committed Jan 16, 2014
1 parent 65a8a52 commit 2f4eda7
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 154 deletions.
2 changes: 1 addition & 1 deletion db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,

// Make a set of all of the live *.sst files
std::set<uint64_t> live;
versions_->AddLiveFilesCurrentVersion(&live);
versions_->current()->AddLiveFiles(&live);

ret.clear();
ret.reserve(live.size() + 2); //*.sst + CURRENT + MANIFEST
Expand Down
50 changes: 25 additions & 25 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1313,13 +1313,13 @@ void DBImpl::CompactRange(const Slice* begin,
// return the same level if it cannot be moved
int DBImpl::FindMinimumEmptyLevelFitting(int level) {
mutex_.AssertHeld();
Version* current = versions_->current();
int minimum_level = level;
for (int i = level - 1; i > 0; --i) {
// stop if level i is not empty
if (versions_->current()->NumLevelFiles(i) > 0) break;

if (current->NumLevelFiles(i) > 0) break;
// stop if level i is too small (cannot fit the level files)
if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break;
if (versions_->MaxBytesForLevel(i) < current->NumLevelBytes(level)) break;

minimum_level = i;
}
Expand Down Expand Up @@ -1826,6 +1826,11 @@ void DBImpl::TEST_PurgeObsoleteteWAL() {
PurgeObsoleteWALFiles();
}

uint64_t DBImpl::TEST_GetLevel0TotalSize() {
MutexLock l(&mutex_);
return versions_->current()->NumLevelBytes(0);
}

void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
DeletionState deletion_state(options_.max_write_buffer_number, true);
Expand Down Expand Up @@ -1939,13 +1944,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress,
f->smallest_seqno, f->largest_seqno);
status = versions_->LogAndApply(c->edit(), &mutex_);
InstallSuperVersion(deletion_state);
VersionSet::LevelSummaryStorage tmp;
Version::LevelSummaryStorage tmp;
Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
static_cast<unsigned long long>(f->number),
c->level() + 1,
static_cast<unsigned long long>(f->number), c->level() + 1,
static_cast<unsigned long long>(f->file_size),
status.ToString().c_str(),
versions_->LevelSummary(&tmp));
status.ToString().c_str(), versions_->current()->LevelSummary(&tmp));
versions_->ReleaseCompactionFiles(c.get(), status);
*madeProgress = true;
} else {
Expand Down Expand Up @@ -2605,22 +2608,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
status = InstallCompactionResults(compact);
InstallSuperVersion(deletion_state);
}
VersionSet::LevelSummaryStorage tmp;
Version::LevelSummaryStorage tmp;
Log(options_.info_log,
"compacted to: %s, %.1f MB/sec, level %d, files in(%d, %d) out(%d) "
"MB in(%.1f, %.1f) out(%.1f), read-write-amplify(%.1f) "
"write-amplify(%.1f) %s\n",
versions_->LevelSummary(&tmp),
versions_->current()->LevelSummary(&tmp),
(stats.bytes_readn + stats.bytes_readnp1 + stats.bytes_written) /
(double) stats.micros,
compact->compaction->output_level(),
stats.files_in_leveln, stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0,
stats.bytes_readnp1 / 1048576.0,
(double)stats.micros,
compact->compaction->output_level(), stats.files_in_leveln,
stats.files_in_levelnp1, stats.files_out_levelnp1,
stats.bytes_readn / 1048576.0, stats.bytes_readnp1 / 1048576.0,
stats.bytes_written / 1048576.0,
(stats.bytes_written + stats.bytes_readnp1 + stats.bytes_readn) /
(double) stats.bytes_readn,
stats.bytes_written / (double) stats.bytes_readn,
(double)stats.bytes_readn,
stats.bytes_written / (double)stats.bytes_readn,
status.ToString().c_str());

return status;
Expand Down Expand Up @@ -2701,7 +2703,7 @@ Iterator* DBImpl::TEST_NewInternalIterator() {

int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
MutexLock l(&mutex_);
return versions_->MaxNextLevelOverlappingBytes();
return versions_->current()->MaxNextLevelOverlappingBytes();
}

Status DBImpl::Get(const ReadOptions& options,
Expand Down Expand Up @@ -3193,9 +3195,7 @@ Status DBImpl::MakeRoomForWrite(bool force,
// Yield previous error
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NeedSlowdownForNumLevel0Files()) {
} else if (allow_delay && versions_->NeedSlowdownForNumLevel0Files()) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
Expand Down Expand Up @@ -3403,7 +3403,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
"%3d %8d %8.0f\n",
level,
current->NumLevelFiles(level),
versions_->NumLevelBytes(level) / 1048576.0);
current->NumLevelBytes(level) / 1048576.0);
value->append(buf);
}
return true;
Expand Down Expand Up @@ -3446,7 +3446,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
"--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < NumberLevels(); level++) {
for (int level = 0; level < current->NumberLevels(); level++) {
int files = current->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
int64_t bytes_read = stats_[level].bytes_readn +
Expand All @@ -3468,8 +3468,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
"%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %10.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n",
level,
files,
versions_->NumLevelBytes(level) / 1048576.0,
versions_->NumLevelBytes(level) /
current->NumLevelBytes(level) / 1048576.0,
current->NumLevelBytes(level) /
versions_->MaxBytesForLevel(level),
stats_[level].micros / 1e6,
bytes_read / 1048576.0,
Expand Down
2 changes: 1 addition & 1 deletion db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class DBImpl : public DB {
void TEST_PurgeObsoleteteWAL();

// get total level0 file size. Only for testing.
uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0);}
uint64_t TEST_GetLevel0TotalSize();

void TEST_SetDefaultTimeToCheck(uint64_t default_interval_to_delete_obsolete_WAL)
{
Expand Down
6 changes: 3 additions & 3 deletions db/db_stats_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,11 @@ void DBImpl::LogDBDeployStats() {
Version* current = versions_->current();
for (int i = 0; i < current->NumberLevels(); i++) {
file_total_num += current->NumLevelFiles(i);
file_total_size += versions_->NumLevelBytes(i);
file_total_size += current->NumLevelBytes(i);
}

VersionSet::LevelSummaryStorage scratch;
const char* file_num_summary = versions_->LevelSummary(&scratch);
Version::LevelSummaryStorage scratch;
const char* file_num_summary = current->LevelSummary(&scratch);
std::string file_num_per_level(file_num_summary);
std::string data_size_per_level(file_num_summary);

Expand Down
152 changes: 65 additions & 87 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,67 @@ bool Version::HasOverlappingUserKey(
return false;
}

int64_t Version::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < NumberLevels());
return TotalFileSize(files_[level]);
}

const char* Version::LevelSummary(LevelSummaryStorage* scratch) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files[");
for (int i = 0; i < NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ", int(files_[i].size()));
if (ret < 0 || ret >= sz) break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}

const char* Version::LevelFileSummary(FileSummaryStorage* scratch,
int level) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (const auto& f : files_[level]) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz,
"#%lu(seq=%lu,sz=%lu,%lu) ",
(unsigned long)f->number,
(unsigned long)f->smallest_seqno,
(unsigned long)f->file_size,
(unsigned long)f->being_compacted);
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}

int64_t Version::MaxNextLevelOverlappingBytes() {
uint64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < NumberLevels() - 1; level++) {
for (const auto& f : files_[level]) {
GetOverlappingInputs(level + 1, &f->smallest, &f->largest, &overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}

void Version::AddLiveFiles(std::set<uint64_t>* live) {
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = files_[level];
for (const auto& file : files) {
live->insert(file->number);
}
}
}

std::string Version::DebugString(bool hex) const {
std::string r;
for (int level = 0; level < num_levels_; level++) {
Expand Down Expand Up @@ -1145,7 +1206,7 @@ VersionSet::VersionSet(const std::string& dbname, const Options* options,
num_levels_(options_->num_levels),
dummy_versions_(this),
current_(nullptr),
need_slowdown_for_num_level0_files(false),
need_slowdown_for_num_level0_files_(false),
compactions_in_progress_(options_->num_levels),
current_version_number_(0),
manifest_file_size_(0),
Expand Down Expand Up @@ -1197,7 +1258,7 @@ void VersionSet::AppendVersion(Version* v) {
current_->Unref();
}
current_ = v;
need_slowdown_for_num_level0_files =
need_slowdown_for_num_level0_files_ =
(options_->level0_slowdown_writes_trigger >= 0 && current_ != nullptr &&
v->NumLevelFiles(0) >= options_->level0_slowdown_writes_trigger);
v->Ref();
Expand Down Expand Up @@ -1861,55 +1922,6 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
return log->AddRecord(record);
}

const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files[");
for (int i = 0; i < current_->NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ",
int(current_->files_[i].size()));
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}

const char* VersionSet::LevelDataSizeSummary(LevelSummaryStorage* scratch)
const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (int i = 0; i < current_->NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%lu ",
(unsigned long)NumLevelBytes(i));
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}

const char* VersionSet::LevelFileSummary(
FileSummaryStorage* scratch, int level) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (unsigned int i = 0; i < current_->files_[level].size(); i++) {
FileMetaData* f = current_->files_[level][i];
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz,
"#%lu(seq=%lu,sz=%lu,%lu) ",
(unsigned long)f->number,
(unsigned long)f->smallest_seqno,
(unsigned long)f->file_size,
(unsigned long)f->being_compacted);
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}

// Opens the mainfest file and reads all records
// till it finds the record we are looking for.
bool VersionSet::ManifestContains(const std::string& record) const {
Expand Down Expand Up @@ -1997,40 +2009,6 @@ void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
}
}

void VersionSet::AddLiveFilesCurrentVersion(std::set<uint64_t>* live) {
Version* v = current_;
for (int level = 0; level < v->NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
}
}
}

int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < current_->NumberLevels());
assert(current_);
return TotalFileSize(current_->files_[level]);
}

int64_t VersionSet::MaxNextLevelOverlappingBytes() {
uint64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < current_->NumberLevels() - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
&overlaps);
const uint64_t sum = TotalFileSize(overlaps);
if (sum > result) {
result = sum;
}
}
}
return result;
}

// Stores the minimal range that covers all entries in inputs in
// *smallest, *largest.
// REQUIRES: inputs is not empty
Expand Down Expand Up @@ -2456,10 +2434,10 @@ Compaction* VersionSet::PickCompactionUniversal(int level, double score) {
Log(options_->info_log, "Universal: nothing to do\n");
return nullptr;
}
VersionSet::FileSummaryStorage tmp;
Version::FileSummaryStorage tmp;
Log(options_->info_log, "Universal: candidate files(%lu): %s\n",
current_->files_[level].size(),
LevelFileSummary(&tmp, 0));
current_->LevelFileSummary(&tmp, 0));

// Check for size amplification first.
Compaction* c = PickCompactionUniversalSizeAmp(level, score);
Expand Down
Loading

0 comments on commit 2f4eda7

Please sign in to comment.