forked from rethinkdb/rethinkdb_rebirth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathreadgens.hpp
224 lines (192 loc) · 8.01 KB
/
readgens.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
#ifndef RDB_PROTOCOL_DATUM_STREAM_READGENS_HPP_
#define RDB_PROTOCOL_DATUM_STREAM_READGENS_HPP_
#include "rdb_protocol/datum_stream.hpp"
namespace ql {
// This class generates the `read_t`s used in range reads. It's used by
// `reader_t` below. Its subclasses are the different types of range reads we
// need to do.
class readgen_t {
public:
explicit readgen_t(
serializable_env_t s_env,
std::string table_name,
profile_bool_t profile,
read_mode_t read_mode,
sorting_t sorting);
virtual ~readgen_t() { }
virtual read_t terminal_read(
const std::vector<transform_variant_t> &transform,
const terminal_variant_t &_terminal,
const batchspec_t &batchspec) const = 0;
// This has to be on `readgen_t` because we sort differently depending on
// the kinds of reads we're doing.
virtual void sindex_sort(std::vector<rget_item_t> *vec,
const batchspec_t &batchspec) const = 0;
virtual read_t next_read(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transform,
const batchspec_t &batchspec) const = 0;
virtual key_range_t original_keyrange(reql_version_t rv) const = 0;
virtual void restrict_active_ranges(
sorting_t sorting, active_ranges_t *ranges_inout) const = 0;
virtual optional<std::string> sindex_name() const = 0;
virtual changefeed::keyspec_t::range_t get_range_spec(
std::vector<transform_variant_t>) const = 0;
const std::string &get_table_name() const { return table_name; }
read_mode_t get_read_mode() const { return read_mode; }
// Returns `sorting_` unless the batchspec overrides it.
sorting_t sorting(const batchspec_t &batchspec) const;
protected:
const serializable_env_t serializable_env;
const std::string table_name;
const profile_bool_t profile;
const read_mode_t read_mode;
const sorting_t sorting_;
};
class rget_readgen_t : public readgen_t {
public:
explicit rget_readgen_t(
serializable_env_t s_env,
std::string table_name,
const datumspec_t &datumspec,
profile_bool_t profile,
read_mode_t read_mode,
sorting_t sorting,
require_sindexes_t require_sindex_val);
virtual read_t terminal_read(
const std::vector<transform_variant_t> &transform,
const terminal_variant_t &_terminal,
const batchspec_t &batchspec) const;
virtual read_t next_read(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transform,
const batchspec_t &batchspec) const;
private:
virtual rget_read_t next_read_impl(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transforms,
const batchspec_t &batchspec) const = 0;
protected:
datumspec_t datumspec;
require_sindexes_t require_sindex_val;
};
class primary_readgen_t : public rget_readgen_t {
public:
static scoped_ptr_t<readgen_t> make(
env_t *env,
std::string table_name,
read_mode_t read_mode,
const datumspec_t &datumspec = datumspec_t(datum_range_t::universe()),
sorting_t sorting = sorting_t::UNORDERED);
private:
primary_readgen_t(serializable_env_t s_env,
std::string table_name,
const datumspec_t &datumspec,
profile_bool_t profile,
read_mode_t read_mode,
sorting_t sorting);
virtual rget_read_t next_read_impl(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transform,
const batchspec_t &batchspec) const;
virtual void sindex_sort(std::vector<rget_item_t> *vec,
const batchspec_t &batchspec) const;
virtual key_range_t original_keyrange(reql_version_t rv) const;
virtual optional<std::string> sindex_name() const;
void restrict_active_ranges(
sorting_t sorting, active_ranges_t *active_ranges_inout) const final;
virtual changefeed::keyspec_t::range_t get_range_spec(
std::vector<transform_variant_t> transforms) const;
optional<std::map<store_key_t, uint64_t> > store_keys;
};
class sindex_readgen_t : public rget_readgen_t {
public:
static scoped_ptr_t<readgen_t> make(
env_t *env,
std::string table_name,
read_mode_t read_mode,
const std::string &sindex,
const datumspec_t &datumspec = datumspec_t(datum_range_t::universe()),
sorting_t sorting = sorting_t::UNORDERED,
require_sindexes_t require_sindex_val = require_sindexes_t::NO);
virtual void sindex_sort(std::vector<rget_item_t> *vec,
const batchspec_t &batchspec) const;
virtual key_range_t original_keyrange(reql_version_t rv) const;
virtual optional<std::string> sindex_name() const;
void restrict_active_ranges(sorting_t, active_ranges_t *) const final { }
private:
sindex_readgen_t(
serializable_env_t s_env,
std::string table_name,
const std::string &sindex,
const datumspec_t &datumspec,
profile_bool_t profile,
read_mode_t read_mode,
sorting_t sorting,
require_sindexes_t require_sindex_val);
virtual rget_read_t next_read_impl(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transform,
const batchspec_t &batchspec) const;
virtual changefeed::keyspec_t::range_t get_range_spec(
std::vector<transform_variant_t> transforms) const;
const std::string sindex;
bool sent_first_read;
};
// For geospatial intersection queries
class intersecting_readgen_t : public readgen_t {
public:
static scoped_ptr_t<readgen_t> make(
env_t *env,
std::string table_name,
read_mode_t read_mode,
const std::string &sindex,
const datum_t &query_geometry);
virtual read_t terminal_read(
const std::vector<transform_variant_t> &transform,
const terminal_variant_t &_terminal,
const batchspec_t &batchspec) const;
virtual read_t next_read(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transform,
const batchspec_t &batchspec) const;
virtual void sindex_sort(std::vector<rget_item_t> *vec,
const batchspec_t &batchspec) const;
virtual key_range_t original_keyrange(reql_version_t rv) const;
virtual optional<std::string> sindex_name() const;
void restrict_active_ranges(sorting_t, active_ranges_t *) const final { }
virtual changefeed::keyspec_t::range_t get_range_spec(
std::vector<transform_variant_t>) const;
private:
intersecting_readgen_t(
serializable_env_t s_env,
std::string table_name,
const std::string &sindex,
const datum_t &query_geometry,
profile_bool_t profile,
read_mode_t read_mode);
// Analogue to rget_readgen_t::next_read_impl(), but generates an intersecting
// geo read.
intersecting_geo_read_t next_read_impl(
const optional<active_ranges_t> &active_ranges,
const optional<reql_version_t> &reql_version,
optional<changefeed_stamp_t> stamp,
std::vector<transform_variant_t> transforms,
const batchspec_t &batchspec) const;
const std::string sindex;
const datum_t query_geometry;
};
} // namespace ql
#endif // RDB_PROTOCOL_DATUM_STREAM_READGENS_HPP_