Skip to content

Commit

Permalink
flat_mutation_reader: get rid of timeout parameter
Browse files Browse the repository at this point in the history
Now that the timeout is taken from the reader_permit.

Signed-off-by: Benny Halevy <[email protected]>
  • Loading branch information
bhalevy committed Aug 24, 2021
1 parent 4e3dcfd commit 4476800
Show file tree
Hide file tree
Showing 74 changed files with 881 additions and 909 deletions.
54 changes: 27 additions & 27 deletions cache_flat_mutation_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
flat_mutation_reader* _underlying = nullptr;
flat_mutation_reader_opt _underlying_holder;

future<> do_fill_buffer(db::timeout_clock::time_point);
future<> ensure_underlying(db::timeout_clock::time_point);
future<> do_fill_buffer();
future<> ensure_underlying();
void copy_from_cache_to_buffer();
future<> process_static_row(db::timeout_clock::time_point);
future<> process_static_row();
void move_to_end();
void move_to_next_range();
void move_to_range(query::clustering_row_ranges::const_iterator);
Expand All @@ -128,7 +128,7 @@ class cache_flat_mutation_reader final : public flat_mutation_reader::impl {
void add_to_buffer(range_tombstone&&);
void add_range_tombstone_to_buffer(range_tombstone&&);
void add_to_buffer(mutation_fragment&&);
future<> read_from_underlying(db::timeout_clock::time_point);
future<> read_from_underlying();
void start_reading_from_underlying();
bool after_current_range(position_in_partition_view position);
bool can_populate() const;
Expand Down Expand Up @@ -187,20 +187,20 @@ public:
}
cache_flat_mutation_reader(const cache_flat_mutation_reader&) = delete;
cache_flat_mutation_reader(cache_flat_mutation_reader&&) = delete;
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override;
virtual future<> fill_buffer() override;
virtual future<> next_partition() override {
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = true;
}
return make_ready_future<>();
}
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(const dht::partition_range&) override {
clear_buffer();
_end_of_stream = true;
return make_ready_future<>();
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(position_range pr) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept {
Expand All @@ -211,7 +211,7 @@ public:
};

inline
future<> cache_flat_mutation_reader::process_static_row(db::timeout_clock::time_point timeout) {
future<> cache_flat_mutation_reader::process_static_row() {
if (_snp->static_row_continuous()) {
_read_context.cache().on_row_hit();
static_row sr = _lsa_manager.run_in_read_section([this] {
Expand All @@ -223,8 +223,8 @@ future<> cache_flat_mutation_reader::process_static_row(db::timeout_clock::time_
return make_ready_future<>();
} else {
_read_context.cache().on_row_miss();
return ensure_underlying(timeout).then([this, timeout] {
return (*_underlying)(timeout).then([this] (mutation_fragment_opt&& sr) {
return ensure_underlying().then([this] {
return (*_underlying)().then([this] (mutation_fragment_opt&& sr) {
if (sr) {
assert(sr->is_static_row());
maybe_add_to_cache(sr->as_static_row());
Expand All @@ -242,10 +242,10 @@ void cache_flat_mutation_reader::touch_partition() {
}

inline
future<> cache_flat_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) {
future<> cache_flat_mutation_reader::fill_buffer() {
if (_state == state::before_static_row) {
touch_partition();
auto after_static_row = [this, timeout] {
auto after_static_row = [this] {
if (_ck_ranges_curr == _ck_ranges_end) {
finish_reader();
return make_ready_future<>();
Expand All @@ -254,26 +254,26 @@ future<> cache_flat_mutation_reader::fill_buffer(db::timeout_clock::time_point t
_lsa_manager.run_in_read_section([this] {
move_to_range(_ck_ranges_curr);
});
return fill_buffer(timeout);
return fill_buffer();
};
if (_schema->has_static_columns()) {
return process_static_row(timeout).then(std::move(after_static_row));
return process_static_row().then(std::move(after_static_row));
} else {
return after_static_row();
}
}
clogger.trace("csm {}: fill_buffer(), range={}, lb={}", fmt::ptr(this), *_ck_ranges_curr, _lower_bound);
return do_until([this] { return _end_of_stream || is_buffer_full(); }, [this, timeout] {
return do_fill_buffer(timeout);
return do_until([this] { return _end_of_stream || is_buffer_full(); }, [this] {
return do_fill_buffer();
});
}

inline
future<> cache_flat_mutation_reader::ensure_underlying(db::timeout_clock::time_point timeout) {
future<> cache_flat_mutation_reader::ensure_underlying() {
if (_underlying) {
return make_ready_future<>();
}
return _read_context.ensure_underlying(timeout).then([this] {
return _read_context.ensure_underlying().then([this] {
flat_mutation_reader& ctx_underlying = _read_context.underlying().underlying();
if (ctx_underlying.schema() != _schema) {
_underlying_holder = make_delegating_reader(ctx_underlying);
Expand All @@ -286,26 +286,26 @@ future<> cache_flat_mutation_reader::ensure_underlying(db::timeout_clock::time_p
}

inline
future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_point timeout) {
future<> cache_flat_mutation_reader::do_fill_buffer() {
if (_state == state::move_to_underlying) {
if (!_underlying) {
return ensure_underlying(timeout).then([this, timeout] {
return do_fill_buffer(timeout);
return ensure_underlying().then([this] {
return do_fill_buffer();
});
}
_state = state::reading_from_underlying;
_population_range_starts_before_all_rows = _lower_bound.is_before_all_clustered_rows(*_schema);
if (!_read_context.partition_exists()) {
return read_from_underlying(timeout);
return read_from_underlying();
}
auto end = _next_row_in_range ? position_in_partition(_next_row.position())
: position_in_partition(_upper_bound);
return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}, timeout).then([this, timeout] {
return read_from_underlying(timeout);
return _underlying->fast_forward_to(position_range{_lower_bound, std::move(end)}).then([this] {
return read_from_underlying();
});
}
if (_state == state::reading_from_underlying) {
return read_from_underlying(timeout);
return read_from_underlying();
}
// assert(_state == state::reading_from_cache)
return _lsa_manager.run_in_read_section([this] {
Expand Down Expand Up @@ -340,7 +340,7 @@ future<> cache_flat_mutation_reader::do_fill_buffer(db::timeout_clock::time_poin
}

inline
future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::time_point timeout) {
future<> cache_flat_mutation_reader::read_from_underlying() {
return consume_mutation_fragments_until(*_underlying,
[this] { return _state != state::reading_from_underlying || is_buffer_full(); },
[this] (mutation_fragment mf) {
Expand Down Expand Up @@ -415,7 +415,7 @@ future<> cache_flat_mutation_reader::read_from_underlying(db::timeout_clock::tim
}
});
return make_ready_future<>();
}, timeout);
});
}

inline
Expand Down
14 changes: 7 additions & 7 deletions compaction/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ class compaction {
get_compacting_sstable_writer(),
std::move(gc_consumer));

reader.consume_in_thread(std::move(cfc), db::no_timeout);
reader.consume_in_thread(std::move(cfc));
});
});
return consumer(make_sstable_reader());
Expand Down Expand Up @@ -1397,12 +1397,12 @@ class scrub_compaction final : public regular_compaction {
, _reader(std::move(underlying))
, _validator(*_schema)
{ }
virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
virtual future<> fill_buffer() override {
if (_end_of_stream) {
return make_ready_future<>();
}
return repeat([this, timeout] {
return _reader.fill_buffer(timeout).then([this] {
return repeat([this] {
return _reader.fill_buffer().then([this] {
fill_buffer_from_underlying();
return stop_iteration(is_buffer_full() || _end_of_stream);
});
Expand All @@ -1428,10 +1428,10 @@ class scrub_compaction final : public regular_compaction {
virtual future<> next_partition() override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(position_range pr) override {
return make_exception_future<>(make_backtraced_exception_ptr<std::bad_function_call>());
}
virtual future<> close() noexcept override {
Expand Down Expand Up @@ -1650,7 +1650,7 @@ future<bool> scrub_validate_mode_validate_reader(flat_mutation_reader reader, co
try {
auto validator = mutation_fragment_stream_validator(*schema);

while (auto mf_opt = co_await reader(db::no_timeout)) {
while (auto mf_opt = co_await reader()) {
if (info.is_stop_requested()) [[unlikely]] {
// Compaction manager will catch this exception and re-schedule the compaction.
co_return coroutine::make_exception(compaction_stop_exception(info.ks_name, info.cf_name, info.stop_requested));
Expand Down
2 changes: 1 addition & 1 deletion database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1650,7 +1650,7 @@ future<mutation> database::do_apply_counter_update(column_family& cf, const froz

tracing::trace(trace_state, "Reading counter values from the CF");
auto permit = get_reader_concurrency_semaphore().make_tracking_only_permit(m_schema.get(), "counter-read-before-write", timeout);
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state, timeout)
return counter_write_query(m_schema, cf.as_mutation_source(), std::move(permit), m.decorated_key(), slice, trace_state)
.then([this, &cf, &m, m_schema, timeout, trace_state] (auto mopt) {
// ...now, that we got existing state of all affected counter
// cells we can look for our shard in each of them, increment
Expand Down
28 changes: 14 additions & 14 deletions db/chained_delegating_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@
// All calls will first wait for a future to resolve, then forward to a given underlying reader.
class chained_delegating_reader : public flat_mutation_reader::impl {
std::unique_ptr<flat_mutation_reader> _underlying;
std::function<future<flat_mutation_reader>(db::timeout_clock::time_point)> _populate_reader;
std::function<future<flat_mutation_reader>()> _populate_reader;
std::function<void()> _on_destroyed;

public:
chained_delegating_reader(schema_ptr s, std::function<future<flat_mutation_reader>(db::timeout_clock::time_point)>&& populate, reader_permit permit, std::function<void()> on_destroyed = []{})
chained_delegating_reader(schema_ptr s, std::function<future<flat_mutation_reader>()>&& populate, reader_permit permit, std::function<void()> on_destroyed = []{})
: impl(s, std::move(permit))
, _populate_reader(std::move(populate))
, _on_destroyed(std::move(on_destroyed))
Expand All @@ -45,35 +45,35 @@ public:
_on_destroyed();
}

virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
virtual future<> fill_buffer() override {
if (!_underlying) {
return _populate_reader(timeout).then([this, timeout] (flat_mutation_reader&& rd) {
return _populate_reader().then([this] (flat_mutation_reader&& rd) {
_underlying = std::make_unique<flat_mutation_reader>(std::move(rd));
return fill_buffer(timeout);
return fill_buffer();
});
}

if (is_buffer_full()) {
return make_ready_future<>();
}

return _underlying->fill_buffer(timeout).then([this] {
return _underlying->fill_buffer().then([this] {
_end_of_stream = _underlying->is_end_of_stream();
_underlying->move_buffer_content_to(*this);
});
}

virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(position_range pr) override {
if (!_underlying) {
return _populate_reader(timeout).then([this, timeout, pr = std::move(pr)] (flat_mutation_reader&& rd) mutable {
return _populate_reader().then([this, pr = std::move(pr)] (flat_mutation_reader&& rd) mutable {
_underlying = std::make_unique<flat_mutation_reader>(std::move(rd));
return fast_forward_to(pr, timeout);
return fast_forward_to(pr);
});
}

_end_of_stream = false;
forward_buffer_to(pr.start());
return _underlying->fast_forward_to(std::move(pr), timeout);
return _underlying->fast_forward_to(std::move(pr));
}

virtual future<> next_partition() override {
Expand All @@ -91,17 +91,17 @@ public:
return f;
}

virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
if (!_underlying) {
return _populate_reader(timeout).then([this, timeout, &pr] (flat_mutation_reader&& rd) mutable {
return _populate_reader().then([this, &pr] (flat_mutation_reader&& rd) mutable {
_underlying = std::make_unique<flat_mutation_reader>(std::move(rd));
return fast_forward_to(pr, timeout);
return fast_forward_to(pr);
});
}

_end_of_stream = false;
clear_buffer();
return _underlying->fast_forward_to(pr, timeout);
return _underlying->fast_forward_to(pr);
}

virtual future<> close() noexcept override {
Expand Down
13 changes: 6 additions & 7 deletions db/size_estimates_virtual_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
#include "range.hh"
#include "mutation_fragment.hh"
#include "sstables/sstables.hh"
#include "db/timeout_clock.hh"
#include "database.hh"

#include "db/size_estimates_virtual_reader.hh"
Expand Down Expand Up @@ -270,15 +269,15 @@ future<> size_estimates_mutation_reader::close_partition_reader() noexcept {
return _partition_reader ? _partition_reader->close() : make_ready_future<>();
}

future<> size_estimates_mutation_reader::fill_buffer(db::timeout_clock::time_point timeout) {
return do_until([this, timeout] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
future<> size_estimates_mutation_reader::fill_buffer() {
return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this] {
if (!_partition_reader) {
return get_next_partition();
}
return _partition_reader->consume_pausable([this] (mutation_fragment mf) {
push_mutation_fragment(std::move(mf));
return stop_iteration(is_buffer_full());
}, timeout).then([this] {
}).then([this] {
if (_partition_reader->is_end_of_stream() && _partition_reader->is_buffer_empty()) {
return _partition_reader->close();
}
Expand All @@ -295,19 +294,19 @@ future<> size_estimates_mutation_reader::next_partition() {
return make_ready_future<>();
}

future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
future<> size_estimates_mutation_reader::fast_forward_to(const dht::partition_range& pr) {
clear_buffer();
_prange = &pr;
_keyspaces = std::nullopt;
_end_of_stream = false;
return close_partition_reader();
}

future<> size_estimates_mutation_reader::fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) {
future<> size_estimates_mutation_reader::fast_forward_to(position_range pr) {
forward_buffer_to(pr.start());
_end_of_stream = false;
if (_partition_reader) {
return _partition_reader->fast_forward_to(std::move(pr), timeout);
return _partition_reader->fast_forward_to(std::move(pr));
}
return make_ready_future<>();
}
Expand Down
6 changes: 3 additions & 3 deletions db/size_estimates_virtual_reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class size_estimates_mutation_reader final : public flat_mutation_reader::impl {
public:
size_estimates_mutation_reader(database& db, schema_ptr, reader_permit, const dht::partition_range&, const query::partition_slice&, streamed_mutation::forwarding);

virtual future<> fill_buffer(db::timeout_clock::time_point) override;
virtual future<> fill_buffer() override;
virtual future<> next_partition() override;
virtual future<> fast_forward_to(const dht::partition_range&, db::timeout_clock::time_point) override;
virtual future<> fast_forward_to(position_range, db::timeout_clock::time_point) override;
virtual future<> fast_forward_to(const dht::partition_range&) override;
virtual future<> fast_forward_to(position_range) override;
virtual future<> close() noexcept override;
private:
future<> get_next_partition();
Expand Down
2 changes: 1 addition & 1 deletion db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1808,7 +1808,7 @@ class nodetool_status_table : public memtable_filling_virtual_table {
.build();
}

future<> execute(std::function<void(mutation)> mutation_sink, db::timeout_clock::time_point timeout) override {
future<> execute(std::function<void(mutation)> mutation_sink) override {
return _ss.get_ownership().then([&, mutation_sink] (std::map<gms::inet_address, float> ownership) {
const locator::token_metadata& tm = _ss.get_token_metadata();
gms::gossiper& gs = gms::get_local_gossiper();
Expand Down
Loading

0 comments on commit 4476800

Please sign in to comment.