Skip to content

Commit

Permalink
bug fix: 1. failed to send READONLY command in async cluster slave mo…
Browse files Browse the repository at this point in the history
…de; 2. avoid too many MOVED error redirection
  • Loading branch information
sewenew committed Dec 9, 2021
1 parent ab56ea7 commit c5652b6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 2 deletions.
32 changes: 31 additions & 1 deletion src/sw/redis++/async_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,12 @@ void AsyncConnection::connect_callback(std::exception_ptr err) {
_authing_callback();
break;

case State::SELECTING_DB:
_select_db_callback();
break;

default:
assert(_state == State::SELECTING_DB);
assert(_state == State::ENABLE_READONLY);

_set_ready();
}
Expand Down Expand Up @@ -255,6 +259,8 @@ void AsyncConnection::_connecting_callback() {
_auth();
} else if (_need_select_db()) {
_select_db();
} else if (_need_enable_readonly()) {
_enable_readonly();
} else {
_set_ready();
}
Expand All @@ -263,6 +269,16 @@ void AsyncConnection::_connecting_callback() {
void AsyncConnection::_authing_callback() {
if (_need_select_db()) {
_select_db();
} else if (_need_enable_readonly()) {
_enable_readonly();
} else {
_set_ready();
}
}

void AsyncConnection::_select_db_callback() {
if (_need_enable_readonly()) {
_enable_readonly();
} else {
_set_ready();
}
Expand Down Expand Up @@ -299,6 +315,16 @@ void AsyncConnection::_select_db() {
_state = State::SELECTING_DB;
}

void AsyncConnection::_enable_readonly() {
assert(!broken());

if (redisAsyncCommand(_ctx, set_options_callback, nullptr, "READONLY") != REDIS_OK) {
throw Error("failed to send readonly command");
}

_state = State::ENABLE_READONLY;
}

void AsyncConnection::_set_ready() {
_state = State::READY;

Expand Down Expand Up @@ -356,6 +382,10 @@ bool AsyncConnection::_need_select_db() const {
return _opts.db != 0;
}

bool AsyncConnection::_need_enable_readonly() const {
return _opts.readonly;
}

void AsyncConnection::_clean_async_context(void *data) {
auto *ctx = static_cast<AsyncContext *>(data);

Expand Down
32 changes: 31 additions & 1 deletion src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {
AUTHING,
SELECTING_DB,
READY,
WAIT_SENTINEL
WAIT_SENTINEL,
ENABLE_READONLY
};

redisAsyncContext& _context() {
Expand All @@ -137,6 +138,8 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {

void _authing_callback();

void _select_db_callback();

bool _need_auth() const;

void _auth();
Expand All @@ -145,6 +148,10 @@ class AsyncConnection : public std::enable_shared_from_this<AsyncConnection> {

void _select_db();

bool _need_enable_readonly() const;

void _enable_readonly();

void _set_ready();

void _connect_with_sentinel();
Expand Down Expand Up @@ -387,6 +394,12 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
}

private:
enum class State {
NORMAL = 0,
MOVED,
ASKING
};

static void _cluster_reply_callback(redisAsyncContext * /*ctx*/, void *r, void *privdata) {
auto event = static_cast<ClusterEvent<Result, ResultParser> *>(privdata);

Expand All @@ -406,9 +419,24 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
event->_pool->update(event->_key, AsyncEventUPtr(event));
return;
} catch (const MovedError &err) {
switch (event->_state) {
case State::MOVED:
throw Error("too many moved error");
break;

case State::ASKING:
throw Error("Slot migrating...");
break;

default:
break;
}

event->_state = State::MOVED;
event->_pool->update(event->_key, AsyncEventUPtr(event));
return;
} catch (const AskError &err) {
event->_state = State::ASKING;
auto pool = event->_pool->fetch(err.node());
assert(pool);
GuardedAsyncConnection connection(pool);
Expand All @@ -430,6 +458,8 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
std::shared_ptr<AsyncShardsPool> _pool;

std::string _key;

State _state = State::NORMAL;
};

template <typename Result, typename ResultParser>
Expand Down

0 comments on commit c5652b6

Please sign in to comment.