Skip to content

Commit

Permalink
Merge branch '1.2.x' of github.com:micronaut-projects/micronaut-core …
Browse files Browse the repository at this point in the history
…into 1.2.x
  • Loading branch information
graemerocher committed Oct 17, 2019
2 parents 734e295 + 27b3bdb commit 1a5cf4d
Showing 1 changed file with 120 additions and 114 deletions.
234 changes: 120 additions & 114 deletions http-client/src/main/java/io/micronaut/http/client/DefaultHttpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1795,136 +1795,142 @@ private <O, E> void addFullHttpResponseHandler(

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpResponse fullResponse) {
HttpResponseStatus status = fullResponse.status();
int statusCode = status.code();
HttpStatus httpStatus;
try {
httpStatus = HttpStatus.valueOf(statusCode);
} catch (IllegalArgumentException e) {
emitter.onError(e);
return;
}

try {

HttpHeaders headers = fullResponse.headers();
if (log.isTraceEnabled()) {
log.trace("HTTP Client Response Received for Request: {} {}", request.getMethod(), request.getUri());
log.trace("Status Code: {}", status);
traceHeaders(headers);
traceBody("Response", fullResponse.content());
HttpResponseStatus status = fullResponse.status();
int statusCode = status.code();
HttpStatus httpStatus;
try {
httpStatus = HttpStatus.valueOf(statusCode);
} catch (IllegalArgumentException e) {
if (complete.compareAndSet(false, true)) {
emitter.onError(e);
} else if (LOG.isWarnEnabled()) {
LOG.warn("Unsupported http status after handler completed: " + e.getMessage(), e);
}
return;
}

// it is a redirect
if (statusCode > 300 && statusCode < 400 && configuration.isFollowRedirects() && headers.contains(HttpHeaderNames.LOCATION)) {
String location = headers.get(HttpHeaderNames.LOCATION);
Flowable<io.micronaut.http.HttpResponse<O>> redirectedRequest = exchange(io.micronaut.http.HttpRequest.GET(location), bodyType);
redirectedRequest.first(io.micronaut.http.HttpResponse.notFound())
.subscribe((oHttpResponse, throwable) -> {
if (throwable != null) {
emitter.onError(throwable);
try {
HttpHeaders headers = fullResponse.headers();
if (log.isTraceEnabled()) {
log.trace("HTTP Client Response Received for Request: {} {}", request.getMethod(), request.getUri());
log.trace("Status Code: {}", status);
traceHeaders(headers);
traceBody("Response", fullResponse.content());
}

} else {
emitter.onNext(oHttpResponse);
emitter.onComplete();
}
});
return;
}
if (statusCode == HttpStatus.NO_CONTENT.getCode()) {
// normalize the NO_CONTENT header, since http content aggregator adds it even if not present in the response
headers.remove(HttpHeaderNames.CONTENT_LENGTH);
}
boolean errorStatus = statusCode >= 400;
FullNettyClientHttpResponse<O> response
= new FullNettyClientHttpResponse<>(fullResponse, httpStatus, mediaTypeCodecRegistry, byteBufferFactory, bodyType, errorStatus);
// it is a redirect
if (statusCode > 300 && statusCode < 400 && configuration.isFollowRedirects() && headers.contains(HttpHeaderNames.LOCATION)) {
String location = headers.get(HttpHeaderNames.LOCATION);
Flowable<io.micronaut.http.HttpResponse<O>> redirectedRequest = exchange(io.micronaut.http.HttpRequest.GET(location), bodyType);
redirectedRequest.first(io.micronaut.http.HttpResponse.notFound())
.subscribe((oHttpResponse, throwable) -> {
if (throwable != null) {
emitter.onError(throwable);

} else {
emitter.onNext(oHttpResponse);
emitter.onComplete();
}
});
return;
}
if (statusCode == HttpStatus.NO_CONTENT.getCode()) {
// normalize the NO_CONTENT header, since http content aggregator adds it even if not present in the response
headers.remove(HttpHeaderNames.CONTENT_LENGTH);
}
boolean errorStatus = statusCode >= 400;
FullNettyClientHttpResponse<O> response
= new FullNettyClientHttpResponse<>(fullResponse, httpStatus, mediaTypeCodecRegistry, byteBufferFactory, bodyType, errorStatus);

if (complete.compareAndSet(false, true)) {
if (errorStatus) {
try {
HttpClientResponseException clientError;
if (errorType != HttpClient.DEFAULT_ERROR_TYPE) {
clientError = new HttpClientResponseException(
status.reasonPhrase(),
null,
response,
new HttpClientErrorDecoder() {
@Override
public Argument<?> getErrorType(MediaType mediaType) {
return errorType;
}
}
);
} else {
clientError = new HttpClientResponseException(
status.reasonPhrase(),
response
);
}
if (complete.compareAndSet(false, true)) {
if (errorStatus) {
try {
emitter.onError(clientError);
} finally {
response.onComplete();
}
} catch (Throwable t) {
if (t instanceof HttpClientResponseException) {
HttpClientResponseException clientError;
if (errorType != HttpClient.DEFAULT_ERROR_TYPE) {
clientError = new HttpClientResponseException(
status.reasonPhrase(),
null,
response,
new HttpClientErrorDecoder() {
@Override
public Argument<?> getErrorType(MediaType mediaType) {
return errorType;
}
}
);
} else {
clientError = new HttpClientResponseException(
status.reasonPhrase(),
response
);
}
try {
emitter.onError(t);
emitter.onError(clientError);
} finally {
response.onComplete();
}
} else {
response.onComplete();
FullNettyClientHttpResponse<Object> errorResponse = new FullNettyClientHttpResponse<>(
fullResponse,
httpStatus,
mediaTypeCodecRegistry,
byteBufferFactory,
null,
true
);
errorResponse.onComplete();
HttpClientResponseException clientResponseError = new HttpClientResponseException(
"Error decoding HTTP error response body: " + t.getMessage(),
t,
errorResponse,
null
);
emitter.onError(clientResponseError);
}
}
} else {
emitter.onNext(response);
response.onComplete();
emitter.onComplete();
}
}
} catch (Throwable t) {
if (complete.compareAndSet(false, true)) {
if (t instanceof HttpClientResponseException) {
emitter.onError(t);
} else {
FullNettyClientHttpResponse<Object> response = new FullNettyClientHttpResponse<>(fullResponse, httpStatus, mediaTypeCodecRegistry, byteBufferFactory, null, true);
HttpClientResponseException clientResponseError = new HttpClientResponseException(
"Error decoding HTTP response body: " + t.getMessage(),
t,
response,
new HttpClientErrorDecoder() {
@Override
public Argument<?> getErrorType(MediaType mediaType) {
return errorType;
} catch (Throwable t) {
if (t instanceof HttpClientResponseException) {
try {
emitter.onError(t);
} finally {
response.onComplete();
}
} else {
response.onComplete();
FullNettyClientHttpResponse<Object> errorResponse = new FullNettyClientHttpResponse<>(
fullResponse,
httpStatus,
mediaTypeCodecRegistry,
byteBufferFactory,
null,
true
);
errorResponse.onComplete();
HttpClientResponseException clientResponseError = new HttpClientResponseException(
"Error decoding HTTP error response body: " + t.getMessage(),
t,
errorResponse,
null
);
emitter.onError(clientResponseError);
}
);
try {
emitter.onError(clientResponseError);
} finally {
}
} else {
emitter.onNext(response);
response.onComplete();
emitter.onComplete();
}
}
} else {
if (LOG.isWarnEnabled()) {
LOG.warn("Exception fired after handler completed: " + t.getMessage(), t);
} catch (Throwable t) {
if (complete.compareAndSet(false, true)) {
if (t instanceof HttpClientResponseException) {
emitter.onError(t);
} else {
FullNettyClientHttpResponse<Object> response = new FullNettyClientHttpResponse<>(fullResponse, httpStatus, mediaTypeCodecRegistry, byteBufferFactory, null, true);
HttpClientResponseException clientResponseError = new HttpClientResponseException(
"Error decoding HTTP response body: " + t.getMessage(),
t,
response,
new HttpClientErrorDecoder() {
@Override
public Argument<?> getErrorType(MediaType mediaType) {
return errorType;
}
}
);
try {
emitter.onError(clientResponseError);
} finally {
response.onComplete();
}
}
} else {
if (LOG.isWarnEnabled()) {
LOG.warn("Exception fired after handler completed: " + t.getMessage(), t);
}
}
}
} finally {
Expand Down

0 comments on commit 1a5cf4d

Please sign in to comment.