/*
 * Copyright (C) 2017-present ScyllaDB
 */

/*
 * SPDX-License-Identifier: AGPL-3.0-or-later
 */

#pragma once

#include <functional>
#include <variant>

#include "position_in_partition.hh"
#include "utils/overloaded_functor.hh"
#include "utils/lsa/chunked_managed_vector.hh"
#include "reader_permit.hh"
#include "sstables/types.hh"
#include "sstables/shared_sstable.hh"
#include "sstables/mx/parsers.hh"
#include "utils/allocation_strategy.hh"
#include "utils/managed_ref.hh"
#include "utils/managed_bytes.hh"
#include "utils/managed_vector.hh"

#include <seastar/core/fstream.hh>

namespace sstables {

using use_caching = bool_class<struct use_caching_tag>;

using promoted_index_block_position_view = std::variant<composite_view, position_in_partition_view>;
using promoted_index_block_position = std::variant<composite, position_in_partition>;

inline
promoted_index_block_position_view to_view(const promoted_index_block_position& v) {
    return std::visit(overloaded_functor{
        [] (const composite& v) -> promoted_index_block_position_view {
            return composite_view(v);
        },
        [] (const position_in_partition& v) -> promoted_index_block_position_view {
            return position_in_partition_view(v);
        }
    }, v);
}

// Return the owning version of the position given a view.
inline
promoted_index_block_position materialize(const promoted_index_block_position_view& v) {
    return std::visit(overloaded_functor{
        [] (const composite_view& v) -> promoted_index_block_position {
            return composite(v);
        },
        [] (const position_in_partition_view& v) -> promoted_index_block_position {
            return position_in_partition(v);
        }
    }, v);
}
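
// Usage sketch (hypothetical owning position `pos`): to_view() yields a
// cheap non-owning view of either alternative, while materialize() copies
// the viewed data back into an owning value, e.g. to keep it alive past
// the lifetime of the source:
//
//   promoted_index_block_position_view v = to_view(pos);
//   promoted_index_block_position copy = materialize(v);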

class promoted_index_block_compare {
    const position_in_partition::composite_less_compare _cmp;
public:
    explicit promoted_index_block_compare(const schema& s) : _cmp{s} {}

    bool operator()(const promoted_index_block_position_view& lhs, position_in_partition_view rhs) const {
        return std::visit([this, rhs] (const auto& pos) { return _cmp(pos, rhs); }, lhs);
    }

    bool operator()(position_in_partition_view lhs, const promoted_index_block_position_view& rhs) const {
        return std::visit([this, lhs] (const auto& pos) { return _cmp(lhs, pos); }, rhs);
    }

    bool operator()(const promoted_index_block_position_view& lhs, composite_view rhs) const {
        return std::visit([this, rhs] (const auto& pos) { return _cmp(pos, rhs); }, lhs);
    }

    bool operator()(composite_view lhs, const promoted_index_block_position_view& rhs) const {
        return std::visit([this, lhs] (const auto& pos) { return _cmp(lhs, pos); }, rhs);
    }

    bool operator()(const promoted_index_block_position_view& lhs, const promoted_index_block_position_view& rhs) const {
        return std::visit([this, &lhs] (const auto& pos) { return (*this)(lhs, pos); }, rhs);
    }
};
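
// Usage sketch (hypothetical `blocks`, a range of
// promoted_index_block_position_view values sorted by position): the
// heterogeneous overloads let a binary search compare block bounds
// directly against a position_in_partition_view `pos` without
// materializing anything:
//
//   promoted_index_block_compare less(s);
//   auto it = std::lower_bound(blocks.begin(), blocks.end(), pos, less);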

class promoted_index_block {
    /*
     * Block bounds are read and stored differently for the ka/la and mc formats.
     * For the ka/la formats, we just read and store the whole sequence of bytes representing a 'composite' key,
     * but for 'mc' we need to fully parse the clustering key prefix along with its bound_kind.
     * So we store them as a discriminated union, aka std::variant.
     * As those representations are used differently for comparing positions in partition,
     * we expose them through a discriminated union of views.
     */
    using bound_storage = std::variant<temporary_buffer<char>, position_in_partition>;
    // The block includes positions in the [_start, _end] range (both bounds inclusive).
    bound_storage _start;
    bound_storage _end;
    uint64_t _offset;
    uint64_t _width;
    std::optional<deletion_time> _end_open_marker;

    inline static
    promoted_index_block_position_view get_position(const schema& s, const bound_storage& storage) {
        return std::visit(overloaded_functor{
            [&s] (const temporary_buffer<char>& buf) -> promoted_index_block_position_view {
                return composite_view{to_bytes_view(buf), s.is_compound()};
            },
            [] (const position_in_partition& pos) -> promoted_index_block_position_view {
                return pos;
            }}, storage);
    }

public:
    // Constructor for ka/la format blocks
    promoted_index_block(temporary_buffer<char>&& start, temporary_buffer<char>&& end,
            uint64_t offset, uint64_t width)
        : _start(std::move(start)), _end(std::move(end))
        , _offset(offset), _width(width)
    {}

    // Constructor for mc format blocks
    promoted_index_block(position_in_partition&& start, position_in_partition&& end,
            uint64_t offset, uint64_t width, std::optional<deletion_time>&& end_open_marker)
        : _start{std::move(start)}, _end{std::move(end)}
        , _offset{offset}, _width{width}, _end_open_marker{end_open_marker}
    {}

    promoted_index_block(const promoted_index_block&) = delete;
    promoted_index_block(promoted_index_block&&) noexcept = default;
    promoted_index_block& operator=(const promoted_index_block&) = delete;
    promoted_index_block& operator=(promoted_index_block&&) noexcept = default;

    promoted_index_block_position_view start(const schema& s) const { return get_position(s, _start); }
    promoted_index_block_position_view end(const schema& s) const { return get_position(s, _end); }
    uint64_t offset() const { return _offset; }
    uint64_t width() const { return _width; }
    std::optional<deletion_time> end_open_marker() const { return _end_open_marker; }
};
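
// Usage sketch (hypothetical block `blk` and position `pos`): start() and
// end() hide whether the bound is a raw ka/la composite or a parsed mc
// clustering position, so either kind of block can be compared uniformly:
//
//   promoted_index_block_compare less(s);
//   bool block_ends_before_pos = less(blk.end(s), pos);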

// Cursor over the index for clustered elements of a single partition.
//
// The user is expected to call advance_to() with monotonically increasing positions
// in order to check whether the index has more precise information about the location
// of the fragments relevant for the range starting at the given position.
//
// The user must serialize all async methods: the next call may start only when the future
// returned by the previous one has resolved.
//
// The user must call close() and wait for it to resolve before destroying the cursor.
//
class clustered_index_cursor {
public:
    // Position of indexed elements in the data file relative to the start of the partition.
    using offset_in_partition = uint64_t;

    struct skip_info {
        offset_in_partition offset;
        tombstone active_tombstone;
        position_in_partition active_tombstone_pos;
    };

    struct entry_info {
        promoted_index_block_position_view start;
        promoted_index_block_position_view end;
        offset_in_partition offset;
    };

    virtual ~clustered_index_cursor() {}

    // Note: close() must not fail.
    virtual future<> close() noexcept = 0;

    // Advances the cursor to the given position. When the cursor has more accurate information
    // about the location of the fragments from the range [pos, +inf) in the data file (since it
    // was last advanced), it resolves with an engaged optional containing skip_info.
    //
    // The index may not be precise, so fragments from the range [pos, +inf) may be located after the
    // position indicated by skip_info. It is guaranteed that no such fragments are located before the returned position.
    //
    // Offsets returned in skip_info are monotonically increasing.
    //
    // Must be called for non-decreasing positions.
    // The caller must ensure that pos remains valid until the future resolves.
    virtual future<std::optional<skip_info>> advance_to(position_in_partition_view pos) = 0;

    // Determines a data file offset, relative to the start of the partition, such that fragments
    // from the range (-inf, pos] are located before that offset.
    //
    // If such an offset cannot be determined cheaply, returns a disengaged optional.
    //
    // Does not advance the cursor.
    //
    // The caller must ensure that pos remains valid until the future resolves.
    virtual future<std::optional<offset_in_partition>> probe_upper_bound(position_in_partition_view pos) = 0;

    // Returns information about the next indexed entry after the cursor's position,
    // or nullopt if there is no information about further positions.
    //
    // When entry_info is returned, the cursor is advanced to entry_info::start.
    //
    // The returned entry_info is only valid until the next invocation of any method on this instance.
    virtual future<std::optional<entry_info>> next_entry() = 0;
};
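
// Usage sketch (inside a seastar coroutine; hypothetical `cursor` obtained
// from promoted_index::make_cursor() and position_in_partition_view `pos`):
// calls are serialized and close() is awaited before destruction, per the
// contract above:
//
//   auto skip = co_await cursor->advance_to(pos);
//   if (skip) {
//       // Seek the data stream to partition_start + skip->offset and
//       // resume reading with skip->active_tombstone in effect.
//   }
//   co_await cursor->close();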

// Allocated inside LSA.
class promoted_index {
    deletion_time _del_time;
    uint64_t _promoted_index_start;
    uint32_t _promoted_index_size;
    uint32_t _num_blocks;
public:
    promoted_index(const schema& s,
            deletion_time del_time,
            uint64_t promoted_index_start,
            uint32_t promoted_index_size,
            uint32_t num_blocks)
        : _del_time{del_time}
        , _promoted_index_start(promoted_index_start)
        , _promoted_index_size(promoted_index_size)
        , _num_blocks(num_blocks)
    { }

    [[nodiscard]] deletion_time get_deletion_time() const { return _del_time; }
    [[nodiscard]] uint32_t get_promoted_index_size() const { return _promoted_index_size; }

    // Call under allocating_section.
    // For sstable versions >= mc the returned cursor will be of type `bsearch_clustered_cursor`.
    std::unique_ptr<clustered_index_cursor> make_cursor(shared_sstable,
            reader_permit,
            tracing::trace_state_ptr,
            file_input_stream_options,
            use_caching);
};
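
// Usage sketch (hypothetical sstable `sst`, permit, and trace state; the
// default-constructed file_input_stream_options is a placeholder):
//
//   std::unique_ptr<clustered_index_cursor> cur = pi.make_cursor(
//           sst, permit, trace_state, file_input_stream_options{}, use_caching::yes);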

// A partition index element.
// Allocated inside LSA.
class index_entry {
private:
    managed_bytes _key;
    mutable std::optional<dht::token> _token;
    uint64_t _position;
    managed_ref<promoted_index> _index;
public:
    key_view get_key() const {
        return key_view{_key};
    }

    // May allocate, so it must be called under allocating_section.
    decorated_key_view get_decorated_key(const schema& s) const {
        if (!_token) {
            _token.emplace(s.get_partitioner().get_token(get_key()));
        }
        return decorated_key_view(*_token, get_key());
    }

    uint64_t position() const { return _position; }

    std::optional<deletion_time> get_deletion_time() const {
        if (_index) {
            return _index->get_deletion_time();
        }
        return {};
    }

    index_entry(managed_bytes&& key, uint64_t position, managed_ref<promoted_index>&& index)
        : _key(std::move(key))
        , _position(position)
        , _index(std::move(index))
    {}

    index_entry(index_entry&&) = default;
    index_entry& operator=(index_entry&&) = default;

    // Can be nullptr.
    const managed_ref<promoted_index>& get_promoted_index() const { return _index; }
    managed_ref<promoted_index>& get_promoted_index() { return _index; }
    uint32_t get_promoted_index_size() const { return _index ? _index->get_promoted_index_size() : 0; }

    size_t external_memory_usage() const {
        return _key.external_memory_usage() + _index.external_memory_usage();
    }
};
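
// Usage sketch (hypothetical `alloc_section` and `region`, standing for a
// logalloc allocating_section and its LSA region; illustrates the
// allocating_section requirement of get_decorated_key(), which may
// allocate the cached token):
//
//   alloc_section(region, [&] {
//       decorated_key_view dk = entry.get_decorated_key(s);
//       // use dk before leaving the section
//   });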

// A partition index page.
//
// Allocated in the standard allocator space, but with an LSA allocator as the current allocator,
// so the shallow part lives in the standard allocator while all indirect objects are inside LSA.
class partition_index_page {
public:
    lsa::chunked_managed_vector<managed_ref<index_entry>> _entries;

public:
    partition_index_page() = default;
    partition_index_page(partition_index_page&&) noexcept = default;
    partition_index_page& operator=(partition_index_page&&) noexcept = default;

    bool empty() const { return _entries.empty(); }
    size_t size() const { return _entries.size(); }

    size_t external_memory_usage() const {
        size_t size = _entries.external_memory_usage();
        for (auto&& e : _entries) {
            size += sizeof(index_entry) + e->external_memory_usage();
        }
        return size;
    }
};

using index_list = partition_index_page;

}

inline std::ostream& operator<<(std::ostream& out, const sstables::promoted_index_block_position_view& pos) {
    std::visit([&out] (const auto& pos) mutable { out << pos; }, pos);
    return out;
}

inline std::ostream& operator<<(std::ostream& out, const sstables::promoted_index_block_position& pos) {
    std::visit([&out] (const auto& pos) mutable { out << pos; }, pos);
    return out;
}