Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into import
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-g committed Jun 22, 2016
2 parents 0f852da + 19cce59 commit 3a90ddd
Show file tree
Hide file tree
Showing 32 changed files with 1,249 additions and 91 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6389,6 +6389,7 @@ LIBEND2END_TESTS_SRC = \
test/core/end2end/tests/simple_delayed_request.c \
test/core/end2end/tests/simple_metadata.c \
test/core/end2end/tests/simple_request.c \
test/core/end2end/tests/streaming_error_response.c \
test/core/end2end/tests/trailing_metadata.c \

PUBLIC_HEADERS_C += \
Expand Down Expand Up @@ -6465,6 +6466,7 @@ LIBEND2END_NOSEC_TESTS_SRC = \
test/core/end2end/tests/simple_delayed_request.c \
test/core/end2end/tests/simple_metadata.c \
test/core/end2end/tests/simple_request.c \
test/core/end2end/tests/streaming_error_response.c \
test/core/end2end/tests/trailing_metadata.c \

PUBLIC_HEADERS_C += \
Expand Down
10 changes: 6 additions & 4 deletions src/core/ext/transport/chttp2/transport/chttp2_transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ static void init_transport(grpc_exec_ctx *exec_ctx, grpc_chttp2_transport *t,
t->parsing.is_client = is_client;
t->parsing.deframe_state =
is_client ? GRPC_DTS_FH_0 : GRPC_DTS_CLIENT_PREFIX_0;
t->parsing.is_first_frame = true;
t->writing.is_client = is_client;
t->optional_drop_message = gpr_empty_slice();
grpc_connectivity_state_init(
Expand Down Expand Up @@ -643,8 +644,7 @@ static void finish_global_actions(grpc_exec_ctx *exec_ctx,

for (;;) {
if (!t->executor.writing_active && !t->closed &&
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing,
t->executor.parsing_active)) {
grpc_chttp2_unlocking_check_writes(exec_ctx, &t->global, &t->writing)) {
t->executor.writing_active = 1;
REF_TRANSPORT(t, "writing");
prevent_endpoint_shutdown(t);
Expand Down Expand Up @@ -1093,6 +1093,7 @@ static void perform_stream_op_locked(grpc_exec_ctx *exec_ctx,
stream_global->recv_trailing_metadata_finished =
add_closure_barrier(on_complete);
stream_global->recv_trailing_metadata = op->recv_trailing_metadata;
stream_global->final_metadata_requested = true;
grpc_chttp2_list_add_check_read_ops(transport_global, stream_global);
}

Expand Down Expand Up @@ -1246,7 +1247,8 @@ static void check_read_ops(grpc_exec_ctx *exec_ctx,
stream_global->recv_initial_metadata_ready = NULL;
}
if (stream_global->recv_message_ready != NULL) {
while (stream_global->seen_error &&
while (stream_global->final_metadata_requested &&
stream_global->seen_error &&
(bs = grpc_chttp2_incoming_frame_queue_pop(
&stream_global->incoming_frames)) != NULL) {
incoming_byte_stream_destroy_locked(exec_ctx, NULL, NULL, bs);
Expand Down Expand Up @@ -1805,7 +1807,7 @@ static void post_reading_action_locked(grpc_exec_ctx *exec_ctx,
UNREF_TRANSPORT(exec_ctx, t, "reading_action");
}

GRPC_ERROR_UNREF(error);
GRPC_LOG_IF_ERROR("close_transport", error);
}

/*******************************************************************************
Expand Down
14 changes: 8 additions & 6 deletions src/core/ext/transport/chttp2/transport/frame_rst_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,14 @@ grpc_error *grpc_chttp2_rst_stream_parser_parse(
if (p->byte == 4) {
GPR_ASSERT(is_last);
stream_parsing->received_close = 1;
stream_parsing->forced_close_error = grpc_error_set_int(
GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR,
(intptr_t)((((uint32_t)p->reason_bytes[0]) << 24) |
(((uint32_t)p->reason_bytes[1]) << 16) |
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]))));
if (stream_parsing->forced_close_error == GRPC_ERROR_NONE) {
stream_parsing->forced_close_error = grpc_error_set_int(
GRPC_ERROR_CREATE("RST_STREAM"), GRPC_ERROR_INT_HTTP2_ERROR,
(intptr_t)((((uint32_t)p->reason_bytes[0]) << 24) |
(((uint32_t)p->reason_bytes[1]) << 16) |
(((uint32_t)p->reason_bytes[2]) << 8) |
(((uint32_t)p->reason_bytes[3]))));
}
}

return GRPC_ERROR_NONE;
Expand Down
5 changes: 3 additions & 2 deletions src/core/ext/transport/chttp2/transport/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ struct grpc_chttp2_transport_parsing {
uint8_t incoming_frame_type;
uint8_t incoming_frame_flags;
uint8_t header_eof;
bool is_first_frame;
uint32_t expect_continuation_stream_id;
uint32_t incoming_frame_size;
uint32_t incoming_stream_id;
Expand Down Expand Up @@ -440,6 +441,7 @@ typedef struct {

bool published_initial_metadata;
bool published_trailing_metadata;
bool final_metadata_requested;

grpc_chttp2_incoming_metadata_buffer received_initial_metadata;
grpc_chttp2_incoming_metadata_buffer received_trailing_metadata;
Expand Down Expand Up @@ -524,8 +526,7 @@ struct grpc_chttp2_stream {
are required, and schedule them if so */
int grpc_chttp2_unlocking_check_writes(grpc_exec_ctx *exec_ctx,
grpc_chttp2_transport_global *global,
grpc_chttp2_transport_writing *writing,
int is_parsing);
grpc_chttp2_transport_writing *writing);
void grpc_chttp2_perform_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_writing *transport_writing,
grpc_endpoint *endpoint);
Expand Down
11 changes: 11 additions & 0 deletions src/core/ext/transport/chttp2/transport/parsing.c
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,17 @@ grpc_error *grpc_chttp2_perform_read(

static grpc_error *init_frame_parser(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_parsing *transport_parsing) {
if (transport_parsing->is_first_frame &&
transport_parsing->incoming_frame_type != GRPC_CHTTP2_FRAME_SETTINGS) {
char *msg;
gpr_asprintf(
&msg, "Expected SETTINGS frame as the first frame, got frame type %d",
transport_parsing->incoming_frame_type);
grpc_error *err = GRPC_ERROR_CREATE(msg);
gpr_free(msg);
return err;
}
transport_parsing->is_first_frame = false;
if (transport_parsing->expect_continuation_stream_id != 0) {
if (transport_parsing->incoming_frame_type !=
GRPC_CHTTP2_FRAME_CONTINUATION) {
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/transport/chttp2/transport/writing.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static void finalize_outbuf(grpc_exec_ctx *exec_ctx,

int grpc_chttp2_unlocking_check_writes(
grpc_exec_ctx *exec_ctx, grpc_chttp2_transport_global *transport_global,
grpc_chttp2_transport_writing *transport_writing, int is_parsing) {
grpc_chttp2_transport_writing *transport_writing) {
grpc_chttp2_stream_global *stream_global;
grpc_chttp2_stream_writing *stream_writing;

Expand All @@ -61,7 +61,7 @@ int grpc_chttp2_unlocking_check_writes(
[GRPC_CHTTP2_SETTINGS_HEADER_TABLE_SIZE]);

if (transport_global->dirtied_local_settings &&
!transport_global->sent_local_settings && !is_parsing) {
!transport_global->sent_local_settings) {
gpr_slice_buffer_add(
&transport_writing->outbuf,
grpc_chttp2_settings_create(
Expand Down
57 changes: 46 additions & 11 deletions src/python/grpcio/grpc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -436,9 +436,7 @@ class UnaryUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):
"""Affords invoking a unary-unary RPC."""

@abc.abstractmethod
def __call__(
self, request, timeout=None, metadata=None, credentials=None,
with_call=False):
def __call__(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
Expand All @@ -447,12 +445,30 @@ def __call__(
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
set to True at invocation.
The response value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()

@abc.abstractmethod
def with_call(self, request, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
request: The request value for the RPC.
timeout: An optional durating of time in seconds to allow for the RPC.
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
Returns:
The response value for the RPC and a Call value for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
Expand Down Expand Up @@ -508,8 +524,7 @@ class StreamUnaryMultiCallable(six.with_metaclass(abc.ABCMeta)):

@abc.abstractmethod
def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None,
with_call=False):
self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
Expand All @@ -518,8 +533,6 @@ def __call__(
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
with_call: Whether or not to include return a Call for the RPC in addition
to the response.
Returns:
The response value for the RPC, and a Call for the RPC if with_call was
Expand All @@ -532,6 +545,28 @@ def __call__(
"""
raise NotImplementedError()

@abc.abstractmethod
def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None):
"""Synchronously invokes the underlying RPC.
Args:
request_iterator: An iterator that yields request values for the RPC.
timeout: An optional duration of time in seconds to allow for the RPC.
metadata: An optional sequence of pairs of bytes to be transmitted to the
service-side of the RPC.
credentials: An optional CallCredentials for the RPC.
Returns:
The response value for the RPC and a Call for the RPC.
Raises:
RpcError: Indicating that the RPC terminated with non-OK status. The
raised RpcError will also be a Call for the RPC affording the RPC's
metadata, status code, and details.
"""
raise NotImplementedError()

@abc.abstractmethod
def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
Expand Down
32 changes: 24 additions & 8 deletions src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,7 @@ def _prepare(self, request, timeout, metadata):
)
return state, operations, deadline, deadline_timespec, None

def __call__(
self, request, timeout=None, metadata=None, credentials=None,
with_call=False):
def _blocking(self, request, timeout, metadata, credentials):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
request, timeout, metadata)
if rendezvous:
Expand All @@ -464,7 +462,15 @@ def __call__(
call.set_credentials(credentials._credentials)
call.start_batch(cygrpc.Operations(operations), None)
_handle_event(completion_queue.poll(), state, self._response_deserializer)
return _end_unary_response_blocking(state, with_call, deadline)
return state, deadline

def __call__(self, request, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(request, timeout, metadata, credentials)
return _end_unary_response_blocking(state, False, deadline)

def with_call(self, request, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(request, timeout, metadata, credentials)
return _end_unary_response_blocking(state, True, deadline)

def future(self, request, timeout=None, metadata=None, credentials=None):
state, operations, deadline, deadline_timespec, rendezvous = self._prepare(
Expand Down Expand Up @@ -532,9 +538,7 @@ def __init__(
self._request_serializer = request_serializer
self._response_deserializer = response_deserializer

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None,
with_call=False):
def _blocking(self, request_iterator, timeout, metadata, credentials):
deadline, deadline_timespec = _deadline(timeout)
state = _RPCState(_STREAM_UNARY_INITIAL_DUE, None, None, None, None)
completion_queue = cygrpc.CompletionQueue()
Expand Down Expand Up @@ -563,7 +567,19 @@ def __call__(
state.condition.notify_all()
if not state.due:
break
return _end_unary_response_blocking(state, with_call, deadline)
return state, deadline

def __call__(
self, request_iterator, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(
request_iterator, timeout, metadata, credentials)
return _end_unary_response_blocking(state, False, deadline)

def with_call(
self, request_iterator, timeout=None, metadata=None, credentials=None):
state, deadline, = self._blocking(
request_iterator, timeout, metadata, credentials)
return _end_unary_response_blocking(state, True, deadline)

def future(
self, request_iterator, timeout=None, metadata=None, credentials=None):
Expand Down
8 changes: 4 additions & 4 deletions src/python/grpcio/grpc/beta/_client_adaptations.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ def _blocking_unary_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
response, call = multi_callable(
response, call = multi_callable.with_call(
request, timeout=timeout, metadata=effective_metadata,
credentials=_credentials(protocol_options), with_call=True)
credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
Expand Down Expand Up @@ -237,9 +237,9 @@ def _blocking_stream_unary(
response_deserializer=response_deserializer)
effective_metadata = _effective_metadata(metadata, metadata_transformer)
if with_call:
response, call = multi_callable(
response, call = multi_callable.with_call(
request_iterator, timeout=timeout, metadata=effective_metadata,
credentials=_credentials(protocol_options), with_call=True)
credentials=_credentials(protocol_options))
return response, _Rendezvous(None, None, call)
else:
return multi_callable(
Expand Down
4 changes: 2 additions & 2 deletions src/python/grpcio/tests/unit/_channel_connectivity_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ def test_immediately_connectable_channel_connectivity(self):
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE, third_connectivities)
self.assertNotIn(
grpc.ChannelConnectivity.FATAL_FAILURE, third_connectivities)
grpc.ChannelConnectivity.SHUTDOWN, third_connectivities)
self.assertNotIn(
grpc.ChannelConnectivity.TRANSIENT_FAILURE,
fourth_connectivities)
self.assertNotIn(
grpc.ChannelConnectivity.FATAL_FAILURE, fourth_connectivities)
grpc.ChannelConnectivity.SHUTDOWN, fourth_connectivities)

def test_reachable_then_unreachable_channel_connectivity(self):
server = _server.Server((), futures.ThreadPoolExecutor(max_workers=0))
Expand Down
8 changes: 4 additions & 4 deletions src/python/grpcio/tests/unit/_metadata_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ def tearDown(self):

def testUnaryUnary(self):
multi_callable = self._channel.unary_unary(_UNARY_UNARY)
unused_response, call = multi_callable(
_REQUEST, metadata=_CLIENT_METADATA, with_call=True)
unused_response, call = multi_callable.with_call(
_REQUEST, metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted(
Expand All @@ -192,9 +192,9 @@ def testUnaryStream(self):

def testStreamUnary(self):
multi_callable = self._channel.stream_unary(_STREAM_UNARY)
unused_response, call = multi_callable(
unused_response, call = multi_callable.with_call(
[_REQUEST] * test_constants.STREAM_LENGTH,
metadata=_CLIENT_METADATA, with_call=True)
metadata=_CLIENT_METADATA)
self.assertTrue(test_common.metadata_transmitted(
_SERVER_INITIAL_METADATA, call.initial_metadata()))
self.assertTrue(test_common.metadata_transmitted(
Expand Down
Loading

0 comments on commit 3a90ddd

Please sign in to comment.