Skip to content

Commit

Permalink
Merge pull request grpc#10054 from murgatroid99/uv_resolver_fallback
Browse files Browse the repository at this point in the history
Add uv resolver fallback for named ports, fix portability tests
  • Loading branch information
murgatroid99 authored Mar 10, 2017
2 parents 97129b0 + d0cda5c commit a85e7d4
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 28 deletions.
2 changes: 2 additions & 0 deletions build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2576,6 +2576,8 @@ targets:
- grpc
- gpr_test_util
- gpr
exclude_iomgrs:
- uv
platforms:
- mac
- linux
Expand Down
8 changes: 4 additions & 4 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,10 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport *t,
grpc_error *error) {
if (!t->closed) {
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
if (t->write_state != GRPC_CHTTP2_WRITE_STATE_IDLE) {
if (t->close_transport_on_writes_finished == NULL) {
t->close_transport_on_writes_finished =
Expand All @@ -520,10 +524,6 @@ static void close_transport_locked(grpc_exec_ctx *exec_ctx,
grpc_error_add_child(t->close_transport_on_writes_finished, error);
return;
}
if (!grpc_error_has_clear_grpc_status(error)) {
error = grpc_error_set_int(error, GRPC_ERROR_INT_GRPC_STATUS,
GRPC_STATUS_UNAVAILABLE);
}
t->closed = 1;
connectivity_state_set(exec_ctx, t, GRPC_CHANNEL_SHUTDOWN,
GRPC_ERROR_REF(error), "close_transport");
Expand Down
22 changes: 13 additions & 9 deletions src/core/lib/iomgr/pollset_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

Expand All @@ -61,25 +62,30 @@ gpr_mu grpc_polling_mu;
immediately in the next loop iteration.
Note: In the future, if there is a bug that involves missing wakeups in the
future, try adding a uv_async_t to kick the loop differently */
uv_timer_t dummy_uv_handle;
uv_timer_t *dummy_uv_handle;

size_t grpc_pollset_size() { return sizeof(grpc_pollset); }

void dummy_timer_cb(uv_timer_t *handle) {}

void dummy_handle_close_cb(uv_handle_t *handle) { gpr_free(handle); }

void grpc_pollset_global_init(void) {
gpr_mu_init(&grpc_polling_mu);
uv_timer_init(uv_default_loop(), &dummy_uv_handle);
dummy_uv_handle = gpr_malloc(sizeof(uv_timer_t));
uv_timer_init(uv_default_loop(), dummy_uv_handle);
grpc_pollset_work_run_loop = 1;
}

static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }

void grpc_pollset_global_shutdown(void) {
gpr_mu_destroy(&grpc_polling_mu);
uv_close((uv_handle_t *)&dummy_uv_handle, timer_close_cb);
uv_close((uv_handle_t *)dummy_uv_handle, dummy_handle_close_cb);
}

static void timer_run_cb(uv_timer_t *timer) {}

static void timer_close_cb(uv_handle_t *handle) { handle->data = (void *)1; }

void grpc_pollset_init(grpc_pollset *pollset, gpr_mu **mu) {
*mu = &grpc_polling_mu;
uv_timer_init(uv_default_loop(), &pollset->timer);
Expand All @@ -95,7 +101,7 @@ void grpc_pollset_shutdown(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
} else {
// kick the loop once
uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
}
grpc_closure_sched(exec_ctx, closure, GRPC_ERROR_NONE);
}
Expand All @@ -111,8 +117,6 @@ void grpc_pollset_destroy(grpc_pollset *pollset) {
}
}

static void timer_run_cb(uv_timer_t *timer) {}

grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,
grpc_pollset_worker **worker_hdl,
gpr_timespec now, gpr_timespec deadline) {
Expand Down Expand Up @@ -145,7 +149,7 @@ grpc_error *grpc_pollset_work(grpc_exec_ctx *exec_ctx, grpc_pollset *pollset,

grpc_error *grpc_pollset_kick(grpc_pollset *pollset,
grpc_pollset_worker *specific_worker) {
uv_timer_start(&dummy_uv_handle, dummy_timer_cb, 0, 0);
uv_timer_start(dummy_uv_handle, dummy_timer_cb, 0, 0);
return GRPC_ERROR_NONE;
}

Expand Down
52 changes: 50 additions & 2 deletions src/core/lib/iomgr/resolve_address_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <grpc/support/host_port.h>
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/closure.h"
#include "src/core/lib/iomgr/error.h"
Expand All @@ -54,8 +55,36 @@ typedef struct request {
grpc_closure *on_done;
grpc_resolved_addresses **addresses;
struct addrinfo *hints;
char *host;
char *port;
} request;

static int retry_named_port_failure(int status, request *r,
uv_getaddrinfo_cb getaddrinfo_cb) {
if (status != 0) {
// This loop is copied from resolve_address_posix.c
char *svc[][2] = {{"http", "80"}, {"https", "443"}};
for (size_t i = 0; i < GPR_ARRAY_SIZE(svc); i++) {
if (strcmp(r->port, svc[i][0]) == 0) {
int retry_status;
uv_getaddrinfo_t *req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;
retry_status = uv_getaddrinfo(uv_default_loop(), req, getaddrinfo_cb,
r->host, svc[i][1], r->hints);
if (retry_status < 0 || getaddrinfo_cb == NULL) {
// The callback will not be called
gpr_free(req);
}
return retry_status;
}
}
}
/* If this function calls uv_getaddrinfo, it will return that function's
return value. That function only returns numbers <=0, so we can safely
return 1 to indicate that we never retried */
return 1;
}

static grpc_error *handle_addrinfo_result(int status, struct addrinfo *result,
grpc_resolved_addresses **addresses) {
struct addrinfo *resp;
Expand Down Expand Up @@ -97,13 +126,21 @@ static void getaddrinfo_callback(uv_getaddrinfo_t *req, int status,
request *r = (request *)req->data;
grpc_exec_ctx exec_ctx = GRPC_EXEC_CTX_INIT;
grpc_error *error;
int retry_status;

gpr_free(req);
retry_status = retry_named_port_failure(status, r, getaddrinfo_callback);
if (retry_status == 0) {
// The request is being retried. Nothing should be done here
return;
}
/* Either no retry was attempted, or the retry failed. Either way, the
original error probably has more interesting information */
error = handle_addrinfo_result(status, res, r->addresses);
grpc_closure_sched(&exec_ctx, r->on_done, error);
grpc_exec_ctx_finish(&exec_ctx);

gpr_free(r->hints);
gpr_free(r);
gpr_free(req);
uv_freeaddrinfo(res);
}

Expand Down Expand Up @@ -143,6 +180,7 @@ static grpc_error *blocking_resolve_address_impl(
uv_getaddrinfo_t req;
int s;
grpc_error *err;
int retry_status;

req.addrinfo = NULL;

Expand All @@ -158,6 +196,12 @@ static grpc_error *blocking_resolve_address_impl(
hints.ai_flags = AI_PASSIVE; /* for wildcard IP address */

s = uv_getaddrinfo(uv_default_loop(), &req, NULL, host, port, &hints);
request r = {
.addresses = addresses, .hints = &hints, .host = host, .port = port};
retry_status = retry_named_port_failure(s, &r, NULL);
if (retry_status <= 0) {
s = retry_status;
}
err = handle_addrinfo_result(s, req.addrinfo, addresses);

done:
Expand Down Expand Up @@ -200,6 +244,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
r = gpr_malloc(sizeof(request));
r->on_done = on_done;
r->addresses = addrs;
r->host = host;
r->port = port;
req = gpr_malloc(sizeof(uv_getaddrinfo_t));
req->data = r;

Expand All @@ -222,6 +268,8 @@ static void resolve_address_impl(grpc_exec_ctx *exec_ctx, const char *name,
gpr_free(r);
gpr_free(req);
gpr_free(hints);
gpr_free(host);
gpr_free(port);
}
}

Expand Down
1 change: 0 additions & 1 deletion src/core/lib/iomgr/tcp_client_uv.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ static void uv_tc_on_alarm(grpc_exec_ctx *exec_ctx, void *acp,
const char *str = grpc_error_string(error);
gpr_log(GPR_DEBUG, "CLIENT_CONNECT: %s: on_alarm: error=%s",
connect->addr_name, str);
grpc_error_free_string(str);
}
if (error == GRPC_ERROR_NONE) {
/* error == NONE implies that the timer ran out, and wasn't cancelled. If
Expand Down
7 changes: 4 additions & 3 deletions test/core/iomgr/tcp_client_uv_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static int g_connections_complete = 0;
static grpc_endpoint *g_connecting = NULL;

static gpr_timespec test_deadline(void) {
return GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
return grpc_timeout_seconds_to_deadline(10);
}

static void finish_connection() {
Expand All @@ -73,7 +73,8 @@ static void must_succeed(grpc_exec_ctx *exec_ctx, void *arg,
grpc_error *error) {
GPR_ASSERT(g_connecting != NULL);
GPR_ASSERT(error == GRPC_ERROR_NONE);
grpc_endpoint_shutdown(exec_ctx, g_connecting);
grpc_endpoint_shutdown(exec_ctx, g_connecting,
GRPC_ERROR_CREATE("must_succeed called"));
grpc_endpoint_destroy(exec_ctx, g_connecting);
g_connecting = NULL;
finish_connection();
Expand Down Expand Up @@ -133,7 +134,7 @@ void test_succeeds(void) {
"pollset_work",
grpc_pollset_work(&exec_ctx, g_pollset, &worker,
gpr_now(GPR_CLOCK_MONOTONIC),
GRPC_TIMEOUT_SECONDS_TO_DEADLINE(5))));
grpc_timeout_seconds_to_deadline(5))));
gpr_mu_unlock(g_mu);
grpc_exec_ctx_flush(&exec_ctx);
gpr_mu_lock(g_mu);
Expand Down
4 changes: 2 additions & 2 deletions test/core/iomgr/tcp_server_uv_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ static void server_weak_ref_set(server_weak_ref *weak_ref,
static void on_connect(grpc_exec_ctx *exec_ctx, void *arg, grpc_endpoint *tcp,
grpc_pollset *pollset,
grpc_tcp_server_acceptor *acceptor) {
grpc_endpoint_shutdown(exec_ctx, tcp);
grpc_endpoint_shutdown(exec_ctx, tcp, GRPC_ERROR_CREATE("Connected"));
grpc_endpoint_destroy(exec_ctx, tcp);

on_connect_result temp_result;
Expand Down Expand Up @@ -203,7 +203,7 @@ static void close_cb(uv_handle_t *handle) { gpr_free(handle); }

static void tcp_connect(grpc_exec_ctx *exec_ctx, const struct sockaddr *remote,
socklen_t remote_len, on_connect_result *result) {
gpr_timespec deadline = GRPC_TIMEOUT_SECONDS_TO_DEADLINE(10);
gpr_timespec deadline = grpc_timeout_seconds_to_deadline(10);
uv_tcp_t *client_handle = gpr_malloc(sizeof(uv_tcp_t));
uv_connect_t *req = gpr_malloc(sizeof(uv_connect_t));
int nconnects_before;
Expand Down
5 changes: 2 additions & 3 deletions test/core/util/trickle_endpoint.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
*
*/

#include "src/core/lib/iomgr/sockaddr.h"

#include "test/core/util/passthru_endpoint.h"

#include <inttypes.h>
Expand All @@ -40,9 +42,6 @@
#include <grpc/support/log.h>
#include <grpc/support/string_util.h>
#include <grpc/support/useful.h>

#include "src/core/lib/iomgr/sockaddr.h"

#include "src/core/lib/slice/slice_internal.h"

typedef struct {
Expand Down
4 changes: 3 additions & 1 deletion tools/run_tests/generated/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -1790,7 +1790,9 @@
],
"cpu_cost": 1.0,
"exclude_configs": [],
"exclude_iomgrs": [],
"exclude_iomgrs": [
"uv"
],
"flaky": false,
"gtest": false,
"language": "c",
Expand Down
6 changes: 3 additions & 3 deletions tools/run_tests/run_tests_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,19 +95,19 @@ def _generate_jobs(languages, configs, platforms, iomgr_platform = 'native',
for config in configs:
name = '%s_%s_%s_%s' % (language, platform, config, iomgr_platform)
runtests_args = ['-l', language,
'-c', config]
'-c', config,
'--iomgr_platform', iomgr_platform]
if arch or compiler:
name += '_%s_%s' % (arch, compiler)
runtests_args += ['--arch', arch,
'--compiler', compiler]

runtests_args += extra_args
if platform == 'linux':
job = _docker_jobspec(name=name, runtests_args=runtests_args, inner_jobs=inner_jobs)
else:
job = _workspace_jobspec(name=name, runtests_args=runtests_args, inner_jobs=inner_jobs)

job.labels = [platform, config, language] + labels
job.labels = [platform, config, language, iomgr_platform] + labels
result.append(job)
return result

Expand Down

0 comments on commit a85e7d4

Please sign in to comment.