// Copyright 2010-2014 RethinkDB, all rights reserved.
#ifndef RDB_PROTOCOL_BATCHING_HPP_
#define RDB_PROTOCOL_BATCHING_HPP_

#include <utility>

#include "btree/keys.hpp"
#include "containers/archive/archive.hpp"
#include "containers/archive/versioned.hpp"
#include "containers/optional.hpp"
#include "rdb_protocol/datum.hpp"
#include "rpc/serialize_macros.hpp"
#include "time.hpp"

template<class T>
class counted_t;

namespace ql {

class datum_t;
class env_t;

enum class batch_type_t {
    // A normal batch.
    NORMAL = 0,
    // The first batch in a series of normal batches. The size limit is reduced
    // to help minimize the latency until a user receives their first response.
    NORMAL_FIRST = 1,
    // A batch fetched for a terminal or terminal-like term, e.g. a big batched
    // insert. Ignores latency caps because the latency the user encounters is
    // determined by bandwidth instead.
    TERMINAL = 2,
    // If we're ordering by an sindex, get a batch with a constant value for
    // that sindex. We sometimes need a batch with that invariant for sorting.
    // (This replaces that SORTING_HINT_NEXT stuff.)
    SINDEX_CONSTANT = 3
};
ARCHIVE_PRIM_MAKE_RANGED_SERIALIZABLE(
    batch_type_t, int8_t, batch_type_t::NORMAL, batch_type_t::SINDEX_CONSTANT);

enum ignore_latency_t { NO, YES };

class batcher_t {
public:
    bool note_el(const datum_t &t) {
        seen_one_el = true;
        els_left -= 1;
        min_els_left -= 1;
        size_left -= serialized_size<cluster_version_t::CLUSTER>(t);
        return should_send_batch();
    }
    bool should_send_batch(
        ignore_latency_t ignore_latency = ignore_latency_t::NO) const;
    batcher_t(batcher_t &&other) :
        batch_type(std::move(other.batch_type)),
        seen_one_el(std::move(other.seen_one_el)),
        min_els_left(std::move(other.min_els_left)),
        els_left(std::move(other.els_left)),
        size_left(std::move(other.size_left)),
        end_time(std::move(other.end_time)) { }
    kiloticks_t kiloticks_left() {
        kiloticks_t cur_time = get_kiloticks();
        return kiloticks_t{end_time.micros > cur_time.micros ?
                           end_time.micros - cur_time.micros :
                           0};
    }
    batch_type_t get_batch_type() { return batch_type; }
private:
    DISABLE_COPYING(batcher_t);
    friend class batchspec_t;
    batcher_t(batch_type_t batch_type, int64_t min_els, int64_t max_els,
              int64_t max_size, kiloticks_t end_time);

    const batch_type_t batch_type;
    bool seen_one_el;
    int64_t min_els_left, els_left, size_left;
    const kiloticks_t end_time;
};
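
// Usage sketch (illustrative): a consumer accumulates rows until the batcher
// reports that the batch is full. batcher_t's real constructor is private, so
// a batcher is normally obtained via batchspec_t::to_batcher() (declared
// below); `spec`, `source`, and `send_batch` are hypothetical stand-ins for
// the caller's stream logic.
//
//     batcher_t batcher = spec.to_batcher();
//     std::vector<datum_t> batch;
//     while (source->has_next()) {
//         datum_t row = source->next();
//         batch.push_back(row);
//         // note_el() decrements the element and size budgets and returns
//         // true once should_send_batch() says the limits are exhausted.
//         if (batcher.note_el(row)) break;
//     }
//     send_batch(std::move(batch));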

class batchspec_t {
public:
    static batchspec_t user(batch_type_t batch_type, env_t *env);
    static batchspec_t all(); // Gimme everything.
    static batchspec_t empty() { return batchspec_t(); }
    static batchspec_t default_for(batch_type_t batch_type);
    batch_type_t get_batch_type() const { return batch_type; }
    batchspec_t with_new_batch_type(batch_type_t new_batch_type) const;
    batchspec_t with_min_els(int64_t new_min_els) const;
    batchspec_t with_max_dur(kiloticks_t new_max_dur) const;
    batchspec_t with_at_most(uint64_t max_els) const;
    // These are used to allow batchspecs to override the default ordering on a
    // stream. This is only really useful when a stream is being treated as a
    // set, as in the case of `include_initial` changefeeds where always using
    // `ASCENDING` ordering allows the logic to be simpler.
    batchspec_t with_lazy_sorting_override(sorting_t sort) const;
    sorting_t lazy_sorting(sorting_t base) const {
        return lazy_sorting_override.value_or(base);
    }
    batchspec_t scale_down(int64_t divisor) const;
    batcher_t to_batcher() const;
private:
    // I made this private and accessible through a static function because it
    // was being accidentally default-initialized.
    batchspec_t() { } // USE ONLY FOR SERIALIZATION
    batchspec_t(batch_type_t batch_type, int64_t min_els, int64_t max_els,
                int64_t max_size, int64_t first_scaledown,
                kiloticks_t max_dur, kiloticks_t start_time);
    template<cluster_version_t W>
    friend void serialize(write_message_t *wm, const batchspec_t &batchspec);
    template<cluster_version_t W>
    friend archive_result_t deserialize(read_stream_t *s, batchspec_t *batchspec);

    batch_type_t batch_type;
    int64_t min_els, max_els, max_size, first_scaledown_factor;
    kiloticks_t max_dur;
    kiloticks_t start_time;
    optional<sorting_t> lazy_sorting_override;
};
RDB_DECLARE_SERIALIZABLE(batchspec_t);
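
// Sketch of how a spec might be built and turned into a batcher (illustrative;
// the real call sites live elsewhere in rdb_protocol). `env` is assumed to be
// the ql::env_t of the running query, and `num_shards` is a hypothetical
// shard count.
//
//     batchspec_t spec =
//         batchspec_t::user(batch_type_t::NORMAL_FIRST, env).with_at_most(500);
//     batcher_t batcher = spec.to_batcher();
//
//     // When results are gathered from several shards, scale_down() can
//     // divide the per-batch limits so the combined result still fits in
//     // a single batch.
//     batchspec_t shard_spec = spec.scale_down(num_shards);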

} // namespace ql

#endif // RDB_PROTOCOL_BATCHING_HPP_