Skip to content

Commit

Permalink
stream_result_future: Use local manager reference
Browse files Browse the repository at this point in the history
The reference is present in all the required places already.

Signed-off-by: Pavel Emelyanov <[email protected]>
  • Loading branch information
xemul committed Nov 24, 2021
1 parent 5b748a7 commit 307a258
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions streaming/stream_result_future.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ extern logging::logger sslog;
future<stream_state> stream_result_future::init_sending_side(stream_manager& mgr, UUID plan_id_, sstring description_,
std::vector<stream_event_handler*> listeners_, shared_ptr<stream_coordinator> coordinator_) {
auto sr = ::make_shared<stream_result_future>(mgr, plan_id_, description_, coordinator_);
get_local_stream_manager().register_sending(sr);
mgr.register_sending(sr);

for (auto& listener : listeners_) {
sr->add_event_listener(listener);
Expand All @@ -66,8 +66,7 @@ future<stream_state> stream_result_future::init_sending_side(stream_manager& mgr
}

shared_ptr<stream_result_future> stream_result_future::init_receiving_side(stream_manager& mgr, UUID plan_id, sstring description, inet_address from) {
auto& sm = get_local_stream_manager();
auto sr = sm.get_receiving_stream(plan_id);
auto sr = mgr.get_receiving_stream(plan_id);
if (sr) {
auto err = fmt::format("[Stream #{}] GOT PREPARE_MESSAGE from {}, description={},"
"stream_plan exists, duplicated message received?", plan_id, description, from);
Expand All @@ -77,7 +76,7 @@ shared_ptr<stream_result_future> stream_result_future::init_receiving_side(strea
sslog.info("[Stream #{}] Executing streaming plan for {} with peers={}, slave", plan_id, description, from);
bool is_receiving = true;
sr = ::make_shared<stream_result_future>(mgr, plan_id, description, is_receiving);
sm.register_receiving(sr);
mgr.register_receiving(sr);
return sr;
}

Expand Down Expand Up @@ -115,14 +114,13 @@ void stream_result_future::maybe_complete() {
auto plan_id = this->plan_id;
sslog.debug("[Stream #{}] stream_result_future: has_active_sessions={}", plan_id, has_active_sessions);
if (!has_active_sessions) {
auto& sm = get_local_stream_manager();
if (sslog.is_enabled(logging::log_level::debug)) {
sm.show_streams();
_mgr.show_streams();
}
auto duration = std::chrono::duration_cast<std::chrono::duration<float>>(lowres_clock::now() - _start_time).count();
auto stats = make_lw_shared<sstring>("");
//FIXME: discarded future.
(void)sm.get_progress_on_all_shards(plan_id).then([plan_id, duration, stats] (auto sbytes) {
(void)_mgr.get_progress_on_all_shards(plan_id).then([plan_id, duration, stats] (auto sbytes) {
auto tx_bw = sstring("0");
auto rx_bw = sstring("0");
if (std::fabs(duration) > FLT_EPSILON) {
Expand All @@ -132,8 +130,8 @@ void stream_result_future::maybe_complete() {
*stats = format("tx={:d} KiB, {} KiB/s, rx={:d} KiB, {} KiB/s", sbytes.bytes_sent / 1024, tx_bw, sbytes.bytes_received / 1024, rx_bw);
}).handle_exception([plan_id] (auto ep) {
sslog.warn("[Stream #{}] Fail to get progress on all shards: {}", plan_id, ep);
}).finally([this, plan_id, stats, &sm] () {
sm.remove_stream(plan_id);
}).finally([this, plan_id, stats] () {
_mgr.remove_stream(plan_id);
auto final_state = get_current_state();
if (final_state.has_failed_session()) {
sslog.warn("[Stream #{}] Streaming plan for {} failed, peers={}, {}", plan_id, description, _coordinator->get_peers(), *stats);
Expand Down

0 comments on commit 307a258

Please sign in to comment.