Skip to content

Commit

Permalink
Fixing OkHttp Async Client early stream closing (Azure#8476)
Browse files Browse the repository at this point in the history
* Fixing OkHttp Async Client early stream closing

* use regex based spotbug pattern for okhttp PZLA_PREFER_ZERO_LENGTH_ARRAYS
  • Loading branch information
anuchandy authored Mar 24, 2020
1 parent 3e63ae0 commit c4421df
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1900,4 +1900,11 @@
DM_CONVERT_CASE"/>
</Match>

<!-- Empty byte array is returned as null to reactor. Reactor transform null from Callable to downstream onComplete -->
<Match>
<Class name="~com\.azure\.core\.http\.okhttp\.OkHttpAsyncHttpClient\$OkHttpResponse(.*)"/>
<Method name="~(.*)\$(getBodyAsByteArray|null)\$(.*)"/>
<Bug pattern="PZLA_PREFER_ZERO_LENGTH_ARRAYS"/>
</Match>

</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -178,29 +178,21 @@ public void onResponse(okhttp3.Call call, okhttp3.Response response) {
private static class OkHttpResponse extends HttpResponse {
private final int statusCode;
private final HttpHeaders headers;
private final Mono<ResponseBody> responseBodyMono;
private final ResponseBody responseBody;
// using 4K as default buffer size: https://stackoverflow.com/a/237495/1473510
private static final int BYTE_BUFFER_CHUNK_SIZE = 4096;

OkHttpResponse(Response innerResponse, HttpRequest request) {
super(request);
this.statusCode = innerResponse.code();
this.headers = fromOkHttpHeaders(innerResponse.headers());
if (innerResponse.body() == null) {
// innerResponse.body() getter will not return null for server returned responses.
// It can be null:
// [a]. if response is built manually with null body (e.g for mocking)
// [b]. for the cases described here
// [ref](https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response/body/).
//
this.responseBodyMono = Mono.empty();
} else {
this.responseBodyMono = Mono.using(innerResponse::body,
Mono::just,
// Resource cleanup
// square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
ResponseBody::close, /* Change in behavior since reactor-core 3.3.0.RELEASE */ false);
}
// innerResponse.body() getter will not return null for server returned responses.
// It can be null:
// [a]. if response is built manually with null body (e.g for mocking)
// [b]. for the cases described here
// [ref](https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response/body/).
//
this.responseBody = innerResponse.body();
}

@Override
Expand All @@ -220,34 +212,59 @@ public HttpHeaders getHeaders() {

@Override
public Flux<ByteBuffer> getBody() {
return this.responseBodyMono
.flatMapMany(irb -> toFluxByteBuffer(irb.byteStream()));
}

@Override
public Mono<byte[]> getBodyAsByteArray() {
return this.responseBodyMono
.flatMap(rb -> {
if (this.responseBody == null) {
return Flux.empty();
}
// Use Flux.using to close the stream after complete emission
return Flux.using(() -> this.responseBody.byteStream(),
bodyStream -> toFluxByteBuffer(bodyStream),
bodyStream -> {
try {
byte[] content = rb.bytes();
return content.length == 0 ? Mono.empty() : Mono.just(content);
// OkHttp: The stream from ResponseBody::byteStream() has to be explicitly closed.
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
bodyStream.close();
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
});
}, false);
}

@Override
public Mono<byte[]> getBodyAsByteArray() {
return Mono.fromCallable(() -> {
// Reactor: The fromCallable operator treats a null from the Callable
// as completion signal.
if (responseBody == null) {
return null;
}
byte[] content = responseBody.bytes();
// Consistent with GAed behaviour.
if (content.length == 0) {
return null;
}
// OkHttp: When calling ResponseBody::bytes() the underlying stream automatically closed.
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
return content;
});
}

@Override
public Mono<String> getBodyAsString() {
return this.responseBodyMono
.flatMap(rb -> {
try {
String content = rb.string();
return content.length() == 0 ? Mono.empty() : Mono.just(content);
} catch (IOException ioe) {
throw Exceptions.propagate(ioe);
}
});
return Mono.fromCallable(() -> {
// Reactor: The fromCallable operator treats a null from the Callable
// as completion signal.
if (responseBody == null) {
return null;
}
String content = responseBody.string();
// Consistent with GAed behaviour.
if (content.length() == 0) {
return null;
}
// OkHttp: When calling ResponseBody::string() the underlying stream automatically closed.
// https://square.github.io/okhttp/4.x/okhttp/okhttp3/-response-body/#the-response-body-must-be-closed
return content;
});
}

@Override
Expand All @@ -258,7 +275,10 @@ public Mono<String> getBodyAsString(Charset charset) {

@Override
public void close() {
this.responseBodyMono.subscribe().dispose();
if (this.responseBody != null) {
// It's safe to invoke close() multiple times, additional calls will be ignored.
this.responseBody.close();
}
}

/**
Expand Down

0 comments on commit c4421df

Please sign in to comment.