Skip to content

Commit

Permalink
add ASKING redirection support for async interface
Browse files Browse the repository at this point in the history
  • Loading branch information
sewenew committed Jul 25, 2021
1 parent a9558c1 commit 74b10b7
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 60 deletions.
18 changes: 18 additions & 0 deletions src/sw/redis++/async_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,24 @@ AsyncConnection::AsyncContextUPtr AsyncConnection::_connect(const ConnectionOpti
return ctx;
}

GuardedAsyncConnection::GuardedAsyncConnection(const AsyncConnectionPoolSPtr &pool) :
_pool(pool), _connection(_pool->fetch()) {
assert(!_connection->broken());
}

GuardedAsyncConnection::~GuardedAsyncConnection() {
// If `GuardedAsyncConnection` has been moved, `_pool` will be nullptr.
if (_pool) {
_pool->release(std::move(_connection));
}
}

AsyncConnection& GuardedAsyncConnection::connection() {
assert(_connection);

return *_connection;
}

}

}
113 changes: 86 additions & 27 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct DefaultResultParser {
};

class AsyncConnection;
class AsyncConnectionPool;
class AsyncShardsPool;

class AsyncEvent {
Expand Down Expand Up @@ -278,6 +279,87 @@ class CommandEvent : public AsyncEvent {
template <typename Result, typename ResultParser>
using CommandEventUPtr = std::unique_ptr<CommandEvent<Result, ResultParser>>;

class AskingEvent : public AsyncEvent {
public:
explicit AskingEvent(AsyncEvent *event) : _event(event) {}

~AskingEvent() {
if (_event != nullptr) {
delete _event;
}
}

virtual void handle(redisAsyncContext &ctx) override {
if (redisAsyncCommand(&ctx, _asking_callback, this, "ASKING") != REDIS_OK) {
throw_error(ctx.c, "failed to send ASKING command");
}
}

virtual void set_exception(std::exception_ptr err) override {
assert(_event != nullptr);

_event->set_exception(err);
}

private:
static void _asking_callback(redisAsyncContext *ctx, void *r, void *privdata) {
auto event = static_cast<AskingEvent *>(privdata);

assert(event != nullptr);

try {
redisReply *reply = static_cast<redisReply *>(r);
if (reply == nullptr) {
event->set_exception(std::make_exception_ptr(Error("connection has been closed")));
} else if (reply::is_error(*reply)) {
try {
throw_error(*reply);
} catch (const Error &e) {
event->set_exception(std::current_exception());
}
} else {
reply::parse<void>(*reply);

assert(ctx != nullptr);
auto *context = static_cast<AsyncContext *>(ctx->data);
assert(context != nullptr);
auto &conn = context->connection;
assert(conn);

conn->send(AsyncEventUPtr(event->_event));
event->_event = nullptr;
}
} catch (...) {
event->set_exception(std::current_exception());
}

delete event;
}

AsyncEvent *_event = nullptr;
};

// NOTE: This class is similar to `SafeAsyncConnection`.
// The difference is that `SafeAsyncConnection` tries to avoid copying a std::shared_ptr.
class GuardedAsyncConnection {
public:
explicit GuardedAsyncConnection(const std::shared_ptr<AsyncConnectionPool> &pool);

GuardedAsyncConnection(const GuardedAsyncConnection &) = delete;
GuardedAsyncConnection& operator=(const GuardedAsyncConnection &) = delete;

GuardedAsyncConnection(GuardedAsyncConnection &&) = default;
GuardedAsyncConnection& operator=(GuardedAsyncConnection &&) = default;

~GuardedAsyncConnection();

AsyncConnection& connection();

private:
std::shared_ptr<AsyncConnectionPool> _pool;
std::shared_ptr<AsyncConnection> _connection;
};

template <typename Result, typename ResultParser>
class ClusterEvent : public CommandEvent<Result, ResultParser> {
public:
Expand Down Expand Up @@ -315,22 +397,11 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
event->_pool->update(event->_key, AsyncEventUPtr(event));
return;
} catch (const AskError &err) {
/*
auto pool = event->_pool->fetch(err.node());
assert(pool);
GuardedAsyncConnection connection(pool);
connection.connection();
// 1. send ASKING command.
_asking(connection);
// 2. resend last command.
try {
return _command(cmd, connection, std::forward<Args>(args)...);
} catch (const MovedError &err) {
throw Error("Slot migrating... ASKING node hasn't been set to IMPORTING state");
}
*/
connection.connection().send(AsyncEventUPtr(new AskingEvent(event)));
return;
} catch (const Error &e) {
event->set_exception(std::current_exception());
}
Expand Down Expand Up @@ -359,13 +430,7 @@ Future<Result> AsyncConnection::send(FormattedCommand cmd) {

auto fut = event->get_future();

{
std::lock_guard<std::mutex> lock(_mtx);

_events.push_back(std::move(event));
}

_loop->add(shared_from_this());
send(std::move(event));

return fut;
}
Expand All @@ -379,13 +444,7 @@ Future<Result> AsyncConnection::send(const std::shared_ptr<AsyncShardsPool> &poo

auto fut = event->get_future();

{
std::lock_guard<std::mutex> lock(_mtx);

_events.push_back(std::move(event));
}

_loop->add(shared_from_this());
send(std::move(event));

return fut;
}
Expand Down
33 changes: 0 additions & 33 deletions src/sw/redis++/async_connection_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,39 +173,6 @@ class SafeAsyncConnection {
AsyncConnectionSPtr _connection;
};

// NOTE: This class is similar to `SafeAsyncConnection`.
// The difference is that `SafeAsyncConnection` tries to avoid copying a std::shared_ptr.
class GuardedAsyncConnection {
public:
explicit GuardedAsyncConnection(const AsyncConnectionPoolSPtr &pool) : _pool(pool),
_connection(_pool->fetch()) {
assert(!_connection->broken());
}

GuardedAsyncConnection(const GuardedAsyncConnection &) = delete;
GuardedAsyncConnection& operator=(const GuardedAsyncConnection &) = delete;

GuardedAsyncConnection(GuardedAsyncConnection &&) = default;
GuardedAsyncConnection& operator=(GuardedAsyncConnection &&) = default;

~GuardedAsyncConnection() {
// If `GuardedAsyncConnection` has been moved, `_pool` will be nullptr.
if (_pool) {
_pool->release(std::move(_connection));
}
}

AsyncConnection& connection() {
assert(_connection);

return *_connection;
}

private:
AsyncConnectionPoolSPtr _pool;
AsyncConnectionSPtr _connection;
};

}

}
Expand Down

0 comments on commit 74b10b7

Please sign in to comment.