Skip to content

Commit

Permalink
flat_mutation_reader: expose reverse reader as a standalone reader
Browse files Browse the repository at this point in the history
Currently reverse reads just pass a flag to
`flat_mutation_reader::consume()` to make the read happen in reverse.
This is deceptively simple and streamlined -- while in fact behind the
scenes a reversing reader is created to wrap the reader in question to
reverse partitions, one-by-one.

This patch makes this explicit by exposing the reversing reader via
`make_reversing_reader()`, so that callers construct the wrapper themselves
and how reversing works is visible at the call site. It also allows for more
configuration to be passed to the reversing reader (in the next patches).

This change is forward compatible: in time we plan to add reversing
support to the sstable layer, at which point the reversing reader will
go away.
  • Loading branch information
denesb committed Feb 27, 2020
1 parent 956b092 commit 091d80e
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 38 deletions.
16 changes: 7 additions & 9 deletions flat_mutation_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,9 @@ void flat_mutation_reader::impl::clear_buffer_to_next_partition() {
_buffer_size = compute_buffer_size(*_schema, _buffer);
}

flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutation_reader::impl& original) {
// FIXME: #1413 Full partitions get accumulated in memory.

flat_mutation_reader make_reversing_reader(flat_mutation_reader& original) {
class partition_reversing_mutation_reader final : public flat_mutation_reader::impl {
flat_mutation_reader::impl* _source;
flat_mutation_reader* _source;
range_tombstone_list _range_tombstones;
std::stack<mutation_fragment> _mutation_fragments;
mutation_fragment_opt _partition_end;
Expand All @@ -76,7 +74,7 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
auto rt_owner = alloc_strategy_unique_ptr<range_tombstone>(&rt);
push_mutation_fragment(mutation_fragment(std::move(rt)));
};
position_in_partition::less_compare cmp(*_source->_schema);
position_in_partition::less_compare cmp(*_schema);
while (!_mutation_fragments.empty() && !is_buffer_full()) {
auto& mf = _mutation_fragments.top();
if (!_range_tombstones.empty() && !cmp(_range_tombstones.tombstones().rbegin()->end_position(), mf.position())) {
Expand Down Expand Up @@ -113,18 +111,18 @@ flat_mutation_reader flat_mutation_reader::impl::reverse_partitions(flat_mutatio
return make_ready_future<stop_iteration>(stop_iteration::yes);
}
} else if (mf.is_range_tombstone()) {
_range_tombstones.apply(*_source->_schema, std::move(mf.as_range_tombstone()));
_range_tombstones.apply(*_schema, std::move(mf.as_range_tombstone()));
} else {
_mutation_fragments.emplace(std::move(mf));
}
}
return make_ready_future<stop_iteration>(is_buffer_full());
}
public:
explicit partition_reversing_mutation_reader(flat_mutation_reader::impl& mr)
: flat_mutation_reader::impl(mr._schema)
explicit partition_reversing_mutation_reader(flat_mutation_reader& mr)
: flat_mutation_reader::impl(mr.schema())
, _source(&mr)
, _range_tombstones(*mr._schema)
, _range_tombstones(*_schema)
{ }

virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
Expand Down
36 changes: 17 additions & 19 deletions flat_mutation_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,6 @@ GCC6_CONCEPT(
*/
class flat_mutation_reader final {
public:
// Causes a stream of reversed mutations to be emitted.
// 1. Static row is still emitted first.
// 2. Range tombstones are ordered by their end position.
// 3. Clustered rows and range tombstones are emitted in descending order.
// Because of 2 and 3 the guarantee that a range tombstone is emitted before
// any mutation fragment affected by it still holds.
// Ordering of partitions themselves remains unchanged.
using consume_reversed_partitions = seastar::bool_class<class consume_reversed_partitions_tag>;

class impl {
private:
circular_buffer<mutation_fragment> _buffer;
Expand Down Expand Up @@ -122,8 +113,6 @@ public:
const circular_buffer<mutation_fragment>& buffer() const {
return _buffer;
}
private:
static flat_mutation_reader reverse_partitions(flat_mutation_reader::impl&);
public:
impl(schema_ptr s) : _schema(std::move(s)) { }
virtual ~impl() {}
Expand Down Expand Up @@ -353,14 +342,7 @@ public:
GCC6_CONCEPT(
requires FlattenedConsumer<Consumer>()
)
auto consume(Consumer consumer,
db::timeout_clock::time_point timeout,
consume_reversed_partitions reversed = consume_reversed_partitions::no) {
if (reversed) {
return do_with(impl::reverse_partitions(*_impl), [&] (auto& reversed_partition_stream) {
return reversed_partition_stream._impl->consume(std::move(consumer), timeout);
});
}
auto consume(Consumer consumer, db::timeout_clock::time_point timeout) {
return _impl->consume(std::move(consumer), timeout);
}

Expand Down Expand Up @@ -747,6 +729,22 @@ future<> consume_partitions(flat_mutation_reader& reader, Consumer consumer, db:
flat_mutation_reader
make_generating_reader(schema_ptr s, std::function<future<mutation_fragment_opt> ()> get_next_fragment);

/// Creates a reader that wraps \c original and reverses the contents of each
/// partition it emits (the order of the partitions themselves is kept).
///
/// 1. Static row is still emitted first.
/// 2. Range tombstones are ordered by their end position.
/// 3. Clustered rows and range tombstones are emitted in descending order.
/// Because of 2 and 3 the guarantee that a range tombstone is emitted before
/// any mutation fragment affected by it still holds.
/// Ordering of partitions themselves remains unchanged.
///
/// The returned reader uses the schema of \c original and only keeps a
/// pointer to it; it does not take ownership of \c original.
///
/// NOTE(review): the implementation appears to buffer a partition's fragments
/// in memory before emitting them in reverse (see #1413) -- confirm before
/// using this on very large partitions.
///
/// \param original the reader to be reversed, has to be kept alive while the
/// reversing reader is in use.
///
/// FIXME: reversing should be done in the sstable layer, see #1413.
flat_mutation_reader
make_reversing_reader(flat_mutation_reader& original);

/// Low level fragment stream validator.
///
/// Tracks and validates the monotonicity of the passed in fragment kinds,
Expand Down
2 changes: 1 addition & 1 deletion mutation_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2528,7 +2528,7 @@ future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& so
auto cwqrb = counter_write_query_result_builder(*s);
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, counter_write_query_result_builder>>(
*s, gc_clock::now(), slice, query::max_rows, query::max_rows, std::move(cwqrb));
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no);
auto f = r_a_r->reader.consume(std::move(cfq), db::no_timeout);
return f.finally([r_a_r = std::move(r_a_r)] { });
}

Expand Down
4 changes: 2 additions & 2 deletions querier.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ static sstring cannot_use_reason(can_use cu)

static bool ring_position_matches(const schema& s, const dht::partition_range& range, const query::partition_slice& slice,
const position_view& pos) {
const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed));
const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);

const auto expected_start = dht::ring_position_view(*pos.partition_key);
// If there are no clustering columns or the select is distinct we don't
Expand Down Expand Up @@ -93,7 +93,7 @@ static bool clustering_position_matches(const schema& s, const query::partition_

clustering_key_prefix::equality eq(s);

const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(slice.options.contains(query::partition_slice::option::reversed));
const auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);

// If the page ended mid-partition the first partition range should start
// with the last clustering key (exclusive).
Expand Down
15 changes: 11 additions & 4 deletions querier.hh
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,22 @@ auto consume_page(flat_mutation_reader& reader,
const auto next_fragment_kind = next_fragment ? next_fragment->mutation_fragment_kind() : mutation_fragment::kind::partition_end;
compaction_state->start_new_page(row_limit, partition_limit, query_time, next_fragment_kind, *consumer);

const auto is_reversed = flat_mutation_reader::consume_reversed_partitions(
slice.options.contains(query::partition_slice::option::reversed));

auto last_ckey = make_lw_shared<std::optional<clustering_key_prefix>>();
auto reader_consumer = make_stable_flattened_mutations_consumer<compact_for_query<OnlyLive, clustering_position_tracker<Consumer>>>(
compaction_state,
clustering_position_tracker(std::move(consumer), last_ckey));

return reader.consume(std::move(reader_consumer), timeout, is_reversed).then([last_ckey] (auto&&... results) mutable {
auto consume = [&reader, &slice, reader_consumer = std::move(reader_consumer), timeout] () mutable {
if (slice.options.contains(query::partition_slice::option::reversed)) {
return do_with(make_reversing_reader(reader),
[reader_consumer = std::move(reader_consumer), timeout] (flat_mutation_reader& reversing_reader) mutable {
return reversing_reader.consume(std::move(reader_consumer), timeout);
});
}
return reader.consume(std::move(reader_consumer), timeout);
};

return consume().then([last_ckey] (auto&&... results) mutable {
static_assert(sizeof...(results) <= 1);
return make_ready_future<std::tuple<std::optional<clustering_key_prefix>, std::decay_t<decltype(results)>...>>(std::tuple(std::move(*last_ckey), std::move(results)...));
});
Expand Down
7 changes: 5 additions & 2 deletions test/boost/flat_mutation_reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,11 @@ void test_flat_stream(schema_ptr s, std::vector<mutation> muts, reversed_partiti
assert(bool(!reversed));
return fmr.consume_in_thread(std::move(fsc), db::no_timeout);
} else {
auto reversed_flag = flat_mutation_reader::consume_reversed_partitions(bool(reversed));
return fmr.consume(std::move(fsc), db::no_timeout, reversed_flag).get0();
if (reversed) {
auto reverse_reader = make_reversing_reader(fmr);
return reverse_reader.consume(std::move(fsc), db::no_timeout).get0();
}
return fmr.consume(std::move(fsc), db::no_timeout).get0();
}
};

Expand Down
2 changes: 1 addition & 1 deletion test/boost/mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2928,7 +2928,7 @@ void run_compaction_data_stream_split_test(const schema& schema, gc_clock::time_
survived_compacted_fragments_consumer(schema, query_time, get_max_purgeable),
purged_compacted_fragments_consumer(schema, query_time, get_max_purgeable));

auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout, flat_mutation_reader::consume_reversed_partitions::no).get0();
auto [survived_partitions, purged_partitions] = reader.consume(std::move(consumer), db::no_timeout).get0();

tlog.info("Survived data: {}", create_stats(survived_partitions));
tlog.info("Purged data: {}", create_stats(purged_partitions));
Expand Down

0 comments on commit 091d80e

Please sign in to comment.