Skip to content

Commit

Permalink
Consume/drain the inbound in case a cancellation is received before s…
Browse files Browse the repository at this point in the history
…ubscription (Azure#13493)

* Added logic to release/drain content in case a connection is cancelled before getting subscribed

* Selecting tradeoff to drain body in all cases when cancellation has happened before subscription
  • Loading branch information
kushagraThapar authored Jul 27, 2020
1 parent 089078c commit e7f151b
Showing 1 changed file with 78 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LogLevel;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
Expand All @@ -24,6 +25,7 @@
import java.nio.charset.Charset;
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static com.azure.cosmos.implementation.http.HttpClientConfig.REACTOR_NETWORK_LOG_CATEGORY;
Expand Down Expand Up @@ -102,6 +104,8 @@ public Mono<HttpResponse> send(final HttpRequest request) {
request.setReactorNettyRequestRecord(reactorNettyRequestRecord);
}

final AtomicReference<ReactorNettyHttpResponse> responseReference = new AtomicReference<>();

return this.httpClient
.observe((connection, state) -> {
Instant time = Instant.now();
Expand All @@ -116,12 +120,23 @@ public Mono<HttpResponse> send(final HttpRequest request) {
}
})
.keepAlive(this.httpClientConfig.isConnectionKeepAlive())
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection(responseDelegate(request))
.single();
.port(request.port())
.request(HttpMethod.valueOf(request.httpMethod().toString()))
.uri(request.uri().toString())
.send(bodySendDelegate(request))
.responseConnection((reactorNettyResponse, reactorNettyConnection) -> {
HttpResponse httpResponse = new ReactorNettyHttpResponse(reactorNettyResponse,
reactorNettyConnection).withRequest(request);
responseReference.set((ReactorNettyHttpResponse) httpResponse);
return Mono.just(httpResponse);
})
.doOnCancel(() -> {
ReactorNettyHttpResponse reactorNettyHttpResponse = responseReference.get();
if (reactorNettyHttpResponse != null) {
reactorNettyHttpResponse.releaseAfterCancel(request.httpMethod());
}
})
.single();
}

/**
Expand All @@ -143,17 +158,6 @@ private static BiFunction<HttpClientRequest, NettyOutbound, Publisher<Void>> bod
};
}

/**
* Delegate to receive response.
*
* @param restRequest the Rest request whose response this delegate handles
* @return a delegate upon invocation setup Rest response object
*/
private static BiFunction<HttpClientResponse, Connection, Publisher<HttpResponse>> responseDelegate(final HttpRequest restRequest) {
return (reactorNettyResponse, reactorNettyConnection) ->
Mono.just(new ReactorNettyHttpResponse(reactorNettyResponse, reactorNettyConnection).withRequest(restRequest));
}

@Override
public void shutdown() {
if (this.connectionProvider != null) {
Expand All @@ -162,6 +166,9 @@ public void shutdown() {
}

private static class ReactorNettyHttpResponse extends HttpResponse {

private final AtomicReference<ReactorNettyResponseState> state = new AtomicReference<>(ReactorNettyResponseState.NOT_SUBSCRIBED);

private final HttpClientResponse reactorNettyResponse;
private final Connection reactorNettyConnection;

Expand Down Expand Up @@ -189,22 +196,33 @@ public HttpHeaders headers() {

@Override
public Flux<ByteBuf> body() {
return bodyIntern();
return bodyIntern()
.doOnSubscribe(this::updateSubscriptionState)
.map(byteBuf -> {
byteBuf.retain();
return byteBuf;
});
}

@Override
public Mono<byte[]> bodyAsByteArray() {
return bodyIntern().aggregate().asByteArray();
return bodyIntern().aggregate()
.asByteArray()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString() {
return bodyIntern().aggregate().asString();
return bodyIntern().aggregate()
.asString()
.doOnSubscribe(this::updateSubscriptionState);
}

@Override
public Mono<String> bodyAsString(Charset charset) {
return bodyIntern().aggregate().asString(charset);
return bodyIntern().aggregate()
.asString(charset)
.doOnSubscribe(this::updateSubscriptionState);
}

private ByteBufFlux bodyIntern() {
Expand All @@ -215,5 +233,44 @@ private ByteBufFlux bodyIntern() {
Connection internConnection() {
return reactorNettyConnection;
}

private void updateSubscriptionState(Subscription subscription) {
if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.SUBSCRIBED)) {
return;
}
// https://github.com/reactor/reactor-netty/issues/503
// FluxReceive rejects multiple subscribers, but not after a cancel().
// Subsequent subscribers after cancel() will not be rejected, but will hang instead.
// So we need to reject ones in cancelled state.
if (this.state.get() == ReactorNettyResponseState.CANCELLED) {
throw new IllegalStateException(
"The client response body has been released already due to cancellation.");
}
}

/**
* Called by {@link ReactorNettyClient} when a cancellation is detected
* but the content has not been subscribed to. If the subscription never
* materializes then the content will remain not drained. Or it could still
* materialize if the cancellation happened very early, or the response
* reading was delayed for some reason.
*/
private void releaseAfterCancel(HttpMethod method) {
if (this.state.compareAndSet(ReactorNettyResponseState.NOT_SUBSCRIBED, ReactorNettyResponseState.CANCELLED)) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing body, not yet subscribed");
}
this.bodyIntern()
.doOnNext(byteBuf -> {})
.subscribe(byteBuf -> {}, ex -> {});
}
}
}

private enum ReactorNettyResponseState {
// 0 - not subscribed, 1 - subscribed, 2 - cancelled via connector (before subscribe)
NOT_SUBSCRIBED,
SUBSCRIBED,
CANCELLED;
}
}

0 comments on commit e7f151b

Please sign in to comment.