Skip to content

Commit

Permalink
add sentinel support for async interface
Browse files Browse the repository at this point in the history
  • Loading branch information
sewenew committed Jul 11, 2021
1 parent 99fe55d commit ef65d7f
Show file tree
Hide file tree
Showing 11 changed files with 465 additions and 87 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ if(REDIS_PLUS_PLUS_BUILD_ASYNC)
"${REDIS_PLUS_PLUS_SOURCE_DIR}/async_connection_pool.cpp"
"${REDIS_PLUS_PLUS_SOURCE_DIR}/async_redis.cpp"
"${REDIS_PLUS_PLUS_SOURCE_DIR}/event_loop.cpp"
"${REDIS_PLUS_PLUS_SOURCE_DIR}/async_sentinel.cpp"
)
endif()

Expand Down
71 changes: 59 additions & 12 deletions src/sw/redis++/async_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,27 @@ void FormattedCommand::_move(FormattedCommand &&that) noexcept {
that._size = 0;
}

AsyncConnection::AsyncConnection(const ConnectionOptions &opts, EventLoop *loop) :
AsyncConnection::AsyncConnection(const ConnectionOptions &opts,
EventLoop *loop,
AsyncConnectionMode mode) :
_opts(opts),
_loop(loop),
_create_time(std::chrono::steady_clock::now()),
_state(State::NOT_CONNECTED) {
_create_time(std::chrono::steady_clock::now()) {
assert(_loop != nullptr);

switch (mode) {
case AsyncConnectionMode::SINGLE:
_state = State::NOT_CONNECTED;
break;

case AsyncConnectionMode::SENTINEL:
_state = State::WAIT_SENTINEL;
break;

default:
throw Error("not supporeted async connection mode");
break;
}
}

AsyncConnection::~AsyncConnection() {
Expand All @@ -94,6 +109,10 @@ AsyncConnection::~AsyncConnection() {
void AsyncConnection::event_callback() {
// NOTE: we should try our best not throw in these callbacks
switch (_state.load()) {
case State::WAIT_SENTINEL:
_connect_with_sentinel();
break;

case State::NOT_CONNECTED:
_connect();
break;
Expand Down Expand Up @@ -141,13 +160,11 @@ void AsyncConnection::connect_callback(std::exception_ptr err) {
}

void AsyncConnection::disconnect(std::exception_ptr err) {
if (_ctx == nullptr) {
return;
}
if (_ctx != nullptr) {
_disable_disconnect_callback();

_disable_disconnect_callback();

redisAsyncDisconnect(_ctx);
redisAsyncDisconnect(_ctx);
}

_fail_events(err);
}
Expand All @@ -156,6 +173,19 @@ void AsyncConnection::disconnect_callback(std::exception_ptr err) {
_fail_events(err);
}

ConnectionOptions AsyncConnection::options() {
std::lock_guard<std::mutex> lock(_mtx);

return _opts;
}

void AsyncConnection::update_node_info(const std::string &host, int port) {
std::lock_guard<std::mutex> lock(_mtx);

_opts.host = host;
_opts.port = port;
}

void AsyncConnection::_disable_disconnect_callback() {
assert(_ctx != nullptr);

Expand Down Expand Up @@ -282,9 +312,26 @@ void AsyncConnection::_set_ready() {
_send();
}

void AsyncConnection::_connect_with_sentinel() {
try {
auto opts = options();
if (opts.host.empty()) {
// Still waiting for sentinel.
return;
}

// Already got node info from sentinel
_state = State::NOT_CONNECTED;

_connect();
} catch (const Error &err) {
_fail_events(std::current_exception());
}
}

void AsyncConnection::_connect() {
try {
auto ctx = _connect(_opts);
auto ctx = _connect(options());

assert(ctx && ctx->err == REDIS_OK);

Expand All @@ -294,7 +341,7 @@ void AsyncConnection::_connect() {

_state = State::CONNECTING;
} catch (const Error &err) {
_fail_events(std::make_exception_ptr(err));
_fail_events(std::current_exception());
}
}

Expand All @@ -318,7 +365,7 @@ AsyncConnection::AsyncContextUPtr AsyncConnection::_connect(const ConnectionOpti
redisAsyncContext *context = nullptr;
switch (opts.type) {
case ConnectionType::TCP:
context = redisAsyncConnect(opts.host.c_str(), _opts.port);
context = redisAsyncConnect(opts.host.c_str(), opts.port);
break;

case ConnectionType::UNIX:
Expand Down
17 changes: 16 additions & 1 deletion src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,17 @@ class AsyncEvent {

using AsyncEventUPtr = std::unique_ptr<AsyncEvent>;

enum class AsyncConnectionMode {
SINGLE = 0,
SENTINEL,
CLUSTER
};

class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
public:
AsyncConnection(const ConnectionOptions &opts, EventLoop *loop);
AsyncConnection(const ConnectionOptions &opts,
EventLoop *loop,
AsyncConnectionMode = AsyncConnectionMode::SINGLE);

AsyncConnection(const AsyncConnection &) = delete;
AsyncConnection& operator=(const AsyncConnection &) = delete;
Expand Down Expand Up @@ -127,6 +135,10 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {

void disconnect_callback(std::exception_ptr err);

ConnectionOptions options();

void update_node_info(const std::string &host, int port);

private:
enum class State {
BROKEN = 0,
Expand All @@ -135,6 +147,7 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
AUTHING,
SELECTING_DB,
READY,
WAIT_SENTINEL
};

redisAsyncContext& _context() {
Expand All @@ -157,6 +170,8 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {

void _set_ready();

void _connect_with_sentinel();

void _connect();

void _disable_disconnect_callback();
Expand Down
Loading

0 comments on commit ef65d7f

Please sign in to comment.