transport: (Un)Subscribe cql_server::event_notifier from controller
There's a migration notifier that's carried through cql_server
_just_ to let the event-notifier (un)subscribe on it. Also, there's
a call to the global storage-service in there that will need to be
replaced with yet another pass-through argument, which is not
great.

It's easier to establish this subscription outside of cql_server,
as is currently done for the proxy and sl-manager. In the case of
cql_server, the "outside" is the controller.

This patch just moves the subscription management from cql_server
to the controller; the next two patches will make more use of this
change.

Signed-off-by: Pavel Emelyanov <[email protected]>
xemul committed Jul 22, 2021
1 parent b57fb0a commit 1acef41
Showing 4 changed files with 39 additions and 11 deletions.
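
In outline, the patch replaces the event-notifier's self-registration with explicit wiring done by the controller. A condensed before/after sketch of the relevant pieces (the full context is in the diffs below):

// Before: cql_server::event_notifier registered itself on construction
// (and had to unregister again in stop()):
//
//     cql_server::event_notifier::event_notifier(service::migration_notifier& mn) : _mnotifier(mn) {
//         _mnotifier.register_listener(this);
//         service::get_local_storage_service().register_subscriber(this);
//     }
//
// After: the controller registers the server's listeners on every shard,
// and unregisters them on stop or when a later start step fails:

future<> controller::subscribe_server(sharded<cql_server>& server) {
    return server.invoke_on_all([this] (cql_server& server) {
        _mnotifier.local().register_listener(server.get_migration_listener());
        service::get_local_storage_service().register_subscriber(server.get_lifecycle_listener());
        return make_ready_future<>();
    });
}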
31 changes: 28 additions & 3 deletions transport/controller.cc
@@ -22,6 +22,7 @@
#include "transport/controller.hh"
#include "transport/server.hh"
#include "service/memory_limiter.hh"
#include "service/storage_service.hh" // temporary
#include "db/config.hh"
#include "gms/gossiper.hh"
#include "log.hh"
@@ -152,6 +153,11 @@ future<> controller::do_start_server() {
        cserver->start(std::ref(_qp), std::ref(_auth_service), std::ref(_mnotifier), std::ref(_mem_limiter), cql_server_config, std::ref(cfg), std::ref(_sl_controller)).get();
        auto on_error = defer([&cserver] { cserver->stop().get(); });

        subscribe_server(*cserver).get();
        auto on_error_unsub = defer([this, &cserver] {
            unsubscribe_server(*cserver).get();
        });

        parallel_for_each(configs, [&cserver, keepalive](const listen_cfg & cfg) {
            return cserver->invoke_on_all(&cql_transport::cql_server::listen, cfg.addr, cfg.cred, cfg.is_shard_aware, keepalive).then([cfg] {
                logger.info("Starting listening for CQL clients on {} ({}, {})"
@@ -163,6 +169,7 @@ future<> controller::do_start_server() {
        set_cql_ready(true).get();

        on_error.cancel();
        on_error_unsub.cancel();
        _server = std::move(cserver);
    });
}
@@ -195,16 +202,34 @@ future<> controller::do_stop_server() {
    return do_with(std::move(_server), [this] (std::unique_ptr<distributed<cql_transport::cql_server>>& cserver) {
        if (cserver) {
            // FIXME: cql_server::stop() doesn't kill existing connections and wait for them
            return set_cql_ready(false).finally([&cserver] {
                return cserver->stop().then([] {
                    logger.info("CQL server stopped");
            return set_cql_ready(false).finally([this, &cserver] {
                return unsubscribe_server(*cserver).then([&cserver] {
                    return cserver->stop().then([] {
                        logger.info("CQL server stopped");
                    });
                });
            });
        }
        return make_ready_future<>();
    });
}

future<> controller::subscribe_server(sharded<cql_server>& server) {
    return server.invoke_on_all([this] (cql_server& server) {
        _mnotifier.local().register_listener(server.get_migration_listener());
        service::get_local_storage_service().register_subscriber(server.get_lifecycle_listener());
        return make_ready_future<>();
    });
}

future<> controller::unsubscribe_server(sharded<cql_server>& server) {
    return server.invoke_on_all([this] (cql_server& server) {
        return _mnotifier.local().unregister_listener(server.get_migration_listener()).then([this, &server]{
            return service::get_local_storage_service().unregister_subscriber(server.get_lifecycle_listener());
        });
    });
}

future<bool> controller::is_server_running() {
    return smp::submit_to(0, [this] {
        return make_ready_future<bool>(bool(_server));
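The start path in do_start_server above relies on Seastar's defer()/cancel() rollback idiom: each completed step arms an undo action that runs only if a later step throws, and every undo is cancelled once the whole sequence succeeds. A minimal, self-contained illustration of the idiom (the helper names here are hypothetical stand-ins, not part of this patch):

#include <seastar/util/defer.hh>
#include <iostream>
#include <stdexcept>

// Hypothetical stand-ins for subscribe_server() / listen() used above.
static void subscribe() noexcept   { std::cout << "subscribed\n"; }
static void unsubscribe() noexcept { std::cout << "unsubscribed\n"; }
static void listen(bool fail)      { if (fail) { throw std::runtime_error("listen failed"); } }

static void start_with_rollback(bool fail) {
    subscribe();
    // Arm the undo right after the step it undoes; it runs from the
    // deferred_action destructor during stack unwinding unless cancelled.
    auto undo_subscribe = seastar::defer([] () noexcept { unsubscribe(); });

    listen(fail);             // may throw; if it does, undo_subscribe fires

    undo_subscribe.cancel();  // success: keep the subscription
}

Calling start_with_rollback(true) prints "subscribed" and then "unsubscribed" as the exception propagates, while start_with_rollback(false) leaves the subscription in place.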
3 changes: 3 additions & 0 deletions transport/controller.hh
@@ -57,6 +57,9 @@ class controller {
    future<> do_start_server();
    future<> do_stop_server();

    future<> subscribe_server(sharded<cql_server>& server);
    future<> unsubscribe_server(sharded<cql_server>& server);

public:
    controller(sharded<auth::service>&, sharded<service::migration_notifier>&, gms::gossiper&,
        sharded<cql3::query_processor>&, sharded<service::memory_limiter>&,
12 changes: 4 additions & 8 deletions transport/event_notifier.cc
@@ -22,17 +22,16 @@
#include "transport/server.hh"
#include <seastar/core/gate.hh>
#include "service/migration_listener.hh"
#include "service/storage_service.hh"
#include "transport/response.hh"
#include "gms/gossiper.hh"

namespace cql_transport {

static logging::logger elogger("event_notifier");

cql_server::event_notifier::event_notifier(service::migration_notifier& mn) : _mnotifier(mn)
{
    _mnotifier.register_listener(this);
    service::get_local_storage_service().register_subscriber(this);
    (void)_mnotifier;
}

cql_server::event_notifier::~event_notifier()
@@ -41,11 +40,8 @@ cql_server::event_notifier::~event_notifier()
}

future<> cql_server::event_notifier::stop() {
    return _mnotifier.unregister_listener(this).then([this]{
        return service::get_local_storage_service().unregister_subscriber(this).finally([this] {
            _stopped = true;
        });
    });
    _stopped = true;
    return make_ready_future<>();
}

void cql_server::event_notifier::register_event(event::event_type et, cql_server::connection* conn)
4 changes: 4 additions & 0 deletions transport/server.hh
@@ -171,6 +171,8 @@ public:
            qos::service_level_controller& sl_controller);
public:
    using response = cql_transport::response;
    service::endpoint_lifecycle_subscriber* get_lifecycle_listener() const noexcept;
    service::migration_listener* get_migration_listener() const noexcept;
private:
    class fmt_visitor;
    friend class connection;
@@ -328,4 +330,6 @@ public:
    virtual void on_down(const gms::inet_address& endpoint) override;
};

inline service::endpoint_lifecycle_subscriber* cql_server::get_lifecycle_listener() const noexcept { return _notifier.get(); }
inline service::migration_listener* cql_server::get_migration_listener() const noexcept { return _notifier.get(); }
}
