upstream client: Handle recv() blocked scenarios (#223)

There are currently two error scenarios that lead to cache drift
between the management server and xds-relay.

1. There is a bug where the stream is stuck indefinitely if the send chan
is closed unexpectedly. The recv routine stays open and the waitgroup
never completes, blocking retries.

2. If the response receiver channel for `recv()` is blocked, the upstream
client is blocked from sending ACKs back to the management server (see the
sketch below).

Signed-off-by: Jess Yuen <[email protected]>
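
The pattern behind the fix for scenario 2 is to race the channel send against a
timer instead of doing a bare channel write. A minimal, runnable Go sketch of
that pattern (trySend and all names here are hypothetical; the actual change
inside recv() appears in the diff below and uses a 5-second timeout):

    package main

    import (
        "fmt"
        "time"
    )

    // trySend races a channel send against a timer so the sender can bail out
    // instead of deadlocking when the receiver never drains the channel.
    func trySend(ch chan<- string, msg string, timeout time.Duration) bool {
        select {
        case ch <- msg:
            return true // receiver accepted the message
        case <-time.After(timeout):
            return false // receiver blocked; give up instead of hanging forever
        }
    }

    func main() {
        // Unbuffered and never read: simulates a stuck receiver (orchestrator).
        blocked := make(chan string)
        if !trySend(blocked, "discovery response", 100*time.Millisecond) {
            fmt.Println("send timed out; tear down the stream instead of hanging")
        }
    }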
jessicayuen authored Jan 19, 2021
1 parent 2be6003 commit 22cdc4f
Showing 2 changed files with 33 additions and 10 deletions.
7 changes: 6 additions & 1 deletion internal/app/orchestrator/orchestrator.go
@@ -310,7 +310,7 @@ func (o *orchestrator) watchUpstream(
         // TODO if set fails, we may need to retry upstream as well.
         // Currently the fallback is to rely on a future response, but
         // that probably isn't ideal.
-        // https://github.com/envoyproxy/xds-relay/issues/70s
+        // https://github.com/envoyproxy/xds-relay/issues/70
         //
         // If we fail to cache the new response, log and return the old one.
         o.logger.With("error", err).With("aggregated_key", aggregatedKey).
@@ -388,6 +388,11 @@ func (o *orchestrator) fanout(resp transport.Response, watchers *cache.RequestsS
     o.scope.Timer(metrics.TimerFanoutTime).Record(time.Since(start))
     // Wait for all fanouts to complete.
     wg.Wait()
+    o.logger.With(
+        "aggregated_key", aggregatedKey,
+        "response_type", resp.GetTypeURL(),
+        "response_version", resp.GetPayloadVersion(),
+    ).Debug(context.Background(), "response fanout complete")
     o.scope.Timer(metrics.TimerSendTime).Record(time.Since(start))
 }
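
For context on the hunk above: fanout hands the response to every watcher and
wg.Wait() blocks until all of them have been served, which is the point the new
debug line records. A simplified sketch of that shape (hypothetical types and
names, not the orchestrator's actual code):

    package main

    import (
        "fmt"
        "sync"
    )

    // fanout delivers one response to every watcher channel concurrently and
    // returns only after all deliveries have completed.
    func fanout(resp string, watchers []chan string) {
        var wg sync.WaitGroup
        for _, w := range watchers {
            wg.Add(1)
            go func(ch chan string) {
                defer wg.Done()
                ch <- resp
            }(w)
        }
        wg.Wait() // all watchers served; safe to log completion here
        fmt.Println("response fanout complete")
    }

    func main() {
        watchers := []chan string{make(chan string, 1), make(chan string, 1)}
        fanout("v1", watchers)
        fmt.Println(<-watchers[0], <-watchers[1])
    }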

36 changes: 27 additions & 9 deletions internal/app/upstream/client.go
@@ -166,12 +166,15 @@ func (m *client) handleStreamsWithRetry(
     respCh chan transport.Response,
     aggregatedKey string) {
     var (
-        s        grpc.ClientStream
-        stream   transport.Stream
-        err      error
-        scope    tally.Scope
+        s      grpc.ClientStream
+        stream transport.Stream
+        err    error
+        scope  tally.Scope
+
+        // childCtx completion signifies that the client has closed the stream.
         childCtx context.Context
-        cancel   context.CancelFunc
+        // cancel is called during shutdown or cleanup operations from the caller. This will close the child context.
+        cancel context.CancelFunc
     )
     for {
         if m.timeout != 0 {
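
The childCtx/cancel pair documented above lets either stream goroutine tear
down the whole stream: once one side cancels, the other exits and the waitgroup
completes, unblocking retries (scenario 1 from the commit message). A rough,
self-contained illustration of that arrangement (all names hypothetical, not
the client's actual code):

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    func main() {
        childCtx, cancel := context.WithCancel(context.Background())
        defer cancel()

        var wg sync.WaitGroup
        wg.Add(2)

        go func() { // stand-in for send(): hits an unexpected channel close
            defer wg.Done()
            cancel() // closing the child context tells recv() to exit
            fmt.Println("send: cancelled child context")
        }()

        go func() { // stand-in for recv(): blocks until cancelled
            defer wg.Done()
            select {
            case <-childCtx.Done():
                fmt.Println("recv: child context done, exiting")
            case <-time.After(time.Second):
                fmt.Println("recv: would keep processing responses")
            }
        }()

        wg.Wait() // completes because recv() observed the cancellation
        fmt.Println("stream torn down; the retry loop can proceed")
    }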
@@ -282,10 +285,15 @@ func send(
     select {
     case sig, ok := <-signal:
         if !ok {
+            // This shouldn't happen since the signal channel is only closed during garbage
+            // collection, but in the case of an unexpected close, cancelling allows recv() to
+            // exit cleanly.
             logger.With("aggregated_key", aggregatedKey).Debug(ctx, "send() chan closed")
+            cancelFunc()
             return
         }
         logger.With("aggregated_key", aggregatedKey,
-            "version", sig.version).Debug(ctx, "sending version and nonce to upstream")
+            "version", sig.version).Debug(ctx, "sending version and nonce to upstream (ACK)")
         // Ref: https://github.com/grpc/grpc-go/issues/1229#issuecomment-302755717
         // Call SendMsg in a timeout because it can block in some cases.
         err := util.DoWithTimeout(ctx, func() error {
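
util.DoWithTimeout itself is not part of this diff, so its implementation is
not shown here; a generic helper of roughly this shape (an assumption, not the
actual xds-relay code) is a common way to bound a blocking call such as
SendMsg:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // doWithTimeout runs f and gives up once the deadline passes, so a blocked
    // call cannot stall the caller indefinitely. Sketch only; the real
    // util.DoWithTimeout may differ.
    func doWithTimeout(ctx context.Context, f func() error, timeout time.Duration) error {
        ctx, cancel := context.WithTimeout(ctx, timeout)
        defer cancel()

        done := make(chan error, 1) // buffered so the goroutine never leaks on timeout
        go func() { done <- f() }()

        select {
        case err := <-done:
            return err
        case <-ctx.Done():
            return ctx.Err() // context.DeadlineExceeded if f blocked too long
        }
    }

    func main() {
        err := doWithTimeout(context.Background(), func() error {
            time.Sleep(time.Second) // simulate SendMsg blocking on a slow peer
            return nil
        }, 100*time.Millisecond)
        if errors.Is(err, context.DeadlineExceeded) {
            fmt.Println("SendMsg-style call timed out:", err)
        }
    }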
@@ -305,6 +313,8 @@
 
 // recv is an infinite loop which blocks on RecvMsg.
 // The only ways to exit the goroutine are by cancelling the context or when an error occurs.
+// The response channel is used by the caller (orchestrator) to propagate responses to downstream clients.
+// The signal channel is read by send() in order to send ACKs to the management server.
 func recv(
     ctx context.Context,
     complete func(),
@@ -316,6 +326,7 @@
     aggregatedKey string) {
     defer complete()
     for {
+        logger.With("aggregated_key", aggregatedKey).Debug(ctx, "recv(): listening for message")
         resp, err := stream.RecvMsg()
         if err != nil {
             handleError(ctx, logger, aggregatedKey, "Error in RecvMsg", cancelFunc, err)
@@ -328,10 +339,16 @@
             "version", resp.GetPayloadVersion()).Debug(ctx, "recv() context done")
         return
     default:
-        response <- resp
         logger.With("aggregated_key", aggregatedKey,
-            "version", resp.GetPayloadVersion()).Debug(ctx, "Received response from upstream")
-        signal <- &version{version: resp.GetPayloadVersion(), nonce: resp.GetNonce()}
+            "version", resp.GetPayloadVersion()).Debug(ctx, "received response from upstream")
+        // recv() will block if the response channel is blocked by the receiver
+        // (orchestrator). Time out after 5 seconds. TODO: make the receive timeout configurable.
+        select {
+        case response <- resp:
+            signal <- &version{version: resp.GetPayloadVersion(), nonce: resp.GetNonce()}
+        case <-time.After(5 * time.Second):
+            handleError(ctx, logger, aggregatedKey, "recv() blocked on receiver", cancelFunc, err)
+        }
     }
 }
 }
@@ -344,6 +361,7 @@ func handleError(ctx context.Context, logger log.Logger, key string, errMsg stri
         // Context was cancelled, hence this is not an erroneous scenario.
         // Context is cancelled only when shutdown is called or any of the send/recv goroutines error out.
         // Shutdown can be called by the caller in many cases: during app shutdown, TTL expiry, etc.
+        logger.With("aggregated_key", key).Debug(ctx, "context cancelled")
     default:
         logger.With("aggregated_key", key).Error(ctx, "%s: %s", errMsg, err.Error())
     }
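
The select/default shape in handleError above is the standard non-blocking way
to ask whether a context has already been cancelled, separating an expected
shutdown from a genuine error. A small, runnable sketch of the same idea
(hypothetical names):

    package main

    import (
        "context"
        "fmt"
    )

    // classify logs a cancellation as a clean, expected exit and anything else
    // as a real error, mirroring the debug-vs-error split in handleError.
    func classify(ctx context.Context, err error) {
        select {
        case <-ctx.Done():
            fmt.Println("debug: context cancelled (expected during shutdown)")
        default:
            fmt.Printf("error: %s\n", err)
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        classify(ctx, fmt.Errorf("RecvMsg failed")) // ctx live: treated as a real error
        cancel()
        classify(ctx, fmt.Errorf("RecvMsg failed")) // ctx cancelled: treated as clean exit
    }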
