Skip to content

Commit

Permalink
Detect column family from properties [CF + RepairDB part 2/3]
Browse files Browse the repository at this point in the history
Summary:
This diff uses the CF ID and CF name properties in the SST file
to associate recovered data with the proper column family. Depends on D59775.

- In ScanTable(), create column families in VersionSet each time a new one is discovered (via reading SST file properties)
- In ConvertLogToTable(), dump an SST file for every column family with data in the WAL
- In AddTables(), make a VersionEdit per-column family that adds all of that CF's tables

Test Plan:
  $ ./repair_test

Reviewers: yhchiang, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D59781
  • Loading branch information
ajkr committed Jun 24, 2016
1 parent 3fc713e commit 56ac686
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 56 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### New Features
* Add avoid_flush_during_recovery option.
* Add a read option background_purge_on_iterator_cleanup to avoid deleting files in the foreground when destroying iterators. Instead, a job is scheduled in the high-priority queue and executed in a separate background thread.
* RepairDB support for column families. RepairDB now associates data with non-default column families using information embedded in the SST/WAL files (4.7 or later). For data written by 4.6 or earlier, RepairDB associates it with the default column family.

## 4.9.0 (6/9/2016)
### Public API changes
Expand Down
172 changes: 116 additions & 56 deletions db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ class Repairer {
status = vset_.Recover({{kDefaultColumnFamilyName, cf_options_}}, false);
}
if (status.ok()) {
// Need to scan existing SST files first so the column families are
// created before we process WAL files
ExtractMetaData();

// ExtractMetaData() uses table_fds_ to know which SST files' metadata to
// extract -- we need to clear it here since metadata for existing SST
// files has been extracted already
table_fds_.clear();
ConvertLogFilesToTables();
ExtractMetaData();
status = AddTables();
Expand All @@ -177,6 +185,8 @@ class Repairer {
private:
// Per-table record collected while scanning SST files during repair; one
// entry describes a single table file and the column family it is assigned to.
struct TableInfo {
// File descriptor and key-range metadata for the table. AddTables() passes
// the file number, path id, file size, smallest/largest keys and the
// marked_for_compaction flag from here to VersionEdit::AddFile().
FileMetaData meta;
// Column family id taken from the table's properties when the table is
// scanned. Tables whose properties report an unknown column family are
// assigned id 0 (the default column family). AddTables() groups tables by
// this id.
uint32_t column_family_id;
// NOTE(review): presumably the column family name recorded in the table's
// properties; no assignment to this field is visible in this hunk -- confirm.
std::string column_family_name;
// Both sequence bounds are reset to 0 before the table's keys are iterated.
// NOTE(review): presumably they end up as the smallest/largest sequence
// numbers seen among the table's keys -- the update code is not shown here.
SequenceNumber min_sequence;
// AddTables() uses the maximum of this field across all tables as the
// version set's last sequence number.
SequenceNumber max_sequence;
};
Expand Down Expand Up @@ -294,16 +304,17 @@ class Repairer {
log::Reader reader(options_.info_log, std::move(lfile_reader), &reporter,
true /*enable checksum*/, 0 /*initial_offset*/, log);

// Initialize per-column family memtables
for (auto* cfd : *vset_.GetColumnFamilySet()) {
cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
kMaxSequenceNumber);
}
auto cf_mems = new ColumnFamilyMemTablesImpl(vset_.GetColumnFamilySet());

// Read all the records and add to a memtable
std::string scratch;
Slice record;
WriteBatch batch;
WriteBuffer wb(options_.db_write_buffer_size);
MemTable* mem =
new MemTable(icmp_, ioptions_, MutableCFOptions(options_, ioptions_),
&wb, kMaxSequenceNumber);
auto cf_mems_default = new ColumnFamilyMemTablesDefault(mem);
mem->Ref();
int counter = 0;
while (reader.ReadRecord(&record, &scratch)) {
if (record.size() < WriteBatchInternal::kHeader) {
Expand All @@ -312,7 +323,7 @@ class Repairer {
continue;
}
WriteBatchInternal::SetContents(&batch, record);
status = WriteBatchInternal::InsertInto(&batch, cf_mems_default, nullptr);
status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
Expand All @@ -323,36 +334,40 @@ class Repairer {
}
}

// Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits.
FileMetaData meta;
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
{
// Dump a table for each column family with entries in this log file.
for (auto* cfd : *vset_.GetColumnFamilySet()) {
// Do not record a version edit for this conversion to a Table
// since ExtractMetaData() will also generate edits.
MemTable* mem = cfd->mem();
if (mem->IsEmpty()) {
continue;
}

FileMetaData meta;
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
ReadOptions ro;
ro.total_order_seek = true;
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
MutableCFOptions mutable_cf_options(options_, ioptions_);
status = BuildTable(
dbname_, env_, ioptions_, mutable_cf_options, env_options_,
table_cache_, iter.get(), &meta, icmp_,
&int_tbl_prop_collector_factories_,
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily,
std::string() /* column_family_name */, {}, kMaxSequenceNumber,
kNoCompression, CompressionOptions(), false,
nullptr /* internal_stats */, TableFileCreationReason::kRecovery);
}
delete mem->Unref();
delete cf_mems_default;
mem = nullptr;
if (status.ok()) {
if (meta.fd.GetFileSize() > 0) {
table_fds_.push_back(meta.fd);
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
env_options_, table_cache_, iter.get(), &meta,
cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber, kNoCompression,
CompressionOptions(), false, nullptr /* internal_stats */,
TableFileCreationReason::kRecovery);
Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log,
counter, meta.fd.GetNumber(), status.ToString().c_str());
if (status.ok()) {
if (meta.fd.GetFileSize() > 0) {
table_fds_.push_back(meta.fd);
}
} else {
break;
}
}
Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(), status.ToString().c_str());
delete cf_mems;
return status;
}

Expand Down Expand Up @@ -385,19 +400,55 @@ class Repairer {
Status status = env_->GetFileSize(fname, &file_size);
t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(),
file_size);
std::shared_ptr<const TableProperties> props;
if (status.ok()) {
status = table_cache_->GetTableProperties(env_options_, icmp_, t->meta.fd,
&props);
}
if (status.ok()) {
t->column_family_id = static_cast<uint32_t>(props->column_family_id);
if (t->column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) {
Log(InfoLogLevel::WARN_LEVEL, options_.info_log,
"Table #%" PRIu64
": column family unknown (probably due to legacy format); "
"adding to default column family id 0.",
t->meta.fd.GetNumber());
t->column_family_id = 0;
}

if (vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id) ==
nullptr) {
status =
AddColumnFamily(props->column_family_name, t->column_family_id);
}
}
ColumnFamilyData* cfd;
if (status.ok()) {
cfd = vset_.GetColumnFamilySet()->GetColumnFamily(t->column_family_id);
if (cfd->GetName() != props->column_family_name) {
Log(InfoLogLevel::ERROR_LEVEL, options_.info_log,
"Table #%" PRIu64
": inconsistent column family name '%s'; expected '%s' for column "
"family id %" PRIu32 ".",
t->meta.fd.GetNumber(), props->column_family_name.c_str(),
cfd->GetName().c_str(), t->column_family_id);
status = Status::Corruption(dbname_, "inconsistent column family name");
}
}
if (status.ok()) {
InternalIterator* iter = table_cache_->NewIterator(
ReadOptions(), env_options_, icmp_, t->meta.fd);
ReadOptions(), env_options_, cfd->internal_comparator(), t->meta.fd);
bool empty = true;
ParsedInternalKey parsed;
t->min_sequence = 0;
t->max_sequence = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
Log(InfoLogLevel::ERROR_LEVEL,
options_.info_log, "Table #%" PRIu64 ": unparsable key %s",
t->meta.fd.GetNumber(), EscapeString(key).c_str());
Log(InfoLogLevel::ERROR_LEVEL, options_.info_log,
"Table #%" PRIu64 ": unparsable key %s", t->meta.fd.GetNumber(),
EscapeString(key).c_str());
continue;
}

