Skip to content

Commit

Permalink
Fix the handling of wide-column base values in the max_successive_mer…
Browse files Browse the repository at this point in the history
…ges logic (facebook#11913)

Summary:
Pull Request resolved: facebook#11913

The `max_successive_merges` logic currently does not handle wide-column base values correctly, since it uses the `Get` API, which only returns the value of the default column. The patch fixes this by switching to `GetEntity` and passing all columns (if applicable) to the merge operator.

Reviewed By: jaykorean

Differential Revision: D49795097

fbshipit-source-id: 75eb7cc9476226255062cdb3d43ab6bd1cc2faa3
  • Loading branch information
ltamasi authored and facebook-github-bot committed Oct 2, 2023
1 parent 7bebd30 commit b00fa55
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 20 deletions.
94 changes: 94 additions & 0 deletions db/db_merge_operator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
#include <vector>

#include "db/db_test_util.h"
#include "db/dbformat.h"
#include "db/forward_iterator.h"
#include "port/stack_trace.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/utilities/debug.h"
#include "util/random.h"
#include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
Expand Down Expand Up @@ -949,6 +951,98 @@ TEST_P(PerConfigMergeOperatorPinningTest, Randomized) {
VerifyDBFromMap(true_data);
}

TEST_F(DBMergeOperatorTest, MaxSuccessiveMergesBaseValues) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.merge_operator = MergeOperators::CreatePutOperator();
options.max_successive_merges = 1;
options.env = env_;
Reopen(options);

constexpr char foo[] = "foo";
constexpr char bar[] = "bar";
constexpr char baz[] = "baz";
constexpr char qux[] = "qux";
constexpr char corge[] = "corge";

// No base value
{
constexpr char key[] = "key1";

ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, foo));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar));

PinnableSlice result;
ASSERT_OK(
db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result));
ASSERT_EQ(result, bar);

// We expect the second Merge to be converted to a Put because of
// max_successive_merges.
constexpr size_t max_key_versions = 8;
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
max_key_versions, &key_versions));
ASSERT_EQ(key_versions.size(), 2);
ASSERT_EQ(key_versions[0].type, kTypeValue);
ASSERT_EQ(key_versions[1].type, kTypeMerge);
}

// Plain base value
{
constexpr char key[] = "key2";

ASSERT_OK(db_->Put(WriteOptions(), db_->DefaultColumnFamily(), key, foo));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, bar));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, baz));

PinnableSlice result;
ASSERT_OK(
db_->Get(ReadOptions(), db_->DefaultColumnFamily(), key, &result));
ASSERT_EQ(result, baz);

// We expect the second Merge to be converted to a Put because of
// max_successive_merges.
constexpr size_t max_key_versions = 8;
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
max_key_versions, &key_versions));
ASSERT_EQ(key_versions.size(), 3);
ASSERT_EQ(key_versions[0].type, kTypeValue);
ASSERT_EQ(key_versions[1].type, kTypeMerge);
ASSERT_EQ(key_versions[2].type, kTypeValue);
}

// Wide-column base value
{
constexpr char key[] = "key3";
const WideColumns columns{{kDefaultWideColumnName, foo}, {bar, baz}};

ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(), key,
columns));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, qux));
ASSERT_OK(
db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), key, corge));

PinnableWideColumns result;
ASSERT_OK(db_->GetEntity(ReadOptions(), db_->DefaultColumnFamily(), key,
&result));
const WideColumns expected{{kDefaultWideColumnName, corge}, {bar, baz}};
ASSERT_EQ(result.columns(), expected);

// We expect the second Merge to be converted to a PutEntity because of
// max_successive_merges.
constexpr size_t max_key_versions = 8;
std::vector<KeyVersion> key_versions;
ASSERT_OK(GetAllKeyVersions(db_, db_->DefaultColumnFamily(), key, key,
max_key_versions, &key_versions));
ASSERT_EQ(key_versions.size(), 3);
ASSERT_EQ(key_versions[0].type, kTypeWideColumnEntity);
ASSERT_EQ(key_versions[1].type, kTypeMerge);
ASSERT_EQ(key_versions[2].type, kTypeWideColumnEntity);
}
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
29 changes: 29 additions & 0 deletions db/merge_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,20 @@ Status MergeHelper::TimedFullMerge(
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const WideColumns& columns, const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics, SystemClock* clock,
bool update_num_ops_stats, std::string* result, Slice* result_operand,
ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result, result_operand,
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, NoBaseValueTag,
const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
Expand Down Expand Up @@ -351,6 +365,21 @@ Status MergeHelper::TimedFullMerge(
op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const WideColumns& columns, const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics, SystemClock* clock,
bool update_num_ops_stats, std::string* result_value,
PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result_value, result_entity,
op_failure_scope);
}

// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// keys_, operands_ are updated to reflect the merge result.
Expand Down
17 changes: 17 additions & 0 deletions db/merge_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ class MergeHelper {
std::string* result, Slice* result_operand, ValueType* result_type,
MergeOperator::OpFailureScope* op_failure_scope);

static Status TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const WideColumns& columns, const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics, SystemClock* clock,
bool update_num_ops_stats, std::string* result, Slice* result_operand,
ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope);

