Skip to content

Commit

Permalink
messaging_service: do not forget to close stream when sending it to a…
Browse files Browse the repository at this point in the history
…nother side failed

Fixes scylladb#4124

Message-Id: <[email protected]>
  • Loading branch information
Gleb Natapov authored and avikivity committed Jan 31, 2019
1 parent 4b47094 commit a70374d
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions message/messaging_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,10 @@ messaging_service::make_sink_and_source_for_stream_mutation_fragments(utils::UUI
rpc_protocol::client& rpc_client = *wrapper;
return wrapper->make_stream_sink<netw::serializer, frozen_mutation_fragment>().then([this, plan_id, schema_id, cf_id, estimated_partitions, reason, &rpc_client] (rpc::sink<frozen_mutation_fragment> sink) mutable {
auto rpc_handler = rpc()->make_client<rpc::source<int32_t> (utils::UUID, utils::UUID, utils::UUID, uint64_t, streaming::stream_reason, rpc::sink<frozen_mutation_fragment>)>(messaging_verb::STREAM_MUTATION_FRAGMENTS);
return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then([sink] (rpc::source<int32_t> source) mutable {
return make_ready_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(std::move(sink), std::move(source));
return rpc_handler(rpc_client , plan_id, schema_id, cf_id, estimated_partitions, reason, sink).then_wrapped([sink] (future<rpc::source<int32_t>> source) mutable {
return (source.failed() ? sink.close() : make_ready_future<>()).then([sink = std::move(sink), source = std::move(source)] () mutable {
return make_ready_future<rpc::sink<frozen_mutation_fragment>, rpc::source<int32_t>>(std::move(sink), std::move(source.get0()));
});
});
});
}
Expand Down

0 comments on commit a70374d

Please sign in to comment.