Expand All @@ -418,42 +469,51 @@ class Repairer {
status = iter->status();
}
delete iter;

Log(InfoLogLevel::INFO_LEVEL, options_.info_log,
"Table #%" PRIu64 ": %d entries %s", t->meta.fd.GetNumber(), counter,
status.ToString().c_str());
}
Log(InfoLogLevel::INFO_LEVEL,
options_.info_log, "Table #%" PRIu64 ": %d entries %s",
t->meta.fd.GetNumber(), counter, status.ToString().c_str());
return status;
}

Status AddTables() {
std::unordered_map<uint32_t, std::vector<const TableInfo*>> cf_id_to_tables;
SequenceNumber max_sequence = 0;
for (size_t i = 0; i < tables_.size(); i++) {
cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
if (max_sequence < tables_[i].max_sequence) {
max_sequence = tables_[i].max_sequence;
}
}
vset_.SetLastSequence(max_sequence);

auto* cfd = vset_.GetColumnFamilySet()->GetDefault();
VersionEdit edit;
edit.SetComparatorName(cfd->user_comparator()->Name());
edit.SetLogNumber(0);
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());

// TODO(opt): separate out into multiple levels
for (const auto& table : tables_) {
edit.AddFile(0, table.meta.fd.GetNumber(), table.meta.fd.GetPathId(),
table.meta.fd.GetFileSize(), table.meta.smallest,
table.meta.largest, table.min_sequence, table.max_sequence,
table.meta.marked_for_compaction);
for (const auto& cf_id_and_tables : cf_id_to_tables) {
auto* cfd =
vset_.GetColumnFamilySet()->GetColumnFamily(cf_id_and_tables.first);
VersionEdit edit;
edit.SetComparatorName(cfd->user_comparator()->Name());
edit.SetLogNumber(0);
edit.SetNextFile(next_file_number_);
edit.SetColumnFamily(cfd->GetID());

// TODO(opt): separate out into multiple levels
for (const auto* table : cf_id_and_tables.second) {
edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
table->meta.fd.GetFileSize(), table->meta.smallest,
table->meta.largest, table->min_sequence,
table->max_sequence, table->meta.marked_for_compaction);
}
mutex_.Lock();
Status status = vset_.LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
nullptr /* db_directory */, false /* new_descriptor_log */);
mutex_.Unlock();
if (!status.ok()) {
return status;
}
}
mutex_.Lock();
Status status = vset_.LogAndApply(
cfd, *cfd->GetLatestMutableCFOptions(), &edit, &mutex_,
nullptr /* db_directory */, false /* new_descriptor_log */);
mutex_.Unlock();
return status;
return Status::OK();
}

