-
Notifications
You must be signed in to change notification settings - Fork 97
/
Copy pathConnectionPoolImpl.cpp
470 lines (391 loc) · 14.6 KB
/
ConnectionPoolImpl.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
#include <atomic>
#include <cstdint>
#include <cstring>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>
#include <boost/utility/string_ref.hpp>
#include "restc-cpp/restc-cpp.h"
#include "restc-cpp/Connection.h"
#include "restc-cpp/ConnectionPool.h"
#include "restc-cpp/logging.h"
#include "restc-cpp/error.h"
#include "restc-cpp/internals/helpers.h"
#include "ConnectionImpl.h"
#include "SocketImpl.h"
#ifdef RESTC_CPP_WITH_TLS
# include "TlsSocketImpl.h"
#endif
using namespace std;
#if __cplusplus < 201703L
# include <boost/unordered_map.hpp>
#endif
namespace restc_cpp {
class ConnectionPoolImpl
: public ConnectionPool
, public std::enable_shared_from_this<ConnectionPoolImpl> {
public:
// Identifies a group of interchangeable connections in the pool: the remote
// TCP endpoint combined with the connection type.
//
// A key can be copy- or move-constructed but never re-assigned. It offers
// ordering, equality and streaming, plus the KeyHash / KeyEqual functors that
// the pool's unordered containers are instantiated with.
struct Key {
    Key(boost::asio::ip::tcp::endpoint ep, Connection::Type connectionType)
    : endpoint(std::move(ep)), type(connectionType) {}

    Key(const Key&) = default;
    Key(Key&&) = default;
    ~Key() = default;
    Key& operator=(const Key&) = delete;
    Key& operator=(Key&&) = delete;

    // Orders by connection type first, then by endpoint.
    bool operator<(const Key& key) const {
        if (static_cast<int>(type) < static_cast<int>(key.type)) {
            return true;
        }
        if (static_cast<int>(type) > static_cast<int>(key.type)) {
            return false;
        }
        return endpoint < key.endpoint;
    }

    // Two keys are equal when both the type and the endpoint match.
    bool operator==(const Key& key) const {
        return type == key.type && endpoint == key.endpoint;
    }

    // Prints the key in the form "{Key http(s)://<endpoint>}".
    friend std::ostream& operator<<(std::ostream& o, const Key& v) {
        return o << "{Key "
                 << (v.type == Connection::Type::HTTPS ? "https" : "http")
                 << "://"
                 << v.endpoint
                 << "}";
    }

    // Custom hash function over the address bytes, the port and the type.
    struct KeyHash {
        std::size_t operator()(const Key& key) const {
            std::size_t h1 = 0;
            const auto address = key.endpoint.address();

            // Hash the binary address data. The bytes are copied into
            // integers with memcpy: reading the byte array through a
            // reinterpret_cast'ed integer pointer breaks the aliasing rules
            // and may be a misaligned access.
            if (address.is_v4()) {
                // IPv4: 4 bytes
                const auto addr = address.to_v4().to_bytes();
                std::uint32_t word = 0;
                std::memcpy(&word, addr.data(), sizeof(word));
                h1 = std::hash<std::uint32_t>()(word);
            } else if (address.is_v6()) {
                // IPv6: 16 bytes, hashed as two 64-bit halves
                const auto addr = address.to_v6().to_bytes();
                std::uint64_t parts[2] = {0, 0};
                std::memcpy(parts, addr.data(), sizeof(parts));
                h1 = std::hash<std::uint64_t>()(parts[0])
                   ^ std::hash<std::uint64_t>()(parts[1]);
            }

            // Hash the port and type
            const std::size_t h2 = std::hash<unsigned short>()(key.endpoint.port());
            const std::size_t h3 = std::hash<int>()(static_cast<int>(key.type));

            // Combine the hashes
            return h1 ^ (h2 << 1) ^ (h3 << 2);
        }
    };

    // Equality comparison for hash table; defers to Key::operator==.
    struct KeyEqual {
        bool operator()(const Key& lhs, const Key& rhs) const {
            return lhs == rhs;
        }
    };

private:
    boost::asio::ip::tcp::endpoint endpoint;
    Connection::Type type;
};
// A pooled connection together with its bookkeeping data: the key it is
// filed under, its time-to-live in seconds, its creation time and the time it
// was last used. Only the last-used timestamp is mutable; access to it is
// wrapped in LOCK_ALWAYS_.
struct Entry {
    using timestamp_t = std::chrono::steady_clock::time_point;
    using ptr_t = std::shared_ptr<Entry>;

    // Builds the key from `ep` and `connectionType`, takes over `conn`, and
    // copies the TTL from `prop.cacheTtlSeconds`.
    Entry(boost::asio::ip::tcp::endpoint ep,
          const Connection::Type connectionType,
          Connection::ptr_t conn,
          const Request::Properties& prop)
    : key{std::move(ep), connectionType}
    , connection{std::move(conn)}
    , ttl{prop.cacheTtlSeconds}
    , created{time(nullptr)}
    {
    }

    // Streams the key followed by the connection, or a placeholder text when
    // the entry holds no connection.
    friend ostream& operator << (ostream& o, const Entry& e) {
        o << "{Entry " << e.key;
        if (!e.connection) {
            o << "No connection";
        } else {
            o << ' ' << *e.connection;
        }
        o << '}';
        return o;
    }

    const Key& GetKey() const noexcept {
        return key;
    }

    Connection::ptr_t& GetConnection() noexcept {
        return connection;
    }

    int GetTtl() const noexcept {
        return ttl;
    }

    time_t GetCreated() const noexcept {
        return created;
    }

    // Returns a copy of the last-used timestamp.
    timestamp_t GetLastUsed() const noexcept {
        LOCK_ALWAYS_;
        const timestamp_t snapshot = last_used;
        return snapshot;
    }

    // Replaces the last-used timestamp.
    void SetLastUsed(timestamp_t ts) {
        LOCK_ALWAYS_;
        last_used = ts;
    }

private:
    const Key key;
    Connection::ptr_t connection;
    const int ttl = 60;
    const time_t created;
    mutable std::mutex mutex_;
    timestamp_t last_used = std::chrono::steady_clock::now();
};
// Owns the connection.
//
// A Connection facade that forwards every call to the connection stored in a
// pool Entry. When the wrapper is destroyed it passes the entry to the release
// callback it was constructed with (if that callback is non-empty).
class ConnectionWrapper : public Connection
{
public:
    using release_callback_t = std::function<void (const Entry::ptr_t&)>;

    ConnectionWrapper() = delete;
    ConnectionWrapper(const ConnectionWrapper&) = delete;
    ConnectionWrapper(ConnectionWrapper&&) = delete;
    ConnectionWrapper& operator = (const ConnectionWrapper& ) = delete;
    ConnectionWrapper& operator = (ConnectionWrapper&& ) = delete;

    ConnectionWrapper(Entry::ptr_t entry,
                      release_callback_t on_release)
    : on_release_{std::move(on_release)}
    , entry_{std::move(entry)}
    {
    }

    // Hand the entry back through the callback, if one was supplied.
    ~ConnectionWrapper() override {
        if (!on_release_) {
            return;
        }
        on_release_(entry_);
    }

    Socket& GetSocket() override {
        auto& wrapped = entry_->GetConnection();
        return wrapped->GetSocket();
    }

    [[nodiscard]] const Socket &GetSocket() const override
    {
        auto& wrapped = entry_->GetConnection();
        return wrapped->GetSocket();
    }

    [[nodiscard]] boost::uuids::uuid GetId() const override
    {
        auto& wrapped = entry_->GetConnection();
        return wrapped->GetId();
    }

private:
    release_callback_t on_release_;
    Entry::ptr_t entry_;
};
explicit ConnectionPoolImpl(RestClient& owner)
: owner_{owner}, properties_{owner.GetConnectionProperties()}
, cache_cleanup_timer_{owner.GetIoService()}
{
on_release_ = [this](const Entry::ptr_t& entry) { OnRelease(entry); };
}
// Returns a connection for the given endpoint and type.
//
// When `newConnectionPlease` is set, a fresh connection is created right away
// and neither the cache nor the limits are consulted. Otherwise an idle
// cached connection is reused if there is one; if not, a new connection is
// created provided the pool limits allow it.
//
// Throws ConstraintException when the limits forbid a new connection.
Connection::ptr_t
GetConnection(const boost::asio::ip::tcp::endpoint ep,
              const Connection::Type connectionType,
              bool newConnectionPlease) override {
    if (newConnectionPlease) {
        return CreateNew(ep, connectionType);
    }

    auto cached = GetFromCache(ep, connectionType);
    if (cached) {
        RESTC_CPP_LOG_TRACE_("Reusing connection from cache "
            << *cached);
        return cached;
    }

    const bool has_room = CanCreateNewConnection(ep, connectionType);
    if (!has_room) {
        throw ConstraintException(
            "Cannot create connection - too many connections");
    }

    return CreateNew(ep, connectionType);
}
// Get ctx for internal, synchronized operations: this is the io-service of
// the owning client.
auto& GetCtx() const {
    auto& ctx = owner_.GetIoService();
    return ctx;
}
// Number of entries currently held in the idle cache.
size_t GetIdleConnections() const override {
    LOCK_ALWAYS_;
    const size_t idle_count = idle_.size();
    return idle_count;
}
// Closes the pool. The shutdown itself runs at most once (guarded by
// close_once_): it marks the pool closed, cancels the cleanup timer and drops
// every idle entry. Further calls only emit the trace messages.
void Close() override {
    RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: enter");

    const auto shutdown = [this] {
        RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: closing *once*.");
        LOCK_ALWAYS_;
        closed_ = true;
        cache_cleanup_timer_.cancel();
        idle_.clear();
    };

    if (!closed_) {
        call_once(close_once_, shutdown);
    }

    RESTC_CPP_LOG_TRACE_("ConnectionPoolImpl::Close: leave");
}
void StartTimer() {
ScheduleNextCacheCleanup();
}
private:
void ScheduleNextCacheCleanup() {
LOCK_ALWAYS_;
cache_cleanup_timer_.expires_from_now(
boost::posix_time::seconds(properties_->cacheCleanupIntervalSeconds));
cache_cleanup_timer_.async_wait([capture0 = shared_from_this()](auto &&PH1) {
capture0->OnCacheCleanup(std::forward<decltype(PH1)>(PH1));
});
}
// Timer handler: drops every idle entry whose last-used time plus TTL lies in
// the past, then schedules the next run. Returns without rescheduling when the
// pool is closed or the wait finished with an error.
void OnCacheCleanup(const boost::system::error_code &error)
{
    RESTC_CPP_LOG_TRACE_("OnCacheCleanup: enter");

    if (closed_) {
        RESTC_CPP_LOG_TRACE_("OnCacheCleanup: closed");
        return;
    }

    if (error) {
        RESTC_CPP_LOG_DEBUG_("OnCacheCleanup: " << error);
        return;
    }

    RESTC_CPP_LOG_TRACE_("OnCacheCleanup: Cleaning cache...");
    const auto now = std::chrono::steady_clock::now();
    {
        LOCK_ALWAYS_;
        auto it = idle_.begin();
        // Stop early if the pool gets closed while we are sweeping.
        while (!closed_ && it != idle_.end()) {
            auto& cached = *it->second;
            const auto expires = cached.GetLastUsed()
                + std::chrono::seconds(cached.GetTtl());

            if (!(expires < now)) {
                RESTC_CPP_LOG_TRACE_("Keeping << " << *cached.GetConnection()
                    << " expieres in "
                    << std::chrono::duration_cast<std::chrono::seconds>(expires - now).count()
                    << " seconds ");
                ++it;
                continue;
            }

            RESTC_CPP_LOG_TRACE_("Expiring " << *cached.GetConnection());
            // erase() hands back the iterator following the removed element.
            it = idle_.erase(it);
        }
    }

    RESTC_CPP_LOG_TRACE_("OnCacheCleanup: schedule next");
    ScheduleNextCacheCleanup();
    RESTC_CPP_LOG_TRACE_("OnCacheCleanup: leave");
}
// Called (via the release callback) when a ConnectionWrapper is destroyed.
//
// Removes the entry from the in-use set. If the pool is still open and the
// socket is still open, the entry gets a fresh last-used timestamp and is put
// into the idle cache for reuse; otherwise it is simply dropped.
void OnRelease(const Entry::ptr_t &entry)
{
    {
        LOCK_ALWAYS_;
        // in_use_ is a multimap: erase(key) would remove *every* in-use
        // connection to this endpoint and make the pool under-count its
        // connections. Remove only the element that refers to this entry.
        const auto range = in_use_.equal_range(entry->GetKey());
        for (auto it = range.first; it != range.second; ++it) {
            if (it->second.lock() == entry) {
                in_use_.erase(it);
                break;
            }
        }
    }

    if (closed_ || !entry->GetConnection()->GetSocket().IsOpen()) {
        RESTC_CPP_LOG_TRACE_("Discarding " << *entry << " after use");
        return;
    }

    RESTC_CPP_LOG_TRACE_("Recycling " << *entry << " after use");
    entry->SetLastUsed(chrono::steady_clock::now());
    {
        LOCK_ALWAYS_;
        idle_.insert({entry->GetKey(), entry});
    }
}
// Check the constraints to see if we can create a new connection.
//
// Returns false when the endpoint already has
// `cacheMaxConnectionsPerEndpoint` connections (idle + in use), or when the
// pool holds `cacheMaxConnections` connections in total and no idle entry
// could be purged to make room. Returns true otherwise.
//
// Throws ObjectExpiredException when the pool is closed.
bool CanCreateNewConnection(const boost::asio::ip::tcp::endpoint& ep,
                            const Connection::Type connectionType) {
    if (closed_) {
        throw ObjectExpiredException("The connection-pool is closed.");
    }

    const auto key = Key{ep, connectionType};
    size_t endpoint_cnt = 0;
    size_t cnt = 0;
    {
        // Take both counts under one lock so they describe the same state.
        LOCK_ALWAYS_;
        endpoint_cnt = idle_.count(key) + in_use_.count(key);
        cnt = idle_.size() + in_use_.size();
    }

    if (endpoint_cnt >= properties_->cacheMaxConnectionsPerEndpoint) {
        RESTC_CPP_LOG_DEBUG_("No more available slots for " << key);
        return false;
    }

    if (cnt >= properties_->cacheMaxConnections) {
        // See if we can release an idle connection.
        // PurgeOldestIdleEntry() takes the lock itself, so it must be called
        // with the lock released.
        if (!PurgeOldestIdleEntry()) {
            RESTC_CPP_LOG_DEBUG_("No more available slots (max="
                << properties_->cacheMaxConnections
                << ", used=" << cnt << ')');
            return false;
        }
    }

    return true;
}
// Removes the idle entry with the earliest last-used timestamp.
// Returns true if an entry was removed, false if the idle cache was empty.
bool PurgeOldestIdleEntry() {
    RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: enter");
    LOCK_ALWAYS_;

    // Linear scan for the least recently used entry.
    auto victim = idle_.begin();
    for (auto candidate = idle_.begin(); candidate != idle_.end(); ++candidate) {
        const bool is_older
            = candidate->second->GetLastUsed() < victim->second->GetLastUsed();
        if (is_older) {
            victim = candidate;
        }
    }

    if (victim == idle_.end()) {
        RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: failed");
        return false;
    }

    RESTC_CPP_LOG_TRACE_("LRU-Purging " << *victim->second);
    idle_.erase(victim);
    RESTC_CPP_LOG_TRACE_("PurgeOldestIdleEntry: success");
    return true;
}
// Get a connection from the cache if it's there.
//
// On a hit the entry is moved from the idle cache to the in-use set and is
// returned wrapped in a ConnectionWrapper carrying the pool's release
// callback. On a miss an empty pointer is returned.
//
// Throws ObjectExpiredException when the pool is closed.
Connection::ptr_t GetFromCache(const boost::asio::ip::tcp::endpoint& ep,
                               const Connection::Type connectionType) {
    if (closed_) {
        throw ObjectExpiredException("The connection-pool is closed.");
    }

    LOCK_ALWAYS_;
    const auto key = Key{ep, connectionType};
    auto it = idle_.find(key);
    if (it == idle_.end()) {
        return {};
    }

    auto wrapper = make_unique<ConnectionWrapper>(it->second, on_release_);
    in_use_.insert(*it);
    idle_.erase(it);
    return wrapper;
}
// Creates a brand new connection for the endpoint.
//
// A plain socket is used for HTTP and a TLS socket for any other type. The
// new entry is registered in the in-use set and returned wrapped in a
// ConnectionWrapper carrying the pool's release callback.
//
// Throws NotImplementedException for non-HTTP types when the library is
// built without RESTC_CPP_WITH_TLS.
Connection::ptr_t CreateNew(const boost::asio::ip::tcp::endpoint& ep,
                            const Connection::Type connectionType) {
    unique_ptr<Socket> socket;
    if (connectionType == Connection::Type::HTTP) {
        socket = make_unique<SocketImpl>(owner_.GetIoService());
    }
    else {
#ifdef RESTC_CPP_WITH_TLS
        socket = make_unique<TlsSocketImpl>(owner_.GetIoService(), owner_.GetTLSContext());
#else
        throw NotImplementedException(
            "restc_cpp is compiled without TLS support");
#endif
    }

    auto entry = make_shared<Entry>(ep, connectionType,
                                    make_shared<ConnectionImpl>(std::move(socket)),
                                    *properties_);
    RESTC_CPP_LOG_TRACE_("Created new connection " << *entry);

    {
        LOCK_ALWAYS_;
        in_use_.insert({entry->GetKey(), entry});
    }

    return make_unique<ConnectionWrapper>(entry, on_release_);
}
// Set to true by Close(); checked before cache operations. Atomic only when
// the library is built with RESTC_CPP_THREADED_CTX.
#ifdef RESTC_CPP_THREADED_CTX
std::atomic_bool closed_{false};
#else
bool closed_ = false;
#endif
// Ensures the shutdown sequence in Close() runs at most once.
std::once_flag close_once_;
// The client that owns this pool; supplies the io-service, the connection
// properties and (with TLS) the TLS context.
RestClient& owner_;
// idle_:   entries available for reuse, owned by the pool.
// in_use_: weak references to entries currently handed out to callers.
// Both are multimaps, so several connections may share the same key.
#if __cplusplus < 201703L
boost::unordered_multimap<Key, Entry::ptr_t, Key::KeyHash, Key::KeyEqual> idle_;
boost::unordered_multimap<Key, std::weak_ptr<Entry>, Key::KeyHash, Key::KeyEqual> in_use_;
#else
std::unordered_multimap<Key, Entry::ptr_t, Key::KeyHash, Key::KeyEqual> idle_;
std::unordered_multimap<Key, std::weak_ptr<Entry>, Key::KeyHash, Key::KeyEqual> in_use_;
#endif
// Connection properties obtained from the owner at construction
// (cache limits, TTL, cleanup interval).
const Request::Properties::ptr_t properties_;
// Callback given to every ConnectionWrapper; forwards to OnRelease().
ConnectionWrapper::release_callback_t on_release_;
// Timer that triggers OnCacheCleanup().
boost::asio::deadline_timer cache_cleanup_timer_;
// NOTE(review): presumably the mutex taken by LOCK_ALWAYS_ (macro defined
// elsewhere) to guard idle_, in_use_ and the timer - confirm.
mutable std::mutex mutex_;
}; // ConnectionPoolImpl
// Factory for the connection pool of `owner`.
//
// The cleanup timer is started only after the pool is owned by a shared_ptr,
// because scheduling the timer calls shared_from_this().
ConnectionPool::ptr_t
ConnectionPool::Create(RestClient& owner) {
    auto pool = std::make_shared<ConnectionPoolImpl>(owner);
    pool->StartTimer();
    return pool;
}
} // restc_cpp