Skip to content

Commit

Permalink
Track per-SST user-defined timestamp information in MANIFEST (faceboo…
Browse files Browse the repository at this point in the history
…k#9092)

Summary:
Track per-SST user-defined timestamp information in MANIFEST facebook#8957

Rockdb has supported user-defined timestamp feature. Application can specify a timestamp
when writing each k-v pair. When data flush from memory to disk file called SST files, file
creation activity will commit to MANIFEST. This commit is for tracking timestamp info in the
MANIFEST for each file. The changes involved are as follows:
1) Track max/min timestamp in FileMetaData, and fix invoved codes.
2) Add NewFileCustomTag::kMinTimestamp and NewFileCustomTag::kMinTimestamp in
    NewFileCustomTag ( in the kNewFile4 part ), and support invoved codes such as
    VersionEdit Encode and Decode etc.
3) Add unit test code for VersionEdit EncodeDecodeNewFile4, and fix invoved test codes.

Pull Request resolved: facebook#9092

Reviewed By: ajkr, akankshamahajan15

Differential Revision: D32252323

Pulled By: riversand963

fbshipit-source-id: d2642898d6e3ad1fef0eb866b98045408bd4e162
  • Loading branch information
sunlike-Lipo authored and facebook-github-bot committed Nov 10, 2021
1 parent 1a8eec4 commit 937fbcb
Show file tree
Hide file tree
Showing 18 changed files with 182 additions and 74 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### New Features
* Added new ChecksumType kXXH3 which is faster than kCRC32c on almost all x86\_64 hardware.
* Added a new online consistency check for BlobDB which validates that the number/total size of garbage blobs does not exceed the number/total size of all blobs in any given blob file.
* Provided support for tracking per-sst user-defined timestamp information in MANIFEST.

### Bug Fixes
* Prevent a `CompactRange()` with `CompactRangeOptions::change_level == true` from possibly causing corruption to the LSM state (overlapping files within a level) when run in parallel with another manual compaction. Note that setting `force_consistency_checks == true` (the default) would cause the DB to enter read-only mode in this scenario and return `Status::Corruption`, rather than committing any corruption.
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,8 @@ class CompactionJobTestBase : public testing::Test {
edit.AddFile(level, file_number, 0, 10, smallest_key, largest_key,
smallest_seqno, largest_seqno, false, oldest_blob_file_number,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
kUnknownFileChecksum, kUnknownFileChecksumFuncName,
kDisableUserTimestamp, kDisableUserTimestamp);

mutex_.Lock();
EXPECT_OK(
Expand Down
3 changes: 2 additions & 1 deletion db/compaction/compaction_picker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ class CompactionPickerTest : public testing::Test {
InternalKey(largest, largest_seq, kTypeValue), smallest_seq,
largest_seq, marked_for_compact, kInvalidBlobFileNumber,
kUnknownOldestAncesterTime, kUnknownFileCreationTime,
kUnknownFileChecksum, kUnknownFileChecksumFuncName);
kUnknownFileChecksum, kUnknownFileChecksumFuncName,
kDisableUserTimestamp, kDisableUserTimestamp);
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
f->temperature = temperature;
Expand Down
26 changes: 13 additions & 13 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1589,12 +1589,12 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) {
edit.SetColumnFamily(cfd->GetID());
for (const auto& f : vstorage->LevelFiles(level)) {
edit.DeleteFile(level, f->fd.GetNumber());
edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest,
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time,
f->file_checksum, f->file_checksum_func_name);
edit.AddFile(
to_level, f->fd.GetNumber(), f->fd.GetPathId(), f->fd.GetFileSize(),
f->smallest, f->largest, f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time, f->file_checksum,
f->file_checksum_func_name, f->min_timestamp, f->max_timestamp);
}
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] Apply version edit:\n%s", cfd->GetName().c_str(),
Expand Down Expand Up @@ -3170,13 +3170,13 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
for (size_t i = 0; i < c->num_input_files(l); i++) {
FileMetaData* f = c->input(l, i);
c->edit()->DeleteFile(c->level(l), f->fd.GetNumber());
c->edit()->AddFile(c->output_level(), f->fd.GetNumber(),
f->fd.GetPathId(), f->fd.GetFileSize(), f->smallest,
f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum,
f->file_checksum_func_name);
c->edit()->AddFile(
c->output_level(), f->fd.GetNumber(), f->fd.GetPathId(),
f->fd.GetFileSize(), f->smallest, f->largest, f->fd.smallest_seqno,
f->fd.largest_seqno, f->marked_for_compaction,
f->oldest_blob_file_number, f->oldest_ancester_time,
f->file_creation_time, f->file_checksum, f->file_checksum_func_name,
f->min_timestamp, f->max_timestamp);

ROCKS_LOG_BUFFER(
log_buffer,
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_experimental.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ Status DBImpl::PromoteL0(ColumnFamilyHandle* column_family, int target_level) {
f->fd.smallest_seqno, f->fd.largest_seqno,
f->marked_for_compaction, f->oldest_blob_file_number,
f->oldest_ancester_time, f->file_creation_time,
f->file_checksum, f->file_checksum_func_name);
f->file_checksum, f->file_checksum_func_name,
f->min_timestamp, f->max_timestamp);
}

