forked from rethinkdb/rethinkdb_rebirth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontext.cc
135 lines (119 loc) · 5.12 KB
/
context.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright 2010-2014 RethinkDB, all rights reserved.
#include "rdb_protocol/context.hpp"
#include "clustering/administration/metadata.hpp"
#include "concurrency/cross_thread_watchable.hpp"
#include "rdb_protocol/query_cache.hpp"
#include "rdb_protocol/datum.hpp"
#include "rpc/semilattice/view/field.hpp"
#include "rpc/semilattice/watchable.hpp"
#include "time.hpp"
bool sindex_config_t::operator==(const sindex_config_t &o) const {
if (func_version != o.func_version || multi != o.multi || geo != o.geo) {
return false;
}
/* This is kind of a hack--we compare the functions by serializing them and comparing
the serialized values. */
write_message_t wm1, wm2;
serialize<cluster_version_t::CLUSTER>(&wm1, func);
serialize<cluster_version_t::CLUSTER>(&wm2, o.func);
vector_stream_t stream1, stream2;
int res = send_write_message(&stream1, &wm1);
guarantee(res == 0);
res = send_write_message(&stream2, &wm2);
guarantee(res == 0);
return stream1.vector() == stream2.vector();
}
RDB_IMPL_SERIALIZABLE_4_SINCE_v2_1(sindex_config_t,
func, func_version, multi, geo);
bool write_hook_config_t::operator==(const write_hook_config_t &o) const {
if (func_version != o.func_version) {
return false;
}
/* This is kind of a hack--we compare the functions by serializing them and comparing
the serialized values. */
write_message_t wm1, wm2;
serialize<cluster_version_t::CLUSTER>(&wm1, func);
serialize<cluster_version_t::CLUSTER>(&wm2, o.func);
vector_stream_t stream1, stream2;
int res = send_write_message(&stream1, &wm1);
guarantee(res == 0);
res = send_write_message(&stream2, &wm2);
guarantee(res == 0);
return stream1.vector() == stream2.vector();
}
RDB_IMPL_SERIALIZABLE_2_SINCE_v2_4(write_hook_config_t,
func, func_version);
void sindex_status_t::accum(const sindex_status_t &other) {
progress_numerator += other.progress_numerator;
progress_denominator += other.progress_denominator;
ready &= other.ready;
start_time = std::min(start_time, other.start_time);
rassert(outdated == other.outdated);
}
RDB_IMPL_SERIALIZABLE_5_FOR_CLUSTER(sindex_status_t,
progress_numerator, progress_denominator, ready, outdated, start_time);
const char *rql_perfmon_name = "query_engine";
rdb_context_t::stats_t::stats_t(perfmon_collection_t *global_stats)
: qe_stats_membership(global_stats, &qe_stats_collection, rql_perfmon_name),
client_connections_membership(&qe_stats_collection,
&client_connections, "client_connections"),
clients_active_membership(&qe_stats_collection,
&clients_active, "clients_active"),
queries_per_sec(secs_to_ticks(1)),
queries_per_sec_membership(&qe_stats_collection,
&queries_per_sec, "queries_per_sec"),
queries_total_membership(&qe_stats_collection,
&queries_total, "queries_total") { }
rdb_context_t::rdb_context_t()
: extproc_pool(nullptr),
cluster_interface(nullptr),
manager(nullptr),
reql_http_proxy(),
stats(&get_global_perfmon_collection()) { }
rdb_context_t::rdb_context_t(
extproc_pool_t *_extproc_pool,
reql_cluster_interface_t *_cluster_interface,
std::shared_ptr<semilattice_read_view_t<auth_semilattice_metadata_t>>
auth_semilattice_view)
: extproc_pool(_extproc_pool),
cluster_interface(_cluster_interface),
manager(nullptr),
reql_http_proxy(),
stats(&get_global_perfmon_collection()) {
init_auth_watchables(auth_semilattice_view);
}
rdb_context_t::rdb_context_t(
extproc_pool_t *_extproc_pool,
mailbox_manager_t *_mailbox_manager,
reql_cluster_interface_t *_cluster_interface,
std::shared_ptr<semilattice_read_view_t<auth_semilattice_metadata_t>>
auth_semilattice_view,
perfmon_collection_t *global_stats,
const std::string &_reql_http_proxy)
: extproc_pool(_extproc_pool),
cluster_interface(_cluster_interface),
manager(_mailbox_manager),
reql_http_proxy(_reql_http_proxy),
stats(global_stats) {
init_auth_watchables(auth_semilattice_view);
}
void rdb_context_t::init_auth_watchables(
std::shared_ptr<semilattice_read_view_t<auth_semilattice_metadata_t>>
auth_semilattice_view) {
for (int thread = 0; thread < get_num_threads(); ++thread) {
m_cross_thread_auth_watchables.emplace_back(
new cross_thread_watchable_variable_t<auth_semilattice_metadata_t>(
clone_ptr_t<semilattice_watchable_t<auth_semilattice_metadata_t>>(
new semilattice_watchable_t<auth_semilattice_metadata_t>(
auth_semilattice_view)),
threadnum_t(thread)));
}
}
rdb_context_t::~rdb_context_t() { }
std::set<ql::query_cache_t *> *rdb_context_t::get_query_caches_for_this_thread() {
return query_caches.get();
}
clone_ptr_t<watchable_t<auth_semilattice_metadata_t>>
rdb_context_t::get_auth_watchable() const{
return m_cross_thread_auth_watchables[get_thread_id().threadnum]->get_watchable();
}