forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmutation_source.hh
207 lines (189 loc) · 8.99 KB
/
mutation_source.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*
* Copyright (C) 2022-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <seastar/core/io_priority_class.hh>
#include "dht/i_partitioner.hh"
#include "query-request.hh"
#include "tracing/trace_state.hh"
#include "readers/flat_mutation_reader_fwd.hh"
#include "readers/flat_mutation_reader_v2.hh"
#include "readers/mutation_fragment_v1_stream.hh"
/// A partition_presence_checker quickly returns whether a key is known not to exist
/// in a data source (it may return false positives, but not false negatives).
enum class partition_presence_checker_result {
definitely_doesnt_exist,
maybe_exists
};
using partition_presence_checker = std::function<partition_presence_checker_result (const dht::decorated_key& key)>;
inline
partition_presence_checker make_default_partition_presence_checker() {
return [] (const dht::decorated_key&) { return partition_presence_checker_result::maybe_exists; };
}
// mutation_source represents source of data in mutation form. The data source
// can be queried multiple times and in parallel. For each query it returns
// independent mutation_reader.
//
// The reader returns mutations having all the same schema, the one passed
// when invoking the source.
//
// When reading in reverse, a reverse schema has to be passed (compared to the
// table's schema), and a half-reverse (legacy) slice.
// See docs/dev/reverse-reads.md for more details.
// Partition-range forwarding is not yet supported in reverse mode.
class mutation_source {
using partition_range = const dht::partition_range&;
using io_priority = const io_priority_class&;
using flat_reader_v2_factory_type = std::function<flat_mutation_reader_v2(schema_ptr,
reader_permit,
partition_range,
const query::partition_slice&,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding,
mutation_reader::forwarding)>;
// We could have our own version of std::function<> that is nothrow
// move constructible and save some indirection and allocation.
// Probably not worth the effort though.
lw_shared_ptr<flat_reader_v2_factory_type> _fn;
lw_shared_ptr<std::function<partition_presence_checker()>> _presence_checker_factory;
private:
mutation_source() = default;
explicit operator bool() const { return bool(_fn); }
friend class optimized_optional<mutation_source>;
public:
mutation_source(flat_reader_v2_factory_type fn, std::function<partition_presence_checker()> pcf = [] { return make_default_partition_presence_checker(); })
: _fn(make_lw_shared<flat_reader_v2_factory_type>(std::move(fn)))
, _presence_checker_factory(make_lw_shared<std::function<partition_presence_checker()>>(std::move(pcf)))
{ }
mutation_source(std::function<flat_mutation_reader_v2(schema_ptr, reader_permit, partition_range, const query::partition_slice&, io_priority,
tracing::trace_state_ptr, streamed_mutation::forwarding)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc,
tracing::trace_state_ptr tr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) {
return fn(std::move(s), std::move(permit), range, slice, pc, std::move(tr), fwd);
}) {}
mutation_source(std::function<flat_mutation_reader_v2(schema_ptr, reader_permit, partition_range, const query::partition_slice&, io_priority)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) {
assert(!fwd);
return fn(std::move(s), std::move(permit), range, slice, pc);
}) {}
mutation_source(std::function<flat_mutation_reader_v2(schema_ptr, reader_permit, partition_range, const query::partition_slice&)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) {
assert(!fwd);
return fn(std::move(s), std::move(permit), range, slice);
}) {}
mutation_source(std::function<flat_mutation_reader_v2(schema_ptr, reader_permit, partition_range range)> fn)
: mutation_source([fn = std::move(fn)] (schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice&,
io_priority,
tracing::trace_state_ptr,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding) {
assert(!fwd);
return fn(std::move(s), std::move(permit), range);
}) {}
mutation_source(const mutation_source& other) = default;
mutation_source& operator=(const mutation_source& other) = default;
mutation_source(mutation_source&&) = default;
mutation_source& operator=(mutation_source&&) = default;
mutation_fragment_v1_stream
make_fragment_v1_stream(
schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const
{
return mutation_fragment_v1_stream(
(*_fn)(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
mutation_fragment_v1_stream
make_fragment_v1_stream(
schema_ptr s,
reader_permit permit,
partition_range range = query::full_partition_range) const
{
auto& full_slice = s->full_slice();
return this->make_fragment_v1_stream(std::move(s), std::move(permit), range, full_slice);
}
// Creates a new reader.
//
// All parameters captured by reference must remain live as long as returned
// mutation_reader or streamed_mutation obtained through it are alive.
flat_mutation_reader_v2
make_reader_v2(
schema_ptr s,
reader_permit permit,
partition_range range,
const query::partition_slice& slice,
io_priority pc = default_priority_class(),
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::yes) const
{
return (*_fn)(std::move(s), std::move(permit), range, slice, pc, std::move(trace_state), fwd, fwd_mr);
}
flat_mutation_reader_v2
make_reader_v2(
schema_ptr s,
reader_permit permit,
partition_range range = query::full_partition_range) const
{
auto& full_slice = s->full_slice();
return make_reader_v2(std::move(s), std::move(permit), range, full_slice);
}
partition_presence_checker make_partition_presence_checker() {
return (*_presence_checker_factory)();
}
};
// Returns a mutation_source which is the sum of given mutation_sources.
//
// Adding two mutation sources gives a mutation source which contains
// the sum of writes contained in the addends.
mutation_source make_combined_mutation_source(std::vector<mutation_source>);
// Represent mutation_source which can be snapshotted.
class snapshot_source {
private:
std::function<mutation_source()> _func;
public:
snapshot_source(std::function<mutation_source()> func)
: _func(std::move(func))
{ }
// Creates a new snapshot.
// The returned mutation_source represents all earlier writes and only those.
// Note though that the mutations in the snapshot may get compacted over time.
mutation_source operator()() {
return _func();
}
};
mutation_source make_empty_mutation_source();
snapshot_source make_empty_snapshot_source();
using mutation_source_opt = optimized_optional<mutation_source>;