forked from datastax/cpp-driver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhost.hpp
265 lines (214 loc) · 6.92 KB
/
host.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
/*
Copyright (c) 2014-2016 DataStax
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef __CASS_HOST_HPP_INCLUDED__
#define __CASS_HOST_HPP_INCLUDED__
#include "address.hpp"
#include "atomic.hpp"
#include "copy_on_write_ptr.hpp"
#include "get_time.hpp"
#include "logger.hpp"
#include "macros.hpp"
#include "ref_counted.hpp"
#include "scoped_ptr.hpp"
#include "spin_lock.hpp"
#include <map>
#include <math.h>
#include <set>
#include <sstream>
#include <stdint.h>
#include <vector>
namespace cass {
struct TimestampedAverage {
TimestampedAverage()
: average(-1)
, timestamp(0)
, num_measured(0) { }
int64_t average;
uint64_t timestamp;
uint64_t num_measured;
};
class VersionNumber {
public:
VersionNumber()
: major_version_(0)
, minor_version_(0)
, patch_version_(0) { }
VersionNumber(int major_version, int minor_version, int patch_version)
: major_version_(major_version)
, minor_version_(minor_version)
, patch_version_(patch_version) { }
bool operator >=(const VersionNumber& other) const {
return compare(other) >= 0;
}
bool operator <(const VersionNumber& other) const {
return compare(other) < 0;
}
int compare(const VersionNumber& other) const {
if (major_version_ < other.major_version_) return -1;
if (major_version_ > other.major_version_) return 1;
if (minor_version_ < other.minor_version_) return -1;
if (minor_version_ > other.minor_version_) return 1;
if (patch_version_ < other.patch_version_) return -1;
if (patch_version_ > other.patch_version_) return 1;
return 0;
}
bool parse(const std::string& version);
int major_version() const { return major_version_; }
int minor_version() const { return minor_version_; }
int patch_version() const { return patch_version_; }
private:
int major_version_;
int minor_version_;
int patch_version_;
};
class Host : public RefCounted<Host> {
public:
typedef SharedRefPtr<Host> Ptr;
typedef SharedRefPtr<const Host> ConstPtr;
class StateListener {
public:
virtual ~StateListener() { }
virtual void on_add(const SharedRefPtr<Host>& host) = 0;
virtual void on_remove(const SharedRefPtr<Host>& host) = 0;
virtual void on_up(const SharedRefPtr<Host>& host) = 0;
virtual void on_down(const SharedRefPtr<Host>& host) = 0;
};
enum HostState {
ADDED,
UP,
DOWN
};
Host(const Address& address, bool mark)
: address_(address)
, rack_id_(0)
, dc_id_(0)
, mark_(mark)
, state_(ADDED)
, address_string_(address.to_string()) { }
const Address& address() const { return address_; }
const std::string& address_string() const { return address_string_; }
bool mark() const { return mark_; }
void set_mark(bool mark) { mark_ = mark; }
const std::string hostname() const { return hostname_; }
void set_hostname(const std::string& hostname) {
if (!hostname.empty() && hostname[hostname.size() - 1] == '.') {
// Strip off trailing dot for hostcheck comparison
hostname_ = hostname.substr(0, hostname.size() - 1);
} else {
hostname_ = hostname;
}
}
const std::string& rack() const { return rack_; }
const std::string& dc() const { return dc_; }
void set_rack_and_dc(const std::string& rack, const std::string& dc) {
rack_ = rack;
dc_ = dc;
}
uint32_t rack_id() const { return rack_id_; }
uint32_t dc_id() const { return dc_id_; }
void set_rack_and_dc_ids(uint32_t rack_id, uint32_t dc_id) {
rack_id_ = rack_id;
dc_id_ = dc_id;
}
const std::string& listen_address() const { return listen_address_; }
void set_listen_address(const std::string& listen_address) {
listen_address_ = listen_address;
}
const VersionNumber& cassandra_version() const { return cassandra_version_; }
void set_cassaandra_version(const VersionNumber& cassandra_version) {
cassandra_version_ = cassandra_version;
}
bool was_just_added() const { return state() == ADDED; }
bool is_up() const { return state() == UP; }
void set_up() { set_state(UP); }
bool is_down() const { return state() == DOWN; }
void set_down() { set_state(DOWN); }
std::string to_string() const {
std::ostringstream ss;
ss << address_string_;
if (!rack_.empty() || !dc_.empty()) {
ss << " [" << rack_ << ':' << dc_ << "]";
}
return ss.str();
}
void enable_latency_tracking(uint64_t scale, uint64_t min_measured) {
if (!latency_tracker_) {
latency_tracker_.reset(new LatencyTracker(scale, (30LL * min_measured) / 100LL));
}
}
void update_latency(uint64_t latency_ns) {
if (latency_tracker_) {
LOG_TRACE("Latency %f ms for %s", static_cast<double>(latency_ns) / 1e6, to_string().c_str());
latency_tracker_->update(latency_ns);
}
}
TimestampedAverage get_current_average() const {
if (latency_tracker_) {
return latency_tracker_->get();
}
return TimestampedAverage();
}
private:
class LatencyTracker {
public:
LatencyTracker(uint64_t scale_ns, uint64_t threshold_to_account)
: scale_ns_(scale_ns)
, threshold_to_account_(threshold_to_account) { }
void update(uint64_t latency_ns);
TimestampedAverage get() const {
ScopedSpinlock l(SpinlockPool<LatencyTracker>::get_spinlock(this));
return current_;
}
private:
uint64_t scale_ns_;
uint64_t threshold_to_account_;
TimestampedAverage current_;
private:
DISALLOW_COPY_AND_ASSIGN(LatencyTracker);
};
private:
HostState state() const {
return state_.load(MEMORY_ORDER_ACQUIRE);
}
void set_state(HostState state) {
state_.store(state, MEMORY_ORDER_RELEASE);
}
Address address_;
uint32_t rack_id_;
uint32_t dc_id_;
bool mark_;
Atomic<HostState> state_;
std::string address_string_;
std::string listen_address_;
VersionNumber cassandra_version_;
std::string hostname_;
std::string rack_;
std::string dc_;
ScopedPtr<LatencyTracker> latency_tracker_;
private:
DISALLOW_COPY_AND_ASSIGN(Host);
};
typedef std::map<Address, SharedRefPtr<Host> > HostMap;
struct GetHost {
typedef std::pair<Address, Host::Ptr> Pair;
Host::Ptr operator()(const Pair& pair) const {
return pair.second;
}
};
typedef std::pair<Address, SharedRefPtr<Host> > HostPair;
typedef std::vector<SharedRefPtr<Host> > HostVec;
typedef CopyOnWritePtr<HostVec> CopyOnWriteHostVec;
void add_host(CopyOnWriteHostVec& hosts, const SharedRefPtr<Host>& host);
void remove_host(CopyOnWriteHostVec& hosts, const SharedRefPtr<Host>& host);
} // namespace cass
#endif