forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPoolWithFailover.cpp
134 lines (114 loc) · 4.83 KB
/
PoolWithFailover.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include "PoolWithFailover.h"
#if USE_LIBPQXX
#include "Utils.h"
#include <Common/parseRemoteDescription.h>
#include <Common/Exception.h>
#include <Common/quoteString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace DB
{
namespace ErrorCodes
{
/// Error code attached to the exception thrown by postgres::PoolWithFailover::get()
/// when none of the replicas yielded a usable connection.
/// This is only a declaration; the constant is defined outside this file.
extern const int POSTGRESQL_CONNECTION_FAILURE;
}
}
namespace postgres
{
/// Creates one connection pool per configured replica and groups the pools
/// by the priority under which each replica was listed.
///
/// pool_size is forwarded to every per-replica pool. The wait timeout, the number
/// of failover tries and the auto-close flag are stored and used later by get().
PoolWithFailover::PoolWithFailover(
    const DB::ExternalDataSourcesConfigurationByPriority & configurations_by_priority,
    size_t pool_size,
    size_t pool_wait_timeout_,
    size_t max_tries_,
    bool auto_close_connection_)
    : pool_wait_timeout(pool_wait_timeout_)
    , max_tries(max_tries_)
    , auto_close_connection(auto_close_connection_)
{
    auto * settings_log = &Poco::Logger::get("PostgreSQLConnectionPool");
    LOG_TRACE(
        settings_log,
        "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
        pool_size, pool_wait_timeout, max_tries_);

    for (const auto & [replica_priority, replica_configs] : configurations_by_priority)
    {
        for (const auto & config : replica_configs)
        {
            auto conn_info = formatConnectionString(
                config.database,
                config.host,
                config.port,
                config.username,
                config.password);

            /// The priority entry is looked up per replica, so a priority that has
            /// no replica configurations never gets an entry of its own.
            replicas_with_priority[replica_priority].emplace_back(conn_info, pool_size);
        }
    }
}
/// Creates one connection pool per address of a single storage configuration.
///
/// Every address uses the same database, user name and password taken from
/// `configuration`. pool_size is forwarded to every per-address pool; the wait
/// timeout, the number of failover tries and the auto-close flag are stored and
/// used later by get().
PoolWithFailover::PoolWithFailover(
    const DB::StoragePostgreSQLConfiguration & configuration,
    size_t pool_size,
    size_t pool_wait_timeout_,
    size_t max_tries_,
    bool auto_close_connection_)
    : pool_wait_timeout(pool_wait_timeout_)
    , max_tries(max_tries_)
    , auto_close_connection(auto_close_connection_)
{
    auto * settings_log = &Poco::Logger::get("PostgreSQLConnectionPool");
    LOG_TRACE(
        settings_log,
        "PostgreSQL connection pool size: {}, connection wait timeout: {}, max failover tries: {}",
        pool_size, pool_wait_timeout, max_tries_);

    /// All addresses are registered under priority 0. They are not ordered among
    /// themselves: get() moves the replicas it has already walked past to the back.
    for (const auto & [address_host, address_port] : configuration.addresses)
    {
        LOG_DEBUG(
            &Poco::Logger::get("PostgreSQLPoolWithFailover"),
            "Adding address host: {}, port: {} to connection pool",
            address_host, address_port);

        auto conn_info = formatConnectionString(
            configuration.database,
            address_host,
            address_port,
            configuration.username,
            configuration.password);

        replicas_with_priority[0].emplace_back(conn_info, pool_size);
    }
}
/// Returns a holder for a connected connection taken from the first replica that can provide one.
///
/// Makes up to max_tries passes over all replicas, in the iteration order of
/// replicas_with_priority. For each replica it borrows a slot from the replica's pool
/// (waiting at most pool_wait_timeout), creates a Connection if the slot was empty,
/// and calls connect() on it.
///
/// On success the replicas up to and including the chosen one are rotated to the end
/// of their priority group, so the next call starts with a different replica.
///
/// Throws DB::Exception with code POSTGRESQL_CONNECTION_FAILURE when no replica yielded
/// a connection; the message lists every failed attempt (connection errors and pool timeouts).
/// Any exception other than pqxx::broken_connection is rethrown as-is after the borrowed
/// slot has been returned to its pool.
ConnectionHolderPtr PoolWithFailover::get()
{
    /// The whole selection, including waits on the pools, runs under the mutex.
    std::lock_guard lock(mutex);

    /// Accumulates a human-readable line for every failed attempt.
    DB::WriteBufferFromOwnString error_message;

    for (size_t try_idx = 0; try_idx < max_tries; ++try_idx)
    {
        for (auto & priority : replicas_with_priority)
        {
            auto & replicas = priority.second;
            for (size_t i = 0; i < replicas.size(); ++i)
            {
                auto & replica = replicas[i];

                ConnectionPtr connection;
                /// The factory returns nullptr on purpose: the Connection object itself is created below.
                auto connection_available = replica.pool->tryBorrowObject(connection, []() { return nullptr; }, pool_wait_timeout);

                if (!connection_available)
                {
                    LOG_WARNING(log, "Unable to fetch connection within the timeout");

                    /// Record the timeout too. Otherwise, if every attempt ends here,
                    /// the final exception would carry an empty message.
                    error_message << fmt::format(
                        "Try {}. Unable to fetch connection to {} within the timeout\n",
                        try_idx + 1, DB::backQuote(replica.connection_info.host_port));
                    continue;
                }

                try
                {
                    /// Create a new connection or reopen an old connection if it became invalid.
                    if (!connection)
                    {
                        connection = std::make_unique<Connection>(replica.connection_info);
                        LOG_DEBUG(log, "New connection to {}", connection->getInfoForLog());
                    }

                    connection->connect();
                }
                catch (const pqxx::broken_connection & pqxx_error)
                {
                    LOG_ERROR(log, "Connection error: {}", pqxx_error.what());
                    error_message << fmt::format(
                        "Try {}. Connection to {} failed with error: {}\n",
                        try_idx + 1, DB::backQuote(replica.connection_info.host_port), pqxx_error.what());

                    /// Give the slot back so the pool does not lose capacity, then try the next replica.
                    replica.pool->returnObject(std::move(connection));
                    continue;
                }
                catch (...)
                {
                    /// Unexpected failure: still give the slot back before propagating.
                    replica.pool->returnObject(std::move(connection));
                    throw;
                }

                auto connection_holder = std::make_unique<ConnectionHolder>(replica.pool, std::move(connection), auto_close_connection);

                /// Move all traversed replicas to the end.
                if (replicas.size() > 1)
                    std::rotate(replicas.begin(), replicas.begin() + i + 1, replicas.end());

                return connection_holder;
            }
        }
    }

    /// Nothing is recorded when the loops never ran (max_tries is 0 or there are no replicas);
    /// supply a fallback so the exception is never thrown with an empty message.
    auto failure_description = error_message.str();
    if (failure_description.empty())
        failure_description = "No connection attempt was made: there are no replicas to try or max failover tries is 0";

    throw DB::Exception(DB::ErrorCodes::POSTGRESQL_CONNECTION_FAILURE, failure_description);
}
}
#endif