diff --git a/src/sw/redis++/async_connection.cpp b/src/sw/redis++/async_connection.cpp index 38e58166..6070d7c8 100644 --- a/src/sw/redis++/async_connection.cpp +++ b/src/sw/redis++/async_connection.cpp @@ -143,8 +143,12 @@ void AsyncConnection::connect_callback(std::exception_ptr err) { _authing_callback(); break; + case State::SELECTING_DB: + _select_db_callback(); + break; + default: - assert(_state == State::SELECTING_DB); + assert(_state == State::ENABLE_READONLY); _set_ready(); } @@ -255,6 +259,8 @@ void AsyncConnection::_connecting_callback() { _auth(); } else if (_need_select_db()) { _select_db(); + } else if (_need_enable_readonly()) { + _enable_readonly(); } else { _set_ready(); } @@ -263,6 +269,16 @@ void AsyncConnection::_connecting_callback() { void AsyncConnection::_authing_callback() { if (_need_select_db()) { _select_db(); + } else if (_need_enable_readonly()) { + _enable_readonly(); + } else { + _set_ready(); + } +} + +void AsyncConnection::_select_db_callback() { + if (_need_enable_readonly()) { + _enable_readonly(); } else { _set_ready(); } @@ -299,6 +315,16 @@ void AsyncConnection::_select_db() { _state = State::SELECTING_DB; } +void AsyncConnection::_enable_readonly() { + assert(!broken()); + + if (redisAsyncCommand(_ctx, set_options_callback, nullptr, "READONLY") != REDIS_OK) { + throw Error("failed to send readonly command"); + } + + _state = State::ENABLE_READONLY; +} + void AsyncConnection::_set_ready() { _state = State::READY; @@ -356,6 +382,10 @@ bool AsyncConnection::_need_select_db() const { return _opts.db != 0; } +bool AsyncConnection::_need_enable_readonly() const { + return _opts.readonly; +} + void AsyncConnection::_clean_async_context(void *data) { auto *ctx = static_cast(data); diff --git a/src/sw/redis++/async_connection.h b/src/sw/redis++/async_connection.h index 6cccb964..727a16be 100644 --- a/src/sw/redis++/async_connection.h +++ b/src/sw/redis++/async_connection.h @@ -122,7 +122,8 @@ class AsyncConnection : public std::enable_shared_from_this { AUTHING, SELECTING_DB, READY, - WAIT_SENTINEL + WAIT_SENTINEL, + ENABLE_READONLY }; redisAsyncContext& _context() { @@ -137,6 +138,8 @@ class AsyncConnection : public std::enable_shared_from_this { void _authing_callback(); + void _select_db_callback(); + bool _need_auth() const; void _auth(); @@ -145,6 +148,10 @@ class AsyncConnection : public std::enable_shared_from_this { void _select_db(); + bool _need_enable_readonly() const; + + void _enable_readonly(); + void _set_ready(); void _connect_with_sentinel(); @@ -387,6 +394,12 @@ class ClusterEvent : public CommandEvent { } private: + enum class State { + NORMAL = 0, + MOVED, + ASKING + }; + static void _cluster_reply_callback(redisAsyncContext * /*ctx*/, void *r, void *privdata) { auto event = static_cast *>(privdata); @@ -406,9 +419,24 @@ class ClusterEvent : public CommandEvent { event->_pool->update(event->_key, AsyncEventUPtr(event)); return; } catch (const MovedError &err) { + switch (event->_state) { + case State::MOVED: + throw Error("too many moved error"); + break; + + case State::ASKING: + throw Error("Slot migrating..."); + break; + + default: + break; + } + + event->_state = State::MOVED; event->_pool->update(event->_key, AsyncEventUPtr(event)); return; } catch (const AskError &err) { + event->_state = State::ASKING; auto pool = event->_pool->fetch(err.node()); assert(pool); GuardedAsyncConnection connection(pool); @@ -430,6 +458,8 @@ class ClusterEvent : public CommandEvent { std::shared_ptr _pool; std::string _key; + + State _state = State::NORMAL; }; template