Skip to content

Commit

Permalink
cdc: Implementations of delta_mode::off/keys
Browse files Browse the repository at this point in the history
At the stage of `finish`ing CDC mutation, deltas are removed (mode
`off`) or edited to keep only PK+CK of the base table (mode `keys`).

Fixes scylladb#6838
  • Loading branch information
jul-stas committed Jul 27, 2020
1 parent c05128d commit 9e42470
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 1 deletion.
51 changes: 51 additions & 0 deletions cdc/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,56 @@ class transformer final : public change_processor {
m.set_cell(ck, _ttl_col, atomic_cell::make_live(*_ttl_col.type, _ts, _ttl_col.type->decompose(ttl.count()), _cdc_ttl_opt));
}

// Remove non-key columns or entire delta rows, according to `delta` setting in `cdc` options.
void adjust_or_delete_deltas() {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::full) {
return;
}

static const auto preimg_op_bytes = _op_col.type->decompose(operation_native_type(operation::pre_image));
static const auto postimg_op_bytes = _op_col.type->decompose(operation_native_type(operation::post_image));
for (auto& m : _result_mutations) {
auto& clustered_rows = m.partition().clustered_rows();
int deleted_rows_cnt = 0;
for (auto it = clustered_rows.begin(); it != clustered_rows.end(); /* no increment */) {
const auto& op_cell = it->row().cells().cell_at(_op_col.id).as_atomic_cell(_op_col);
const auto op_val = op_cell.value().linearize();
if (op_val != preimg_op_bytes && op_val != postimg_op_bytes) {
if (_schema->cdc_options().get_delta_mode() == cdc::delta_mode::off) {
it = m.partition().clustered_rows().erase_and_dispose(it, current_deleter<rows_entry>());
++deleted_rows_cnt;
continue;
}

// The case of `get_delta_mode() == delta_mode::keys`:
it->row().cells().remove_if([this, log_s = m.schema()] (column_id id, atomic_cell_or_collection& acoc) {
const auto& log_cdef = log_s->column_at(column_kind::regular_column, id);
// We can surely remove "cdc$*" columns.
if (is_cdc_metacolumn_name(log_cdef.name_as_text())) {
return true;
}
const auto* base_cdef = _schema->get_column_definition(log_cdef.name());
// Remove columns from delta that correspond to non-PK/CK columns in the base table.
return base_cdef != nullptr
&& (base_cdef->kind != column_kind::partition_key && base_cdef->kind != column_kind::clustering_key);
});
}

// Deletion of deltas might leave gaps in `batch_seq_no` - let's fix them.
if (deleted_rows_cnt > 0) {
const auto* batch_seq_no_cdef = m.schema()->get_column_definition(log_meta_column_name_bytes("batch_seq_no"));
assert(batch_seq_no_cdef != nullptr);
auto exploded_ck = it->key().explode();
const size_t batch_seq_no_idx = batch_seq_no_cdef->component_index();
const auto old_batch_seq_no = value_cast<int32_t>(int32_type->deserialize(exploded_ck[batch_seq_no_idx]));
exploded_ck[batch_seq_no_idx] = int32_type->decompose(old_batch_seq_no - deleted_rows_cnt);
it->key() = clustering_key::from_exploded(std::move(exploded_ck));
}
++it;
}
}
}

public:
transformer(db_context ctx, schema_ptr s, dht::decorated_key dk)
: _ctx(ctx)
Expand Down Expand Up @@ -1258,6 +1308,7 @@ class transformer final : public change_processor {
// Takes and returns generated cdc log mutations and associated statistics about parts touched during transformer's lifetime.
// The `transformer` object on which this method was called on should not be used anymore.
std::tuple<std::vector<mutation>, stats::part_type_set> finish() && {
adjust_or_delete_deltas();
return std::make_pair<std::vector<mutation>, stats::part_type_set>(std::move(_result_mutations), std::move(_touched_parts));
}

Expand Down
2 changes: 1 addition & 1 deletion mutation_partition.hh
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public:
const atomic_cell_or_collection* find_cell(column_id id) const;
// Returns a pointer to cell's value and hash or nullptr if column is not set.
const cell_and_hash* find_cell_and_hash(column_id id) const;
private:

template<typename Func>
void remove_if(Func&& func) {
if (_type == storage_type::vector) {
Expand Down

0 comments on commit 9e42470

Please sign in to comment.