status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
Expand Down
3 changes: 2 additions & 1 deletion db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1463,7 +1463,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
meta.fd.smallest_seqno, meta.fd.largest_seqno,
meta.marked_for_compaction, meta.oldest_blob_file_number,
meta.oldest_ancester_time, meta.file_creation_time,
meta.file_checksum, meta.file_checksum_func_name);
meta.file_checksum, meta.file_checksum_func_name,
meta.min_timestamp, meta.max_timestamp);

for (const auto& blob : blob_file_additions) {
edit->AddBlobFile(blob);
Expand Down
1 change: 1 addition & 0 deletions db/dbformat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ namespace ROCKSDB_NAMESPACE {
// ValueType, not the lowest).
const ValueType kValueTypeForSeek = kTypeDeletionWithTimestamp;
const ValueType kValueTypeForSeekForPrev = kTypeDeletion;
const std::string kDisableUserTimestamp("");

EntryType GetEntryType(ValueType value_type) {
switch (value_type) {
Expand Down
3 changes: 3 additions & 0 deletions db/dbformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;

constexpr uint64_t kNumInternalBytes = 8;

// Defined in dbformat.cc
extern const std::string kDisableUserTimestamp;

// The data structure that represents an internal key in the way that user_key,
// sequence number and type are stored in separated forms.
struct ParsedInternalKey {
Expand Down
3 changes: 2 additions & 1 deletion db/external_sst_file_ingestion_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,8 @@ Status ExternalSstFileIngestionJob::Run() {
f.fd.GetNumber(), f.fd.GetPathId(), f.fd.GetFileSize(),
f.smallest_internal_key, f.largest_internal_key, f.assigned_seqno,
f.assigned_seqno, false, kInvalidBlobFileNumber, oldest_ancester_time,
current_time, f.file_checksum, f.file_checksum_func_name);
current_time, f.file_checksum, f.file_checksum_func_name,
kDisableUserTimestamp, kDisableUserTimestamp);
f_metadata.temperature = f.file_temperature;
edit_.AddFile(f.picked_level, f_metadata);
}
Expand Down
3 changes: 2 additions & 1 deletion db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,8 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction, meta_.oldest_blob_file_number,
meta_.oldest_ancester_time, meta_.file_creation_time,
meta_.file_checksum, meta_.file_checksum_func_name);
meta_.file_checksum, meta_.file_checksum_func_name,
meta_.min_timestamp, meta_.max_timestamp);

edit_->SetBlobFileAdditions(std::move(blob_file_additions));
}
Expand Down
3 changes: 2 additions & 1 deletion db/import_column_family_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ Status ImportColumnFamilyJob::Run() {
f.largest_internal_key, file_metadata.smallest_seqno,
file_metadata.largest_seqno, false, kInvalidBlobFileNumber,
oldest_ancester_time, current_time, kUnknownFileChecksum,
kUnknownFileChecksumFuncName);
kUnknownFileChecksumFuncName, kDisableUserTimestamp,
kDisableUserTimestamp);

// If incoming sequence number is higher, update local sequence number.
if (file_metadata.largest_seqno > versions_->LastSequence()) {
Expand Down
3 changes: 2 additions & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,8 @@ class Repairer {
table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
table->meta.oldest_blob_file_number,
table->meta.oldest_ancester_time, table->meta.file_creation_time,
table->meta.file_checksum, table->meta.file_checksum_func_name);
table->meta.file_checksum, table->meta.file_checksum_func_name,
table->meta.min_timestamp, table->meta.max_timestamp);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);
Expand Down
Loading

0 comments on commit 937fbcb

Please sign in to comment.