Skip to content

Commit

Permalink
c-ares DNS resolver: fix logical race between resolution timeout/canc…
Browse files Browse the repository at this point in the history
…ellation and fd readability (grpc#31443)

* Fix race between c-ares resolution timeout and fd readability
  • Loading branch information
apolcyn authored Oct 28, 2022
1 parent b19f87d commit e5f7b1b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class AresClientChannelDNSResolver : public PollingResolver {
&service_config_json_, resolver_->query_timeout_ms_));
GRPC_CARES_TRACE_LOG(
"resolver:%p Started resolving TXT records. txt_request_:%p",
resolver_.get(), srv_request_.get());
resolver_.get(), txt_request_.get());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,17 +362,17 @@ static void on_readable(void* arg, grpc_error_handle error) {
fdn->readable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p readable on %s", fdn->ev_driver->request,
fdn->grpc_polled_fd->GetName());
if (error.ok()) {
if (error.ok() && !ev_driver->shutting_down) {
do {
ares_process_fd(ev_driver->channel, as, ARES_SOCKET_BAD);
} while (fdn->grpc_polled_fd->IsFdStillReadableLocked());
} else {
// If error is not absl::OkStatus(), it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
// If error is not absl::OkStatus() or the resolution was cancelled, it
// means the fd has been shutdown or timed out. The pending lookups made on
// this ev_driver will be cancelled by the following ares_cancel() and the
// on_done callbacks will be invoked with a status of ARES_ECANCELLED. The
// remaining file descriptors in this ev_driver will be cleaned up in the
// follwing grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
Expand All @@ -388,15 +388,15 @@ static void on_writable(void* arg, grpc_error_handle error) {
fdn->writable_registered = false;
GRPC_CARES_TRACE_LOG("request:%p writable on %s", ev_driver->request,
fdn->grpc_polled_fd->GetName());
if (error.ok()) {
if (error.ok() && !ev_driver->shutting_down) {
ares_process_fd(ev_driver->channel, ARES_SOCKET_BAD, as);
} else {
// If error is not absl::OkStatus(), it means the fd has been shutdown or
// timed out. The pending lookups made on this ev_driver will be cancelled
// by the following ares_cancel() and the on_done callbacks will be invoked
// with a status of ARES_ECANCELLED. The remaining file descriptors in this
// ev_driver will be cleaned up in the follwing
// grpc_ares_notify_on_event_locked().
// If error is not absl::OkStatus() or the resolution was cancelled, it
// means the fd has been shutdown or timed out. The pending lookups made on
// this ev_driver will be cancelled by the following ares_cancel() and the
// on_done callbacks will be invoked with a status of ARES_ECANCELLED. The
// remaining file descriptors in this ev_driver will be cleaned up in the
// follwing grpc_ares_notify_on_event_locked().
ares_cancel(ev_driver->channel);
}
grpc_ares_notify_on_event_locked(ev_driver);
Expand Down
39 changes: 24 additions & 15 deletions test/cpp/naming/resolver_component_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ gpr_timespec TestDeadline(void) {

struct ArgsStruct {
gpr_event ev;
gpr_atm done_atm;
gpr_mu* mu;
grpc_pollset* pollset;
bool done; // guarded by mu
grpc_pollset* pollset; // guarded by mu
grpc_pollset_set* pollset_set;
std::shared_ptr<grpc_core::WorkSerializer> lock;
grpc_channel_args* channel_args;
Expand All @@ -211,7 +211,7 @@ void ArgsInit(ArgsStruct* args) {
args->pollset_set = grpc_pollset_set_create();
grpc_pollset_set_add_pollset(args->pollset_set, args->pollset);
args->lock = std::make_shared<grpc_core::WorkSerializer>();
gpr_atm_rel_store(&args->done_atm, 0);
args->done = false;
args->channel_args = nullptr;
}

Expand Down Expand Up @@ -243,24 +243,22 @@ void PollPollsetUntilRequestDone(ArgsStruct* args) {
// for that server before succeeding with the healthy one).
gpr_timespec deadline = NSecondDeadline(20);
while (true) {
bool done = gpr_atm_acq_load(&args->done_atm) != 0;
if (done) {
grpc_core::MutexLockForGprMu lock(args->mu);
if (args->done) {
break;
}
gpr_timespec time_left =
gpr_time_sub(deadline, gpr_now(GPR_CLOCK_REALTIME));
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", done,
gpr_log(GPR_DEBUG, "done=%d, time_left=%" PRId64 ".%09d", args->done,
time_left.tv_sec, time_left.tv_nsec);
GPR_ASSERT(gpr_time_cmp(time_left, gpr_time_0(GPR_TIMESPAN)) >= 0);
grpc_pollset_worker* worker = nullptr;
grpc_core::ExecCtx exec_ctx;
gpr_mu_lock(args->mu);
GRPC_LOG_IF_ERROR(
"pollset_work",
grpc_pollset_work(
args->pollset, &worker,
grpc_core::Timestamp::FromTimespecRoundUp(NSecondDeadline(1))));
gpr_mu_unlock(args->mu);
}
gpr_event_set(&args->ev, reinterpret_cast<void*>(1));
}
Expand Down Expand Up @@ -432,11 +430,11 @@ class ResultHandler : public grpc_core::Resolver::ResultHandler {

void ReportResult(grpc_core::Resolver::Result result) override {
CheckResult(result);
gpr_atm_rel_store(&args_->done_atm, 1);
gpr_mu_lock(args_->mu);
grpc_core::MutexLockForGprMu lock(args_->mu);
GPR_ASSERT(!args_->done);
args_->done = true;
GRPC_LOG_IF_ERROR("pollset_kick",
grpc_pollset_kick(args_->pollset, nullptr));
gpr_mu_unlock(args_->mu);
}

virtual void CheckResult(const grpc_core::Resolver::Result& /*result*/) {}
Expand Down Expand Up @@ -566,7 +564,8 @@ void StartResolvingLocked(grpc_core::Resolver* r) { r->StartLocked(); }

void RunResolvesRelevantRecordsTest(
std::unique_ptr<grpc_core::Resolver::ResultHandler> (*CreateResultHandler)(
ArgsStruct* args)) {
ArgsStruct* args),
grpc_core::ChannelArgs resolver_args) {
grpc_core::ExecCtx exec_ctx;
ArgsStruct args;
ArgsInit(&args);
Expand Down Expand Up @@ -605,7 +604,6 @@ void RunResolvesRelevantRecordsTest(
}
gpr_log(GPR_DEBUG, "resolver_component_test: --enable_srv_queries: %s",
absl::GetFlag(FLAGS_enable_srv_queries).c_str());
grpc_core::ChannelArgs resolver_args;
// By default, SRV queries are disabled, so tests that expect no SRV query
// should avoid setting any channel arg. Test cases that do rely on the SRV
// query must explicitly enable SRV though.
Expand Down Expand Up @@ -645,7 +643,8 @@ void RunResolvesRelevantRecordsTest(
}

TEST(ResolverComponentTest, TestResolvesRelevantRecords) {
RunResolvesRelevantRecordsTest(CheckingResultHandler::Create);
RunResolvesRelevantRecordsTest(CheckingResultHandler::Create,
grpc_core::ChannelArgs());
}

TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
Expand All @@ -656,12 +655,22 @@ TEST(ResolverComponentTest, TestResolvesRelevantRecordsWithConcurrentFdStress) {
std::thread socket_stress_thread(OpenAndCloseSocketsStressLoop, phony_port,
&done_ev);
// Run the resolver test
RunResolvesRelevantRecordsTest(ResultHandler::Create);
RunResolvesRelevantRecordsTest(ResultHandler::Create,
grpc_core::ChannelArgs());
// Shutdown and join stress thread
gpr_event_set(&done_ev, reinterpret_cast<void*>(1));
socket_stress_thread.join();
}

TEST(ResolverComponentTest, TestDoesntCrashOrHangWith1MsTimeout) {
// Queries in this test could either complete successfully or time out
// and show cancellation. This test doesn't care - we just care that the
// query completes and doesn't crash, get stuck, leak, etc.
RunResolvesRelevantRecordsTest(
ResultHandler::Create,
grpc_core::ChannelArgs().Set(GRPC_ARG_DNS_ARES_QUERY_TIMEOUT_MS, 1));
}

} // namespace

int main(int argc, char** argv) {
Expand Down

0 comments on commit e5f7b1b

Please sign in to comment.