forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmutation_fragment_stream_validator.hh
201 lines (178 loc) · 8.4 KB
/
mutation_fragment_stream_validator.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
/*
* Copyright (C) 2021-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include "mutation_fragment_v2.hh"
enum class mutation_fragment_stream_validation_level {
none, // disable validation altogether
partition_region, // fragment kind
token,
partition_key,
clustering_key,
};
/// Low level fragment stream validator.
///
/// Tracks and validates the monotonicity of the passed in fragment kinds,
/// position in partition, token or partition keys.
class mutation_fragment_stream_validator {
const ::schema& _schema;
mutation_fragment_v2::kind _prev_kind;
position_in_partition _prev_pos;
dht::decorated_key _prev_partition_key;
tombstone _current_tombstone;
private:
bool validate(dht::token t, const partition_key* pkey);
bool validate(mutation_fragment_v2::kind kind, std::optional<position_in_partition_view> pos,
std::optional<tombstone> new_current_tombstone);
public:
explicit mutation_fragment_stream_validator(const schema& s);
const ::schema& schema() const { return _schema; }
/// Validate the monotonicity of the fragment kind.
///
/// Should be used when the full, more heavy-weight position-in-partition
/// monotonicity validation provided by
/// `operator()(const mutation_fragment&)` is not desired.
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind, but only if the validation passes.
/// `new_current_tombstone` should be engaged only when the fragment changes
/// the current tombstone (range tombstone change fragments).
///
/// \returns true if the fragment kind is valid.
bool operator()(mutation_fragment_v2::kind kind, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind);
/// Validates the monotonicity of the mutation fragment kind and position.
///
/// Validates the mutation fragment kind monotonicity and
/// position-in-partition.
/// A more complete version of `operator()(mutation_fragment::kind)`.
/// Using both overloads for the same stream is not supported.
/// Advances the previous fragment kind and position-in-partition, but only
/// if the validation passes.
/// `new_current_tombstone` should be engaged only when the fragment changes
/// the current tombstone (range tombstone change fragments).
///
/// \returns true if the mutation fragment kind is valid.
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
/// Validates the monotonicity of the mutation fragment.
///
/// Equivalent to calling `operator()(mf.kind(), mf.position())`.
/// See said overload for more details.
///
/// \returns true if the mutation fragment kind is valid.
bool operator()(const mutation_fragment_v2& mf);
bool operator()(const mutation_fragment& mf);
/// Validates the monotonicity of the token.
///
/// Does not check fragment level monotonicity.
/// Advances the previous token, but only if the validation passes.
/// Cannot be used in parallel with the `dht::decorated_key`
/// overload.
///
/// \returns true if the token is valid.
bool operator()(dht::token t);
/// Validates the monotonicity of the partition.
///
/// Does not check fragment level monotonicity.
/// Advances the previous partition-key, but only if the validation passes.
/// Cannot be used in parallel with the `dht::token`
/// overload.
///
/// \returns true if the partition key is valid.
bool operator()(const dht::decorated_key& dk);
/// Reset the state of the validator to the given partition
///
/// Reset the state of the validator as if it has just validated a valid
/// partition start with the provided key. This can be used t force a reset
/// to a given partition that is normally invalid and hence wouldn't advance
/// the internal state. This can be used by users that can correct such
/// invalid streams and wish to continue validating it.
void reset(dht::decorated_key dk);
/// Reset the state of the validator to the given fragment
///
/// Reset the state of the validator as if it has just validated a valid
/// fragment. This can be used t force a reset to a given fragment that is
/// normally invalid and hence wouldn't advance the internal state. This
/// can be used by users that can correct such invalid streams and wish to
/// continue validating it.
void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
void reset(const mutation_fragment&);
void reset(const mutation_fragment_v2&);
/// Validate that the stream was properly closed.
///
/// \returns false if the last partition wasn't closed, i.e. the last
/// fragment wasn't a `partition_end` fragment.
bool on_end_of_stream();
/// The previous valid fragment kind.
mutation_fragment_v2::kind previous_mutation_fragment_kind() const {
return _prev_kind;
}
/// The previous valid position.
///
/// Call only if operator()(position_in_partition_view) was used.
const position_in_partition& previous_position() const {
return _prev_pos;
}
/// Get the current effective tombstone
///
/// Call only if operator()(mutation_fragment_v2) or
/// operator()(mutation_fragment_v2::kind, position_in_partition_view, std::optional<tombstone>)
/// was not used.
tombstone current_tombstone() const {
return _current_tombstone;
}
/// The previous valid partition key.
///
/// Call only if operator()(dht::token) or operator()(const dht::decorated_key&) was used.
dht::token previous_token() const {
return _prev_partition_key.token();
}
/// The previous valid partition key.
///
/// Call only if operator()(const dht::decorated_key&) was used.
const dht::decorated_key& previous_partition_key() const {
return _prev_partition_key;
}
};
struct invalid_mutation_fragment_stream : public std::runtime_error {
explicit invalid_mutation_fragment_stream(std::runtime_error e);
};
/// Track position_in_partition transitions and validate monotonicity.
///
/// Will throw `invalid_mutation_fragment_stream` if any violation is found.
/// If the `abort_on_internal_error` configuration option is set, it will
/// abort instead.
/// Implements the FlattenedConsumerFilter concept.
class mutation_fragment_stream_validating_filter {
mutation_fragment_stream_validator _validator;
sstring _name_storage;
std::string_view _name_view; // always valid
mutation_fragment_stream_validation_level _validation_level;
private:
sstring full_name() const;
mutation_fragment_stream_validating_filter(const char* name_literal, sstring name_value, const schema& s, mutation_fragment_stream_validation_level level);
public:
/// Constructor.
///
/// \arg name is used in log messages to identify the validator, the
/// schema identity is added automatically
/// \arg compare_keys enable validating clustering key monotonicity
mutation_fragment_stream_validating_filter(sstring name, const schema& s, mutation_fragment_stream_validation_level level);
mutation_fragment_stream_validating_filter(const char* name, const schema& s, mutation_fragment_stream_validation_level level);
mutation_fragment_stream_validating_filter(mutation_fragment_stream_validating_filter&&) = delete;
mutation_fragment_stream_validating_filter(const mutation_fragment_stream_validating_filter&) = delete;
bool operator()(const dht::decorated_key& dk);
bool operator()(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
bool operator()(mutation_fragment::kind kind, position_in_partition_view pos);
/// Equivalent to `operator()(mf.kind(), mf.position())`
bool operator()(const mutation_fragment_v2& mv);
bool operator()(const mutation_fragment& mv);
void reset(mutation_fragment_v2::kind kind, position_in_partition_view pos, std::optional<tombstone> new_current_tombstone);
void reset(const mutation_fragment_v2& mf);
/// Equivalent to `operator()(partition_end{})`
bool on_end_of_partition();
void on_end_of_stream();
};