Skip to content

Commit

Permalink
add connection idle time support. if the connection has been idle for…
Browse files Browse the repository at this point in the history
… a long time, reconnect it.
  • Loading branch information
sewenew committed Jul 29, 2021
1 parent e8e12e2 commit e2e1bbf
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 11 deletions.
3 changes: 2 additions & 1 deletion src/sw/redis++/async_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ AsyncConnection::AsyncConnection(const ConnectionOptions &opts,
AsyncConnectionMode mode) :
_opts(opts),
_loop(loop),
_create_time(std::chrono::steady_clock::now()) {
_create_time(std::chrono::steady_clock::now()),
_last_active(std::chrono::steady_clock::now()) {
assert(_loop != nullptr);

switch (mode) {
Expand Down
11 changes: 11 additions & 0 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
return _create_time;
}

auto last_active() const
-> std::chrono::time_point<std::chrono::steady_clock> {
return _last_active;
}

void disconnect(std::exception_ptr err);

template <typename Result, typename ResultParser>
Expand Down Expand Up @@ -123,6 +128,8 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
redisAsyncContext& _context() {
assert(_ctx != nullptr);

_last_active = std::chrono::steady_clock::now();

return *_ctx;
}

Expand Down Expand Up @@ -179,6 +186,10 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
// The time that the connection is created.
std::chrono::time_point<std::chrono::steady_clock> _create_time{};

// The time that the connection is created or the time that
// the connection is recently used, i.e. `_context()` is called.
std::atomic<std::chrono::time_point<std::chrono::steady_clock>> _last_active{};

std::vector<std::unique_ptr<AsyncEvent>> _events;

std::atomic<State> _state{State::NOT_CONNECTED};
Expand Down
16 changes: 12 additions & 4 deletions src/sw/redis++/async_connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ AsyncConnectionSPtr AsyncConnectionPool::fetch() {
auto connection = _fetch();

auto connection_lifetime = _pool_opts.connection_lifetime;
auto connection_idle_time = _pool_opts.connection_idle_time;

if (_sentinel) {
auto opts = _opts;
Expand All @@ -149,7 +150,7 @@ AsyncConnectionSPtr AsyncConnectionPool::fetch() {

lock.unlock();

if (role_changed || _need_reconnect(*connection, connection_lifetime)) {
if (role_changed || _need_reconnect(*connection, connection_lifetime, connection_idle_time)) {
try {
auto tmp_connection = sentinel.create(opts, shared_from_this(), _loop.get());

Expand All @@ -172,7 +173,7 @@ AsyncConnectionSPtr AsyncConnectionPool::fetch() {

assert(connection);

if (_need_reconnect(*connection, connection_lifetime)) {
if (_need_reconnect(*connection, connection_lifetime, connection_idle_time)) {
try {
auto tmp_connection = _create();

Expand Down Expand Up @@ -310,18 +311,25 @@ void AsyncConnectionPool::_wait_for_connection(std::unique_lock<std::mutex> &loc
}

bool AsyncConnectionPool::_need_reconnect(const AsyncConnection &connection,
const std::chrono::milliseconds &connection_lifetime) const {
const std::chrono::milliseconds &connection_lifetime,
const std::chrono::milliseconds &connection_idle_time) const {
if (connection.broken()) {
return true;
}

auto now = std::chrono::steady_clock::now();
if (connection_lifetime > std::chrono::milliseconds(0)) {
auto now = std::chrono::steady_clock::now();
if (now - connection.create_time() > connection_lifetime) {
return true;
}
}

if (connection_idle_time > std::chrono::milliseconds(0)) {
if (now - connection.last_active() > connection_idle_time) {
return true;
}
}

return false;
}

Expand Down
3 changes: 2 additions & 1 deletion src/sw/redis++/async_connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ class AsyncConnectionPool : public std::enable_shared_from_this<AsyncConnectionP
void _wait_for_connection(std::unique_lock<std::mutex> &lock);

bool _need_reconnect(const AsyncConnection &connection,
const std::chrono::milliseconds &connection_lifetime) const;
const std::chrono::milliseconds &connection_lifetime,
const std::chrono::milliseconds &connection_idle_time) const;

void _update_connection_opts(const std::string &host, int port) {
_opts.host = host;
Expand Down
1 change: 1 addition & 0 deletions src/sw/redis++/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ void swap(Connection &lhs, Connection &rhs) noexcept {
Connection::Connection(const ConnectionOptions &opts) :
_ctx(Connector(opts).connect()),
_create_time(std::chrono::steady_clock::now()),
_last_active(std::chrono::steady_clock::now()),
_opts(opts) {
assert(_ctx && !broken());

Expand Down
11 changes: 11 additions & 0 deletions src/sw/redis++/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ class Connection {
return _create_time;
}

auto last_active() const
-> std::chrono::time_point<std::chrono::steady_clock> {
return _last_active;
}

template <typename ...Args>
void send(const char *format, Args &&...args);

Expand Down Expand Up @@ -185,6 +190,10 @@ class Connection {
// The time that the connection is created.
std::chrono::time_point<std::chrono::steady_clock> _create_time{};

// The time that the connection is created or the time that
// the connection is recently used, i.e. `_context()` is called.
std::chrono::time_point<std::chrono::steady_clock> _last_active{};

ConnectionOptions _opts;

// TODO: define _tls_ctx before _ctx
Expand Down Expand Up @@ -216,6 +225,8 @@ inline void Connection::send(const char *format, Args &&...args) {
}

inline redisContext* Connection::_context() {
_last_active = std::chrono::steady_clock::now();

return _ctx.get();
}

Expand Down
16 changes: 12 additions & 4 deletions src/sw/redis++/connection_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ Connection ConnectionPool::fetch() {
auto connection = _fetch();

auto connection_lifetime = _pool_opts.connection_lifetime;
auto connection_idle_time = _pool_opts.connection_idle_time;

if (_sentinel) {
auto opts = _opts;
Expand All @@ -101,7 +102,7 @@ Connection ConnectionPool::fetch() {

lock.unlock();

if (role_changed || _need_reconnect(connection, connection_lifetime)) {
if (role_changed || _need_reconnect(connection, connection_lifetime, connection_idle_time)) {
try {
connection = _create(sentinel, opts, false);
} catch (const Error &e) {
Expand All @@ -116,7 +117,7 @@ Connection ConnectionPool::fetch() {

lock.unlock();

if (_need_reconnect(connection, connection_lifetime)) {
if (_need_reconnect(connection, connection_lifetime, connection_idle_time)) {
try {
connection.reconnect();
} catch (const Error &e) {
Expand Down Expand Up @@ -248,18 +249,25 @@ void ConnectionPool::_wait_for_connection(std::unique_lock<std::mutex> &lock) {
}

bool ConnectionPool::_need_reconnect(const Connection &connection,
const std::chrono::milliseconds &connection_lifetime) const {
const std::chrono::milliseconds &connection_lifetime,
const std::chrono::milliseconds &connection_idle_time) const {
if (connection.broken()) {
return true;
}

auto now = std::chrono::steady_clock::now();
if (connection_lifetime > std::chrono::milliseconds(0)) {
auto now = std::chrono::steady_clock::now();
if (now - connection.create_time() > connection_lifetime) {
return true;
}
}

if (connection_idle_time > std::chrono::milliseconds(0)) {
if (now - connection.last_active() > connection_idle_time) {
return true;
}
}

return false;
}

Expand Down
6 changes: 5 additions & 1 deletion src/sw/redis++/connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ struct ConnectionPoolOptions {

// Max lifetime of a connection. 0ms means we never expire the connection.
std::chrono::milliseconds connection_lifetime{0};

// Max idle time of a connection. 0ms means we never expire the connection.
std::chrono::milliseconds connection_idle_time{0};
};

class ConnectionPool {
Expand Down Expand Up @@ -85,7 +88,8 @@ class ConnectionPool {
void _wait_for_connection(std::unique_lock<std::mutex> &lock);

bool _need_reconnect(const Connection &connection,
const std::chrono::milliseconds &connection_lifetime) const;
const std::chrono::milliseconds &connection_lifetime,
const std::chrono::milliseconds &connection_idle_time) const;

void _update_connection_opts(const std::string &host, int port) {
_opts.host = host;
Expand Down

0 comments on commit e2e1bbf

Please sign in to comment.