Skip to content

Commit

Permalink
streaming: Fail streaming sessions during shutdown
Browse files Browse the repository at this point in the history
Fixes repair_additional_test.py:RepairAdditionalTest.repair_kill_3_test

The test does:

- Insert data on node1 only
- Insert data on node2 only
- Run repair on node1 and stop node1
  once "starting user-requested repair" is seen

The repair shutdown code may wait for the stream session to complete for
a very long time if node 1 finishes sending data to node2 and is waiting
for node2 to send data to it, when node1 is stopped. The stream session
will not be closed in this case until stream session _keep_alive_timeout
(10 minutes) expires. Instead of waiting for the stream_session keep
alive timer to expire, we can fail all the stream sessions during
shutdown.

Before 1 - The bad case (repair shutdown will last for 10 minutes):

  INFO  2016-09-21 16:23:56,617 [shard 0] stream_session - [Stream #bd34fea1-7fd4-11e6-8020-000000000001] Executing streaming plan for repair-in
  INFO  2016-09-21 16:23:56,617 [shard 0] stream_session - [Stream #bd34fea1-7fd4-11e6-8020-000000000001] Starting streaming to 127.0.0.2
  INFO  2016-09-21 16:23:56,617 [shard 0] stream_session - [Stream #bd34fea1-7fd4-11e6-8020-000000000001] Beginning stream session with 127.0.0.2
  INFO  2016-09-21 16:23:56,618 [shard 0] stream_session - [Stream #bd34fea1-7fd4-11e6-8020-000000000001] Prepare completed with 127.0.0.2. Receiving 1, sending 0
  INFO  2016-09-21 16:23:58,625 [shard 0] storage_service - Stop transport: stop_gossiping done
  INFO  2016-09-21 16:23:58,625 [shard 0] storage_service - Thrift server stopped
  INFO  2016-09-21 16:23:58,625 [shard 0] storage_service - CQL server stopped
  INFO  2016-09-21 16:23:58,625 [shard 0] storage_service - Stop transport: shutdown rpc and cql server done
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - messaging_service stopped
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - Stop transport: shutdown messaging_service done
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - Stop transport: auth shutdown
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - Stop transport: done
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - Drain on shutdown: stop_transport done
  INFO  2016-09-21 16:23:58,626 [shard 0] tracing - Asked to shut down
  INFO  2016-09-21 16:23:58,626 [shard 0] tracing - Tracing is down
  INFO  2016-09-21 16:23:58,626 [shard 1] tracing - Asked to shut down
  INFO  2016-09-21 16:23:58,626 [shard 1] tracing - Tracing is down
  INFO  2016-09-21 16:23:58,626 [shard 0] storage_service - Drain on shutdown: tracing is stopped
  INFO  2016-09-21 16:23:58,669 [shard 0] storage_service - Drain on shutdown: flush column_families done
  INFO  2016-09-21 16:23:58,669 [shard 0] storage_service - Drain on shutdown: shutdown commitlog done
  INFO  2016-09-21 16:23:58,669 [shard 0] storage_service - Drain on shutdown: done
  INFO  2016-09-21 16:23:58,669 [shard 0] repair - Starting shutdown of repair
  INFO  2016-09-21 16:25:56,624 [shard 0] stream_session - [Stream #bd34fea1-7fd4-11e6-8020-000000000001] The session 0x600021516c00 made no progress with peer 127.0.0.2

Before 2 - The good case:

  INFO  2016-09-21 16:18:32,087 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Executing streaming plan for repair-in
  INFO  2016-09-21 16:18:32,087 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Starting streaming to 127.0.0.2
  INFO  2016-09-21 16:18:32,087 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Beginning stream session with 127.0.0.2
  INFO  2016-09-21 16:18:32,087 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Prepare completed with 127.0.0.2. Receiving 1, sending 0
  INFO  2016-09-21 16:18:34,098 [shard 0] storage_service - Stop transport: stop_gossiping done
  INFO  2016-09-21 16:18:34,098 [shard 0] storage_service - Thrift server stopped
  INFO  2016-09-21 16:18:34,098 [shard 0] storage_service - CQL server stopped
  INFO  2016-09-21 16:18:34,098 [shard 0] storage_service - Stop transport: shutdown rpc and cql server done
  INFO  2016-09-21 16:18:34,155 [shard 0] messaging_service - Retry verb=19 to 127.0.0.2:0, retry=10: rpc::closed_error (connection is closed)
  WARN  2016-09-21 16:18:34,155 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] COMPLETE_MESSAGE for 127.0.0.2 has failed: rpc::closed_error (connection is closed)
  WARN  2016-09-21 16:18:34,155 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Streaming error occurred
  INFO  2016-09-21 16:18:34,155 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Session with 127.0.0.2 is complete, state=FAILED
  INFO  2016-09-21 16:18:34,155 [shard 0] storage_service - messaging_service stopped
  INFO  2016-09-21 16:18:34,155 [shard 0] storage_service - Stop transport: shutdown messaging_service done
  INFO  2016-09-21 16:18:34,155 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] bytes_sent = 0, bytes_received = 245000
  WARN  2016-09-21 16:18:34,155 [shard 0] stream_session - [Stream #fbc668d1-7fd3-11e6-bc54-000000000001] Stream failed, peers={127.0.0.2}
  WARN  2016-09-21 16:18:34,155 [shard 0] repair - repair's stream failed: streaming::stream_exception (Stream failed)
  INFO  2016-09-21 16:18:34,155 [shard 0] repair - repair 1 failed - streaming::stream_exception (Stream failed)
  INFO  2016-09-21 16:18:34,155 [shard 0] storage_service - Stop transport: auth shutdown
  INFO  2016-09-21 16:18:34,155 [shard 0] storage_service - Stop transport: done
  INFO  2016-09-21 16:18:34,155 [shard 0] storage_service - Drain on shutdown: stop_transport done
  INFO  2016-09-21 16:18:34,155 [shard 0] tracing - Asked to shut down
  INFO  2016-09-21 16:18:34,155 [shard 0] tracing - Tracing is down
  INFO  2016-09-21 16:18:34,156 [shard 1] tracing - Asked to shut down
  INFO  2016-09-21 16:18:34,156 [shard 1] tracing - Tracing is down
  INFO  2016-09-21 16:18:34,156 [shard 0] storage_service - Drain on shutdown: tracing is stopped
  INFO  2016-09-21 16:18:34,199 [shard 0] storage_service - Drain on shutdown: flush column_families done
  INFO  2016-09-21 16:18:34,199 [shard 0] storage_service - Drain on shutdown: shutdown commitlog done
  INFO  2016-09-21 16:18:34,199 [shard 0] storage_service - Drain on shutdown: done
  INFO  2016-09-21 16:18:34,199 [shard 0] repair - Starting shutdown of repair
  INFO  2016-09-21 16:18:34,199 [shard 0] repair - Completed shutdown of repair
  INFO  2016-09-21 16:18:34,199 [shard 0] compaction_manager - Asked to stop
  INFO  2016-09-21 16:18:34,199 [shard 1] compaction_manager - Asked to stop

After:

  INFO  2016-09-21 16:06:21,684 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Executing streaming plan for repair-in
  INFO  2016-09-21 16:06:21,684 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Starting streaming to 127.0.0.2
  INFO  2016-09-21 16:06:21,684 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Beginning stream session with 127.0.0.2
  INFO  2016-09-21 16:06:21,685 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Prepare completed with 127.0.0.2. Receiving 1, sending 0
  INFO  2016-09-21 16:06:23,687 [shard 0] storage_service - Stop transport: stop_gossiping done
  INFO  2016-09-21 16:06:23,687 [shard 0] storage_service - Thrift server stopped
  INFO  2016-09-21 16:06:23,687 [shard 0] storage_service - CQL server stopped
  INFO  2016-09-21 16:06:23,687 [shard 0] storage_service - Stop transport: shutdown rpc and cql server done
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - messaging_service stopped
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Stop transport: shutdown messaging_service done
  INFO  2016-09-21 16:06:23,688 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Session with 127.0.0.2 is complete, state=FAILED
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - stream_manager stopped
  INFO  2016-09-21 16:06:23,688 [shard 1] storage_service - stream_manager stopped
  INFO  2016-09-21 16:06:23,688 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] bytes_sent = 0, bytes_received = 25725
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Stop transport: shutdown stream_manager done
  WARN  2016-09-21 16:06:23,688 [shard 0] stream_session - [Stream #48661c51-7fd2-11e6-8ba7-000000000001] Stream failed, peers={127.0.0.2}
  WARN  2016-09-21 16:06:23,688 [shard 0] repair - repair's stream failed: streaming::stream_exception (Stream failed)
  INFO  2016-09-21 16:06:23,688 [shard 0] repair - repair 1 failed - streaming::stream_exception (Stream failed)
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Stop transport: auth shutdown
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Stop transport: done
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Drain on shutdown: stop_transport done
  INFO  2016-09-21 16:06:23,688 [shard 0] tracing - Asked to shut down
  INFO  2016-09-21 16:06:23,688 [shard 0] tracing - Tracing is down
  INFO  2016-09-21 16:06:23,688 [shard 1] tracing - Asked to shut down
  INFO  2016-09-21 16:06:23,688 [shard 1] tracing - Tracing is down
  INFO  2016-09-21 16:06:23,688 [shard 0] storage_service - Drain on shutdown: tracing is stopped
  INFO  2016-09-21 16:06:23,774 [shard 0] storage_service - Drain on shutdown: flush column_families done
  INFO  2016-09-21 16:06:23,774 [shard 0] storage_service - Drain on shutdown: shutdown commitlog done
  INFO  2016-09-21 16:06:23,774 [shard 0] storage_service - Drain on shutdown: done
  INFO  2016-09-21 16:06:23,774 [shard 0] repair - Starting shutdown of repair
  INFO  2016-09-21 16:06:23,774 [shard 0] repair - Completed shutdown of repair
  INFO  2016-09-21 16:06:23,774 [shard 0] compaction_manager - Asked to stop
  INFO  2016-09-21 16:06:23,774 [shard 1] compaction_manager - Asked to stop
  • Loading branch information
asias committed Sep 25, 2016
1 parent 7c873f0 commit f377a3b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 0 deletions.
15 changes: 15 additions & 0 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1053,6 +1053,9 @@ future<> storage_service::stop_transport() {
ss.do_stop_ms().get();
logger.info("Stop transport: shutdown messaging_service done");

ss.do_stop_stream_manager().get();
logger.info("Stop transport: shutdown stream_manager done");

auth::auth::shutdown().get();
logger.info("Stop transport: auth shutdown");

Expand Down Expand Up @@ -1665,6 +1668,18 @@ future<> storage_service::do_stop_ms() {
});
}

future<> storage_service::do_stop_stream_manager() {
if (_stream_manager_stopped) {
return make_ready_future<>();
}
_stream_manager_stopped = true;
return streaming::get_stream_manager().invoke_on_all([] (auto& sm) {
return sm.stop();
}).then([] {
logger.info("stream_manager stopped");
});
}

future<> check_snapshot_not_exist(database& db, sstring ks_name, sstring name) {
auto& ks = db.find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&db, ks_name = std::move(ks_name), name = std::move(name)] (auto& pair) {
Expand Down
2 changes: 2 additions & 0 deletions service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private:
sstring _operation_in_progress;
bool _force_remove_completion = false;
bool _ms_stopped = false;
bool _stream_manager_stopped = false;
public:
storage_service(distributed<database>& db)
: _db(db) {
Expand Down Expand Up @@ -316,6 +317,7 @@ private:
future<> do_stop_rpc_server();
future<> do_stop_native_transport();
future<> do_stop_ms();
future<> do_stop_stream_manager();
#if 0
public void stopTransports()
{
Expand Down
8 changes: 8 additions & 0 deletions streaming/stream_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,14 @@ void stream_manager::fail_sessions(inet_address endpoint) {
}
}

void stream_manager::fail_all_sessions() {
for (auto sr : get_all_streams()) {
for (auto session : sr->get_coordinator()->get_all_stream_sessions()) {
session->close_session(stream_session_state::FAILED);
}
}
}

void stream_manager::on_remove(inet_address endpoint) {
if (has_peer(endpoint)) {
sslog.info("stream_manager: Close all stream_session with peer = {} in on_remove", endpoint);
Expand Down
2 changes: 2 additions & 0 deletions streaming/stream_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public:
void show_streams();

future<> stop() {
fail_all_sessions();
return make_ready_future<>();
}

Expand Down Expand Up @@ -153,6 +154,7 @@ public:
virtual void on_restart(inet_address endpoint, endpoint_state ep_state) override;

private:
void fail_all_sessions();
void fail_sessions(inet_address endpoint);
bool has_peer(inet_address endpoint);
};
Expand Down

0 comments on commit f377a3b

Please sign in to comment.