Skip to content

Commit

Permalink
cdc: Handling of timeout/unavailable exceptions in streams fetching
Browse files Browse the repository at this point in the history
Retrying the operation of fetching generations not always makes
sense. In this patch only the lightest exceptions (timeout and
unavailable) trigger retrying, while the heavy, unrecoverable ones
abort the operation and get logged on ERROR level.

Fixes scylladb#6557
  • Loading branch information
jul-stas authored and avikivity committed Jun 22, 2020
1 parent 52180f9 commit a35b71c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 9 deletions.
46 changes: 37 additions & 9 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,31 @@ bool storage_service::do_handle_cdc_generation(db_clock::time_point ts) {
}).get0();
}

namespace {
class cdc_generation_handling_nonfatal_exception : public std::runtime_error {
using std::runtime_error::runtime_error;
};

constexpr char could_not_retrieve_msg_template[]
= "Could not retrieve CDC streams with timestamp {} upon gossip event. Reason: \"{}\". Action: {}.";
} // anon. namespace

bool storage_service::do_handle_cdc_generation_intercept_nonfatal_errors(db_clock::time_point ts) {
try {
return do_handle_cdc_generation(ts);
} catch (exceptions::request_timeout_exception& e) {
throw cdc_generation_handling_nonfatal_exception(e.what());
} catch (exceptions::unavailable_exception& e) {
throw cdc_generation_handling_nonfatal_exception(e.what());
} catch (...) {
const auto ep = std::current_exception();
if (is_timeout_exception(ep)) {
throw cdc_generation_handling_nonfatal_exception(format("{}", ep));
}
throw;
}
}

class ander {
private:
bool _result = true;
Expand All @@ -705,21 +730,22 @@ void storage_service::async_handle_cdc_generation(db_clock::time_point ts) {
while (true) {
sleep_abortable(std::chrono::seconds(5), ss->_abort_source).get();
try {
bool using_this_gen = ss->do_handle_cdc_generation(ts);
const bool using_this_gen = ss->do_handle_cdc_generation_intercept_nonfatal_errors(ts);
if (using_this_gen) {
cdc::update_streams_description(ts, sys_dist_ks,
[ss] { return ss->get_token_metadata().count_normal_token_owners(); }, ss->_abort_source);
}
return;
} catch (...) {
} catch (cdc_generation_handling_nonfatal_exception& e) {
if (get_storage_service().map_reduce(ander(), [ts] (storage_service& ss) {
return ss._cdc_metadata.known_or_obsolete(ts);
}).get0()) {
return;
}

cdc_log.warn("Could not retrieve CDC streams with timestamp {}: {}. Will retry again.",
ts, std::current_exception());
cdc_log.warn(could_not_retrieve_msg_template, ts, e.what(), "continuing to retry in the background");
} catch (...) {
cdc_log.error(could_not_retrieve_msg_template, ts, std::current_exception(), "not retrying anymore");
return; // Exotic ("fatal") exception => do not retry
}
}
});
Expand All @@ -745,12 +771,14 @@ void storage_service::handle_cdc_generation(std::optional<db_clock::time_point>

bool using_this_gen = false;
try {
using_this_gen = do_handle_cdc_generation(*ts);
} catch(...) {
cdc_log.warn("Could not retrieve CDC streams with timestamp {} on gossip event: {}."
" Will retry in the background.", *ts, std::current_exception());
using_this_gen = do_handle_cdc_generation_intercept_nonfatal_errors(*ts);
} catch (cdc_generation_handling_nonfatal_exception& e) {
cdc_log.warn(could_not_retrieve_msg_template, ts, e.what(), "retrying in the background");
async_handle_cdc_generation(*ts);
return;
} catch(...) {
cdc_log.error(could_not_retrieve_msg_template, ts, std::current_exception(), "not retrying");
return; // Exotic ("fatal") exception => do not retry
}

if (using_this_gen) {
Expand Down
3 changes: 3 additions & 0 deletions service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,9 @@ private:
/* Returns `true` iff we started using the generation (it was not obsolete),
* which means that this node might write some CDC log entries using streams from this generation. */
bool do_handle_cdc_generation(db_clock::time_point);
/* Wrapper around `do_handle_cdc_generation` which intercepts timeout/unavailability exceptions.
* Returns: do_handle_cdc_generation(ts). */
bool do_handle_cdc_generation_intercept_nonfatal_errors(db_clock::time_point ts);

/* If `handle_cdc_generation` fails, it schedules an asynchronous retry in the background
* using `async_handle_cdc_generation`.
Expand Down

0 comments on commit a35b71c

Please sign in to comment.