Skip to content

Commit

Permalink
Fix issues with ChunkedInputStream when using Apache Connector (eclip…
Browse files Browse the repository at this point in the history
…se-ee4j#4338)

* Set Apache Connector behaviour for Apache Http Client prior 4.5.1 to behaviour in Jersey 2.28
Keep behaviour of Jersey 2.29 for Apache HttpClient 4.5.1+

Signed-off-by: Jan Supol <[email protected]>
  • Loading branch information
jansupol authored Dec 17, 2019
1 parent e59b36e commit 51abc78
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ public final class ApacheClientProperties {
*/
public static final String KEEPALIVE_STRATEGY = "jersey.config.apache.client.keepAliveStrategy";


/**
* Strategy that closes the Apache Connection. Accepts an instance of {@link ApacheConnectionClosingStrategy}.
*
* @see ApacheConnectionClosingStrategy
* @since 2.30
*/
public static final String CONNECTION_CLOSING_STRATEGY = "jersey.config.apache.client.connectionClosingStrategy";

/**
* Get the value of the specified property.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/

package org.glassfish.jersey.apache.connector;

import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
import org.glassfish.jersey.client.ClientRequest;

import java.io.IOException;
import java.io.InputStream;

/**
* /**
* Strategy that defines the way the Apache client releases resources. The client enables closing the content stream
* and the response. From the Apache documentation:
* <pre>
* The difference between closing the content stream and closing the response is that
* the former will attempt to keep the underlying connection alive by consuming the
* entity content while the latter immediately shuts down and discards the connection.
* </pre>
* With Apache Client before 4.5.1, it was ok to close the response and the content stream. This is the default for
* Apache Client 4.5 and older.
* <p/>
* For Apache Client 4.5.1+, first the content stream and the response is should be closed.
* <p/>
* In the case of Chunk content stream, the stream is not closed on the server side, and the client can hung on reading
* the closing chunk. Using the {@link org.glassfish.jersey.client.ClientProperties#READ_TIMEOUT} property can prevent
* this hanging forever and the reading of the closing chunk is terminated when the time is out. The other option, when
* the timeout is not set, is to abort the Apache client request. This is the default for Apache Client 4.5.1+ when the
* read timeout is not set.
* <p/>
* Another option is not to close the content stream, which is possible by the Apache client documentation. In this case,
* however, the server side may not be notified and would not not close its chunk stream.
*/
public interface ApacheConnectionClosingStrategy {
/**
* Method to close the connection.
* @param clientRequest The {@link ClientRequest} to get {@link ClientRequest#getConfiguration() configuration},
* and {@link ClientRequest#resolveProperty(String, Class) resolve properties}.
* @param request Apache {@code HttpUriRequest} that can be {@code abort}ed.
* @param response Apache {@code CloseableHttpResponse} that can be {@code close}d.
* @param stream The entity stream that can be {@link InputStream#close() closed}.
* @throws IOException In case of some of the closing methods throws {@link IOException}
*/
void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException;

/**
* Strategy that aborts Apache HttpRequests for the case of Chunked Stream, closes the stream, and response next.
*/
class GracefulClosingStrategy implements ApacheConnectionClosingStrategy {
static final GracefulClosingStrategy INSTANCE = new GracefulClosingStrategy();

@Override
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException {
if (response.getEntity() != null && response.getEntity().isChunked()) {
request.abort();
}
try {
stream.close();
} catch (IOException ex) {
// Ignore
} finally {
response.close();
}
}
}

/**
* Strategy that closes the response and content stream next. This is a behaviour of Jersey 2.28.
*/
class ImmediateClosingStrategy implements ApacheConnectionClosingStrategy {
static final ImmediateClosingStrategy INSTANCE = new ImmediateClosingStrategy();

@Override
public void close(ClientRequest clientRequest, HttpUriRequest request, CloseableHttpResponse response, InputStream stream)
throws IOException {
response.close();
stream.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@
import java.io.OutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
Expand Down Expand Up @@ -173,7 +174,6 @@
class ApacheConnector implements Connector {

private static final Logger LOGGER = Logger.getLogger(ApacheConnector.class.getName());

private static final VersionInfo vi;
private static final String release;

Expand Down Expand Up @@ -325,14 +325,16 @@ class ApacheConnector implements Connector {
}
clientBuilder.setDefaultRequestConfig(requestConfig);

Optional<Object> contract = config.getInstances().stream()
.filter(a -> ApacheHttpClientBuilderConfigurator.class.isInstance(a)).findFirst();
LinkedList<Object> contracts = config.getInstances().stream()
.filter(ApacheHttpClientBuilderConfigurator.class::isInstance)
.collect(Collectors.toCollection(LinkedList::new));

final HttpClientBuilder configuredBuilder = contract.isPresent()
? ((ApacheHttpClientBuilderConfigurator) contract.get()).configure(clientBuilder)
: null;
HttpClientBuilder configuredBuilder = clientBuilder;
for (Object configurator : contracts) {
configuredBuilder = ((ApacheHttpClientBuilderConfigurator) configurator).configure(configuredBuilder);
}

this.client = configuredBuilder != null ? configuredBuilder.build() : clientBuilder.build();
this.client = configuredBuilder.build();
}

private HttpClientConnectionManager getConnectionManager(final Client client,
Expand Down Expand Up @@ -515,7 +517,8 @@ public ClientResponse apply(final ClientRequest clientRequest) throws Processing
}

try {
responseContext.setEntityStream(getInputStream(response));
final ConnectionClosingMechanism closingMechanism = new ConnectionClosingMechanism(clientRequest, request);
responseContext.setEntityStream(getInputStream(response, closingMechanism));
} catch (final IOException e) {
LOGGER.log(Level.SEVERE, null, e);
}
Expand Down Expand Up @@ -655,8 +658,8 @@ private static Map<String, String> writeOutBoundHeaders(final ClientRequest clie
return stringHeaders;
}

private static InputStream getInputStream(final CloseableHttpResponse response) throws IOException {

private static InputStream getInputStream(final CloseableHttpResponse response,
final ConnectionClosingMechanism closingMechanism) throws IOException {
final InputStream inputStream;

if (response.getEntity() == null) {
Expand All @@ -670,18 +673,57 @@ private static InputStream getInputStream(final CloseableHttpResponse response)
}
}

return new FilterInputStream(inputStream) {
@Override
public void close() throws IOException {
try {
super.close();
} catch (IOException ex) {
// Ignore
} finally {
response.close();
return closingMechanism.getEntityStream(inputStream, response);
}

/**
* The way the Apache CloseableHttpResponse is to be closed.
* See https://github.com/eclipse-ee4j/jersey/issues/4321
* {@link ApacheClientProperties#CONNECTION_CLOSING_STRATEGY}
*/
private final class ConnectionClosingMechanism {
private ApacheConnectionClosingStrategy connectionClosingStrategy = null;
private final ClientRequest clientRequest;
private final HttpUriRequest apacheRequest;

private ConnectionClosingMechanism(ClientRequest clientRequest, HttpUriRequest apacheRequest) {
this.clientRequest = clientRequest;
this.apacheRequest = apacheRequest;
Object closingStrategyProperty = clientRequest
.resolveProperty(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY, Object.class);
if (closingStrategyProperty != null) {
if (ApacheConnectionClosingStrategy.class.isInstance(closingStrategyProperty)) {
connectionClosingStrategy = (ApacheConnectionClosingStrategy) closingStrategyProperty;
} else {
LOGGER.log(
Level.WARNING,
LocalizationMessages.IGNORING_VALUE_OF_PROPERTY(
ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
closingStrategyProperty,
ApacheConnectionClosingStrategy.class.getName())
);
}
}
};

if (connectionClosingStrategy == null) {
if (vi.getRelease().compareTo("4.5") > 0) {
connectionClosingStrategy = ApacheConnectionClosingStrategy.GracefulClosingStrategy.INSTANCE;
} else {
connectionClosingStrategy = ApacheConnectionClosingStrategy.ImmediateClosingStrategy.INSTANCE;
}
}
}

private InputStream getEntityStream(final InputStream inputStream,
final CloseableHttpResponse response) {
InputStream filterStream = new FilterInputStream(inputStream) {
@Override
public void close() throws IOException {
connectionClosingStrategy.close(clientRequest, apacheRequest, response, in);
}
};
return filterStream;
}
}

private static class ConnectionFactory extends ManagedHttpClientConnectionFactory {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
Expand Down Expand Up @@ -49,21 +51,13 @@ public class StreamingTest extends JerseyTest {
* Test that a data stream can be terminated from the client side.
*/
@Test
public void clientCloseTest() throws IOException {
// start streaming
InputStream inputStream = target().path("/streamingEndpoint").request()
.property(ClientProperties.READ_TIMEOUT, 1_000).get(InputStream.class);
public void clientCloseNoTimeoutTest() throws IOException {
clientCloseTest(-1);
}

WebTarget sendTarget = target().path("/streamingEndpoint/send");
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
// check 'A' has been sent
assertEquals('A', inputStream.read());
// closing the stream should tear down the connection
inputStream.close();
// trigger sending another 'A' to the stream; it should fail
// (indicating that the streaming has been terminated on the server)
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
@Test
public void clientCloseWithTimeOutTest() throws IOException {
clientCloseTest(1_000);
}

/**
Expand Down Expand Up @@ -103,6 +97,43 @@ protected Application configure() {
return new ResourceConfig(StreamingEndpoint.class);
}

/**
* Test that a data stream can be terminated from the client side.
*/
private void clientCloseTest(int readTimeout) throws IOException {
// start streaming
AtomicInteger counter = new AtomicInteger(0);
Invocation.Builder builder = target().path("/streamingEndpoint").request();
if (readTimeout > -1) {
counter.set(1);
builder.property(ClientProperties.READ_TIMEOUT, readTimeout);
builder.property(ApacheClientProperties.CONNECTION_CLOSING_STRATEGY,
(ApacheConnectionClosingStrategy) (config, request, response, stream) -> {
try {
stream.close();
} catch (Exception e) {
// timeout, no chunk ending
} finally {
counter.set(0);
response.close();
}
});
}
InputStream inputStream = builder.get(InputStream.class);

WebTarget sendTarget = target().path("/streamingEndpoint/send");
// trigger sending 'A' to the stream; OK is sent if everything on the server was OK
assertEquals("OK", sendTarget.request().get().readEntity(String.class));
// check 'A' has been sent
assertEquals('A', inputStream.read());
// closing the stream should tear down the connection
inputStream.close();
// trigger sending another 'A' to the stream; it should fail
// (indicating that the streaming has been terminated on the server)
assertEquals("NOK", sendTarget.request().get().readEntity(String.class));
assertEquals(0, counter.get());
}

@Singleton
@Path("streamingEndpoint")
public static class StreamingEndpoint {
Expand Down
Loading

0 comments on commit 51abc78

Please sign in to comment.