void ArchiveFile(const std::string& fname) {
Expand Down
39 changes: 39 additions & 0 deletions db/repair_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"
#include "util/file_util.h"
#include "util/string_util.h"

namespace rocksdb {

Expand Down Expand Up @@ -169,6 +170,44 @@ TEST_F(RepairTest, UnflushedSst) {
ASSERT_EQ(Get("key"), "val");
}

TEST_F(RepairTest, RepairMultipleColumnFamilies) {
  // After the manifest is lost, RepairDB must put every SST file -- and every
  // WAL entry -- back into the column family that originally owned it.
  const int kNumCfs = 3;
  const int kEntriesPerCf = 2;
  DestroyAndReopen(CurrentOptions());
  CreateAndReopenWithCF({"pikachu1", "pikachu2"}, CurrentOptions());
  for (int cf = 0; cf < kNumCfs; ++cf) {
    for (int entry = 0; entry < kEntriesPerCf; ++entry) {
      const std::string suffix = ToString(entry);
      Put(cf, "key" + suffix, "val" + suffix);
      // The final write of the final column family is deliberately left
      // unflushed, so the repair also has to recover data from the WAL and
      // attribute it to the right column family.
      const bool is_final_write =
          (cf == kNumCfs - 1) && (entry == kEntriesPerCf - 1);
      if (!is_final_write) {
        Flush(cf);
      }
    }
  }

  // The manifest path has to be captured while db_ is still alive, but the
  // file is removed only after Close() so that Close() cannot re-create it.
  const std::string manifest_path =
      DescriptorFileName(dbname_, dbfull()->TEST_Current_Manifest_FileNo());
  Close();
  ASSERT_OK(env_->FileExists(manifest_path));
  ASSERT_OK(env_->DeleteFile(manifest_path));

  ASSERT_OK(RepairDB(dbname_, CurrentOptions()));

  // Every key must be readable from the column family it was written to.
  ReopenWithColumnFamilies({"default", "pikachu1", "pikachu2"},
                           CurrentOptions());
  for (int cf = 0; cf < kNumCfs; ++cf) {
    for (int entry = 0; entry < kEntriesPerCf; ++entry) {
      const std::string suffix = ToString(entry);
      ASSERT_EQ(Get(cf, "key" + suffix), "val" + suffix);
    }
  }
}

} // namespace rocksdb

int main(int argc, char** argv) {
Expand Down

0 comments on commit 56ac686

Please sign in to comment.