// Variants that expose the merge result translated to the form requested by
// the client. (For example, if the result is a wide-column structure but the
// client requested the results in plain-value form, the value of the default
Expand Down Expand Up @@ -112,6 +119,16 @@ class MergeHelper {
std::string* result_value, PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope);

static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, WideBaseValueTag,
const WideColumns& columns,
const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics,
SystemClock* clock, bool update_num_ops_stats,
std::string* result_value,
PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope);

// During compaction, merge entries until we hit
// - a corrupted key
// - a Put/Delete,
Expand Down
5 changes: 5 additions & 0 deletions db/wide/wide_columns_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ class WideColumnsHelper {
return !columns.empty() && columns.front().name() == kDefaultWideColumnName;
}

static bool HasDefaultColumnOnly(const WideColumns& columns) {
return columns.size() == 1 &&
columns.front().name() == kDefaultWideColumnName;
}

static const Slice& GetDefaultColumn(const WideColumns& columns) {
assert(HasDefaultColumn(columns));
return columns.front().value();
Expand Down
56 changes: 38 additions & 18 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2483,15 +2483,15 @@ class MemTableInserter : public WriteBatch::Handler {
}

if (perform_merge) {
// TODO: support wide-column base values for max_successive_merges

// 1) Get the existing value
std::string get_value;
// 1) Get the existing value. Use the wide column APIs to make sure we
// don't lose any columns in the process.
PinnableWideColumns existing;

// Pass in the sequence number so that we also include previous merge
// operations in the same batch.
SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_;

// TODO: plumb Env::IOActivity
ReadOptions read_options;
read_options.snapshot = &read_from_snapshot;
Expand All @@ -2500,28 +2500,47 @@ class MemTableInserter : public WriteBatch::Handler {
if (cf_handle == nullptr) {
cf_handle = db_->DefaultColumnFamily();
}
Status get_status = db_->Get(read_options, cf_handle, key, &get_value);

Status get_status =
db_->GetEntity(read_options, cf_handle, key, &existing);
if (!get_status.ok()) {
// Failed to read a key we know exists. Store the delta in memtable.
perform_merge = false;
} else {
Slice get_value_slice = Slice(get_value);

// 2) Apply this merge
auto merge_operator = moptions->merge_operator;
assert(merge_operator);

const auto& columns = existing.columns();

Status merge_status;
std::string new_value;
ValueType new_value_type;
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its value.
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, MergeHelper::kPlainBaseValue, get_value_slice,
{value}, moptions->info_log, moptions->statistics,
SystemClock::Default().get(),
/* update_num_ops_stats */ false, &new_value,
/* result_operand */ nullptr, &new_value_type,
/* op_failure_scope */ nullptr);

if (WideColumnsHelper::HasDefaultColumnOnly(columns)) {
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its
// value.
merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, MergeHelper::kPlainBaseValue,
WideColumnsHelper::GetDefaultColumn(columns), {value},
moptions->info_log, moptions->statistics,
SystemClock::Default().get(),
/* update_num_ops_stats */ false, &new_value,
/* result_operand */ nullptr, &new_value_type,
/* op_failure_scope */ nullptr);
} else {
// `op_failure_scope` (an output parameter) is not provided (set to
// nullptr) since a failure must be propagated regardless of its
// value.
merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, MergeHelper::kWideBaseValue, columns,
{value}, moptions->info_log, moptions->statistics,
SystemClock::Default().get(),
/* update_num_ops_stats */ false, &new_value,
/* result_operand */ nullptr, &new_value_type,
/* op_failure_scope */ nullptr);
}

if (!merge_status.ok()) {
// Failed to merge!
Expand All @@ -2530,12 +2549,13 @@ class MemTableInserter : public WriteBatch::Handler {
} else {
// 3) Add value to memtable
assert(!concurrent_memtable_writes_);
assert(new_value_type == kTypeValue ||
new_value_type == kTypeWideColumnEntity);

if (kv_prot_info != nullptr) {
auto merged_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
merged_kv_prot_info.UpdateV(value, new_value);
assert(new_value_type == kTypeValue ||
new_value_type == kTypeWideColumnEntity);
merged_kv_prot_info.UpdateO(kTypeMerge, new_value_type);
ret_status = mem->Add(sequence_, new_value_type, key, new_value,
&merged_kv_prot_info);
Expand Down
3 changes: 1 addition & 2 deletions tools/ldb_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1100,8 +1100,7 @@ std::string LDBCommand::PrintKeyValueOrWideColumns(
const Slice& key, const Slice& value, const WideColumns& wide_columns,
bool is_key_hex, bool is_value_hex) {
if (wide_columns.empty() ||
(wide_columns.size() == 1 &&
WideColumnsHelper::HasDefaultColumn(wide_columns))) {
WideColumnsHelper::HasDefaultColumnOnly(wide_columns)) {
return PrintKeyValue(key.ToString(), value.ToString(), is_key_hex,
is_value_hex);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed the handling of wide-column base values in the `max_successive_merges` logic.

0 comments on commit b00fa55

Please sign in to comment.