diff --git a/test/core/end2end/fixtures/http_proxy_fixture.cc b/test/core/end2end/fixtures/http_proxy_fixture.cc index ac0c953a79f49..1cac0e98de55b 100644 --- a/test/core/end2end/fixtures/http_proxy_fixture.cc +++ b/test/core/end2end/fixtures/http_proxy_fixture.cc @@ -68,6 +68,15 @@ struct grpc_end2end_http_proxy { // Connection handling // +#define SERVER_EP_READ_FAIL 1 +#define SERVER_EP_WRITE_FAIL 2 +#define CLIENT_EP_READ_FAIL 4 +#define CLIENT_EP_WRITE_FAIL 8 + +#define SERVER_EP_FAIL (SERVER_EP_READ_FAIL | SERVER_EP_WRITE_FAIL) +#define CLIENT_EP_FAIL (CLIENT_EP_READ_FAIL | CLIENT_EP_WRITE_FAIL) +#define EP_FAIL (SERVER_EP_FAIL | CLIENT_EP_FAIL) + typedef struct proxy_connection { grpc_end2end_http_proxy* proxy; @@ -76,6 +85,13 @@ typedef struct proxy_connection { gpr_refcount refcount; + // The lower four bits store the endpoint failure information + // bit-0 Server endpoint read failure + // bit-1 Server endpoint write failure + // bit-2 Client endpoint read failure + // bit-3 Client endpoint write failure + gpr_atm ep_state; + grpc_pollset_set* pollset_set; grpc_closure on_read_request_done; @@ -132,17 +148,50 @@ static void proxy_connection_unref(grpc_exec_ctx* exec_ctx, // Helper function to shut down the proxy connection. // Does NOT take ownership of a reference to error. static void proxy_connection_failed(grpc_exec_ctx* exec_ctx, - proxy_connection* conn, bool is_client, + proxy_connection* conn, int failure_type, const char* prefix, grpc_error* error) { const char* msg = grpc_error_string(error); gpr_log(GPR_INFO, "%s: %s", prefix, msg); - grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, - GRPC_ERROR_REF(error)); - if (conn->server_endpoint != nullptr) { - grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, - GRPC_ERROR_REF(error)); + gpr_atm ep_state; + gpr_atm new_ep_state; + while (true) { + ep_state = gpr_atm_no_barrier_load(&conn->ep_state); + new_ep_state = ep_state | failure_type; + + if (ep_state == new_ep_state) { + break; + } + + if (!gpr_atm_no_barrier_cas(&conn->ep_state, ep_state, new_ep_state)) { + continue; + } + + // failure_type is successfully set and new_ep_state != ep_state at this + // point + + // Shutdown the endpoint (client and/or server) if both read and write + // failures are observed after setting the failure_type. + // To prevent calling endpoint shutdown multiple times, It is important to + // ensure that ep_state i.e the old state, did not already have both + // failures set. + if (((ep_state & SERVER_EP_FAIL) != SERVER_EP_FAIL) && + ((new_ep_state & SERVER_EP_FAIL) == SERVER_EP_FAIL)) { + if (conn->server_endpoint != nullptr) { + grpc_endpoint_shutdown(exec_ctx, conn->server_endpoint, + GRPC_ERROR_REF(error)); + } + } + + if (((ep_state & CLIENT_EP_FAIL) != CLIENT_EP_FAIL) && + ((new_ep_state & CLIENT_EP_FAIL) == CLIENT_EP_FAIL)) { + grpc_endpoint_shutdown(exec_ctx, conn->client_endpoint, + GRPC_ERROR_REF(error)); + } + + break; } + proxy_connection_unref(exec_ctx, conn, "conn_failed"); } @@ -152,7 +201,7 @@ static void on_client_write_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, CLIENT_EP_WRITE_FAIL, "HTTP proxy client write", error); return; } @@ -179,7 +228,7 @@ static void on_server_write_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->server_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, false /* is_client */, + proxy_connection_failed(exec_ctx, conn, SERVER_EP_WRITE_FAIL, "HTTP proxy server write", error); return; } @@ -206,7 +255,7 @@ static void on_client_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, CLIENT_EP_READ_FAIL, "HTTP proxy client read", error); return; } @@ -239,7 +288,7 @@ static void on_server_read_done(grpc_exec_ctx* exec_ctx, void* arg, grpc_error* error) { proxy_connection* conn = (proxy_connection*)arg; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, false /* is_client */, + proxy_connection_failed(exec_ctx, conn, SERVER_EP_READ_FAIL, "HTTP proxy server read", error); return; } @@ -272,7 +321,7 @@ static void on_write_response_done(grpc_exec_ctx* exec_ctx, void* arg, proxy_connection* conn = (proxy_connection*)arg; conn->client_is_writing = false; if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy write response", error); return; } @@ -301,7 +350,7 @@ static void on_server_connect_done(grpc_exec_ctx* exec_ctx, void* arg, // connection failed. However, for the purposes of this test code, // it's fine to pretend this is a client-side error, which will // cause the client connection to be dropped. - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy server connect", error); return; } @@ -351,8 +400,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, gpr_log(GPR_DEBUG, "on_read_request_done: %p %s", conn, grpc_error_string(error)); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy read request", + error); return; } // Read request and feed it to the parser. @@ -361,7 +410,7 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, error = grpc_http_parser_parse( &conn->http_parser, conn->client_read_buffer.slices[i], nullptr); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy request parse", error); GRPC_ERROR_UNREF(error); return; @@ -382,8 +431,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, conn->http_request.method); error = GRPC_ERROR_CREATE_FROM_COPIED_STRING(msg); gpr_free(msg); - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy read request", error); + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy read request", + error); GRPC_ERROR_UNREF(error); return; } @@ -403,7 +452,7 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, if (!client_authenticated) { const char* msg = "HTTP Connect could not verify authentication"; error = GRPC_ERROR_CREATE_FROM_STATIC_STRING(msg); - proxy_connection_failed(exec_ctx, conn, true /* is_client */, + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy read request", error); GRPC_ERROR_UNREF(error); return; @@ -414,8 +463,8 @@ static void on_read_request_done(grpc_exec_ctx* exec_ctx, void* arg, error = grpc_blocking_resolve_address(conn->http_request.path, "80", &resolved_addresses); if (error != GRPC_ERROR_NONE) { - proxy_connection_failed(exec_ctx, conn, true /* is_client */, - "HTTP proxy DNS lookup", error); + proxy_connection_failed(exec_ctx, conn, EP_FAIL, "HTTP proxy DNS lookup", + error); GRPC_ERROR_UNREF(error); return; } @@ -441,6 +490,7 @@ static void on_accept(grpc_exec_ctx* exec_ctx, void* arg, conn->client_endpoint = endpoint; conn->proxy = proxy; gpr_ref_init(&conn->refcount, 1); + gpr_atm_no_barrier_store(&conn->ep_state, 0); conn->pollset_set = grpc_pollset_set_create(); grpc_pollset_set_add_pollset(exec_ctx, conn->pollset_set, proxy->pollset); grpc_endpoint_add_to_pollset_set(exec_ctx, endpoint, conn->pollset_set);