Skip to content

Commit

Permalink
Fix Signal Safety Issue (grpc#25394)
Browse files Browse the repository at this point in the history
* Decrease flake rate

* Spruce up comments
  • Loading branch information
gnossen authored Feb 21, 2021
1 parent a807466 commit ac34f55
Showing 1 changed file with 33 additions and 10 deletions.
43 changes: 33 additions & 10 deletions src/python/grpcio/grpc/_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,24 @@ def _unknown_code_details(unknown_cygrpc_code, details):
class _RPCState(object):

def __init__(self, due, initial_metadata, trailing_metadata, code, details):
# `condition` guards all members of _RPCState. `notify_all` is called on
# `condition` when the state of the RPC has changed.
self.condition = threading.Condition()

# The cygrpc.OperationType objects representing events due from the RPC's
# completion queue.
# completion queue. If an operation is in `due`, it is guaranteed that
# `operate()` has been called on a corresponding operation. But the
# converse is not true. That is, in the case of failed `operate()`
# calls, there may briefly be events in `due` that do not correspond to
# operations submitted to Core.
self.due = set(due)
self.initial_metadata = initial_metadata
self.response = None
self.trailing_metadata = trailing_metadata
self.code = code
self.details = details
self.debug_error_string = None

# The semantics of grpc.Future.cancel and grpc.Future.cancelled are
# slightly wonky, so they have to be tracked separately from the rest of the
# result of the RPC. This field tracks whether cancellation was requested
Expand Down Expand Up @@ -220,12 +228,12 @@ def consume_request_iterator(): # pylint: disable=too-many-branches
_abort(state, code, details)
return
else:
state.due.add(cygrpc.OperationType.send_message)
operations = (cygrpc.SendMessageOperation(
serialized_request, _EMPTY_FLAGS),)
operating = call.operate(operations, event_handler)
if operating:
state.due.add(cygrpc.OperationType.send_message)
else:
if not operating:
state.due.remove(cygrpc.OperationType.send_message)
return

def _done():
Expand All @@ -244,11 +252,13 @@ def _done():
return
with state.condition:
if state.code is None:
state.due.add(cygrpc.OperationType.send_close_from_client)
operations = (
cygrpc.SendCloseFromClientOperation(_EMPTY_FLAGS),)
operating = call.operate(operations, event_handler)
if operating:
state.due.add(cygrpc.OperationType.send_close_from_client)
if not operating:
state.due.remove(
cygrpc.OperationType.send_close_from_client)

consumption_thread = cygrpc.ForkManagedThread(
target=consume_request_iterator)
Expand Down Expand Up @@ -609,10 +619,22 @@ def _next_response(self):
def _next(self):
with self._state.condition:
if self._state.code is None:
# We tentatively add the operation as expected and remove
# it if the enqueue operation fails. This allows us to guarantee that
# if an event has been submitted to the core completion queue,
# it is in `due`. If we waited until after a successful
# enqueue operation then a signal could interrupt this
# thread between the enqueue operation and the addition of the
# operation to `due`. This would cause an exception on the
# channel spin thread when the operation completes and no
# corresponding operation would be present in state.due.
# Note that, since `condition` is held through this block, there is
# no data race on `due`.
self._state.due.add(cygrpc.OperationType.receive_message)
operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),), None)
if operating:
self._state.due.add(cygrpc.OperationType.receive_message)
if not operating:
self._state.due.remove(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
raise StopIteration()
else:
Expand Down Expand Up @@ -775,11 +797,12 @@ def _next(self):
if self._state.code is None:
event_handler = _event_handler(self._state,
self._response_deserializer)
self._state.due.add(cygrpc.OperationType.receive_message)
operating = self._call.operate(
(cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
event_handler)
if operating:
self._state.due.add(cygrpc.OperationType.receive_message)
if not operating:
self._state.due.remove(cygrpc.OperationType.receive_message)
elif self._state.code is grpc.StatusCode.OK:
raise StopIteration()
else:
Expand Down

0 comments on commit ac34f55

Please sign in to comment.