Skip to content

Commit

Permalink
merge: fix two CDC bugs with preimage/postimage
Browse files Browse the repository at this point in the history
Merged pull request scylladb#6078 from
Calle Wilund, fixing two CDC preimage/postimage bugs:

Fixes scylladb#6073.
Fixes scylladb#6070.
  • Loading branch information
nyh committed Mar 26, 2020
2 parents cb26de8 + 532a863 commit c639a5e
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 74 deletions.
103 changes: 59 additions & 44 deletions cdc/log.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,19 @@ class transformer final {
const column_definition& _op_col;
const column_definition& _ttl_col;
ttl_opt _cdc_ttl_opt;
/**
* #6070
* When mutation splitting was added, non-atomic column assignments were broken
* into two invocation of transform. This means the second (actual data assignment)
* does not know about the tombstone in first one -> postimage is created as if
* we were _adding_ to the collection, not replacing it.
*
* Not pretty, but to handle this we use the knowledge that we always get
* invoked in timestamp order -> tombstone first, then assign.
* So we simply keep track of non-atomic columns deleted across calls
* and filter out preimage data post this.
*/
std::unordered_set<const column_definition*> _non_atomic_column_deletes;

clustering_key set_pk_columns(const partition_key& pk, api::timestamp_type ts, bytes decomposed_tuuid, int batch_no, mutation& m) const {
const auto log_ck = clustering_key::from_exploded(
Expand Down Expand Up @@ -816,18 +829,18 @@ class transformer final {

// TODO: is pre-image data based on query enough. We only have actual column data. Do we need
// more details like tombstones/ttl? Probably not but keep in mind.
std::tuple<mutation, stats::part_type_set> transform(const mutation& m, const cql3::untyped_result_set* rs, api::timestamp_type ts, bytes tuuid, int& batch_no) const {
std::tuple<mutation, stats::part_type_set> transform(const mutation& m, const cql3::untyped_result_set* rs, api::timestamp_type ts, bytes tuuid, int& batch_no) {
auto stream_id = _ctx._cdc_metadata.get_stream(ts, m.token());
mutation res(_log_schema, stream_id.to_partition_key(*_log_schema));
const auto preimage = _schema->cdc_options().preimage();
const auto postimage = _schema->cdc_options().postimage();
stats::part_type_set touched_parts;
auto& p = m.partition();
if (p.partition_tombstone()) {
// Partition deletion
touched_parts.set<stats::part_type::PARTITION_DELETE>();
auto log_ck = set_pk_columns(m.key(), ts, tuuid, 0, res);
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_operation(log_ck, ts, operation::partition_delete, res);
++batch_no;
} else if (!p.row_tombstones().empty()) {
// range deletion
touched_parts.set<stats::part_type::RANGE_TOMBSTONE>();
Expand All @@ -849,37 +862,30 @@ class transformer final {
}
};
{
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_bound(log_ck, rt.start);
const auto start_operation = rt.start_kind == bound_kind::incl_start
? operation::range_delete_start_inclusive
: operation::range_delete_start_exclusive;
set_operation(log_ck, ts, start_operation, res);
++batch_no;
}
{
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_bound(log_ck, rt.end);
const auto end_operation = rt.end_kind == bound_kind::incl_end
? operation::range_delete_end_inclusive
: operation::range_delete_end_exclusive;
set_operation(log_ck, ts, end_operation, res);
++batch_no;
}
}
} else {
// should be insert, update or deletion
auto process_cells = [&](const row& r, column_kind ckind, const clustering_key& log_ck, std::optional<clustering_key> pikey, const cql3::untyped_result_set_row* pirow, std::optional<clustering_key> poikey) -> std::optional<gc_clock::duration> {
if (postimage && !poikey) {
poikey = set_pk_columns(m.key(), ts, tuuid, ++batch_no, res);
set_operation(*poikey, ts, operation::post_image, res);
}
std::optional<gc_clock::duration> ttl;
std::unordered_set<column_id> columns_assigned;
r.for_each_cell([&](column_id id, const atomic_cell_or_collection& cell) {
auto& cdef = _schema->column_at(ckind, id);
auto* dst = _log_schema->get_column_definition(log_data_column_name_bytes(cdef.name()));
auto has_pirow = pirow && pirow->has(cdef.name_as_text());
bool is_column_delete = true;
bytes_opt value;
bytes_opt deleted_elements = std::nullopt;
Expand Down Expand Up @@ -1000,29 +1006,30 @@ class transformer final {
}
}

bytes_opt prev = get_preimage_col_value(cdef, pirow);

if (prev && pikey) {
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
assert(pikey->explode() != log_ck.explode());
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, *prev, _cdc_ttl_opt));
}

if (is_column_delete) {
res.set_cell(log_ck, log_data_column_deleted_name_bytes(cdef.name()), data_value(true), ts, _cdc_ttl_opt);
if (!cdef.is_atomic()) {
_non_atomic_column_deletes.insert(&cdef);
}
// don't merge with pre-image iff column delete
prev = std::nullopt;
}

if (value) {
res.set_cell(log_ck, *dst, atomic_cell::make_live(*dst->type, ts, *value, _cdc_ttl_opt));
}

bytes_opt prev;

if (has_pirow) {
prev = get_preimage_col_value(cdef, pirow);
assert(std::addressof(res.partition().clustered_row(*_log_schema, *pikey)) != std::addressof(res.partition().clustered_row(*_log_schema, log_ck)));
assert(pikey->explode() != log_ck.explode());
res.set_cell(*pikey, *dst, atomic_cell::make_live(*dst->type, ts, *prev, _cdc_ttl_opt));
}

if (postimage) {
if (poikey) {
// keep track of actually assigning this already
columns_assigned.emplace(id);
// don't merge with pre-image iff column delete
if (is_column_delete) {
prev = std::nullopt;
}
if (cdef.is_atomic() && !is_column_delete && value) {
res.set_cell(*poikey, *dst, atomic_cell::make_live(*dst->type, ts, *value, _cdc_ttl_opt));
} else if (!cdef.is_atomic() && (value || (deleted_elements && prev))) {
Expand All @@ -1035,7 +1042,7 @@ class transformer final {
});

// fill in all columns not already processed. Note that column nulls are also marked.
if (postimage && pirow) {
if (poikey) {
for (auto& cdef : _schema->columns(ckind)) {
if (!columns_assigned.count(cdef.id)) {
auto v = pirow->get_view_opt(cdef.name_as_text());
Expand All @@ -1057,16 +1064,18 @@ class transformer final {

if (rs && !rs->empty()) {
// For static rows, only one row from the result set is needed
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
set_operation(*pikey, ts, operation::pre_image, res);
pirow = &rs->front();
++batch_no;
}

auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
if (preimage && pirow) {
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_operation(*pikey, ts, operation::pre_image, res);
}

auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);

if (postimage) {
poikey = set_pk_columns(m.key(), ts, tuuid, ++batch_no, res);
poikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_operation(*poikey, ts, operation::post_image, res);
}

Expand All @@ -1077,7 +1086,6 @@ class transformer final {
if (ttl) {
set_ttl(log_ck, ts, *ttl, res);
}
++batch_no;
} else {
touched_parts.set_if<stats::part_type::CLUSTERING_ROW>(!p.clustered_rows().empty());
for (const rows_entry& r : p.clustered_rows()) {
Expand All @@ -1098,19 +1106,21 @@ class transformer final {
}
}
if (match) {
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
set_operation(*pikey, ts, operation::pre_image, res);
pirow = &utr;
++batch_no;
break;
}
}
}

auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no, res);
if (preimage && pirow) {
pikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_operation(*pikey, ts, operation::pre_image, res);
}

auto log_ck = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);

if (postimage) {
poikey = set_pk_columns(m.key(), ts, tuuid, ++batch_no, res);
poikey = set_pk_columns(m.key(), ts, tuuid, batch_no++, res);
set_operation(*poikey, ts, operation::post_image, res);
}

Expand All @@ -1120,7 +1130,7 @@ class transformer final {
auto cdef = _log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
res.set_cell(log_ck, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));

if (pirow) {
if (pikey) {
assert(pirow->has(column.name_as_text()));
res.set_cell(*pikey, *cdef, atomic_cell::make_live(*column.type, ts, bytes_view(ck_value[pos]), _cdc_ttl_opt));
}
Expand All @@ -1139,8 +1149,8 @@ class transformer final {
for (const column_definition& column: _schema->regular_columns()) {
assert(pirow->has(column.name_as_text()));
auto& cdef = *_log_schema->get_column_definition(log_data_column_name_bytes(column.name()));
auto value = get_preimage_col_value(column, pirow);
res.set_cell(*pikey, cdef, atomic_cell::make_live(*column.type, ts, bytes_view(value), _cdc_ttl_opt));
auto value = get_preimage_col_value(column, pirow);
res.set_cell(*pikey, cdef, atomic_cell::make_live(*column.type, ts, bytes_view(*value), _cdc_ttl_opt));
}
}
} else {
Expand All @@ -1157,15 +1167,20 @@ class transformer final {
}
}
set_operation(log_ck, ts, cdc_op, res);
++batch_no;
}
}
}

return std::make_tuple(std::move(res), touched_parts);
}

static bytes get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
bytes_opt get_preimage_col_value(const column_definition& cdef, const cql3::untyped_result_set_row *pirow) {
/**
* #6070 - see comment for _non_atomic_column_deletes
*/
if (!pirow || !pirow->has(cdef.name_as_text()) || _non_atomic_column_deletes.count(&cdef)) {
return std::nullopt;
}
return cdef.is_atomic()
? pirow->get_blob(cdef.name_as_text())
: visit(*cdef.type, make_visitor(
Expand Down Expand Up @@ -1344,7 +1359,7 @@ cdc::cdc_service::impl::augment_mutation_call(lowres_clock::time_point timeout,
tracing::trace(tr_state, "CDC: Preimage not enabled for the table, not querying current value of {}", m.decorated_key());
}

return f.then([trans = std::move(trans), &mutations, idx, tr_state = std::move(tr_state), &details] (lw_shared_ptr<cql3::untyped_result_set> rs) {
return f.then([trans = std::move(trans), &mutations, idx, tr_state = std::move(tr_state), &details] (lw_shared_ptr<cql3::untyped_result_set> rs) mutable {
auto& m = mutations[idx];
auto& s = m.schema();
details.had_preimage |= s->cdc_options().preimage();
Expand Down
Loading

0 comments on commit c639a5e

Please sign in to comment.