Skip to content

Commit

Permalink
[CALCITE-1300] Retry on HTTP-503 in hc-based AvaticaHttpClient
Browse files Browse the repository at this point in the history
  • Loading branch information
joshelser committed Jun 24, 2016
1 parent 240cee4 commit 3b5d88e
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
Expand All @@ -70,7 +72,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient,
private static final String MAX_POOLED_CONNECTIONS_DEFAULT = "100";

protected final HttpHost host;
protected final URL url;
protected final URI uri;
protected final HttpProcessor httpProcessor;
protected final HttpRequestExecutor httpExecutor;
protected final BasicAuthCache authCache;
Expand All @@ -83,7 +85,7 @@ public class AvaticaCommonsHttpClientImpl implements AvaticaHttpClient,

public AvaticaCommonsHttpClientImpl(URL url) {
this.host = new HttpHost(url.getHost(), url.getPort(), url.getProtocol());
this.url = Objects.requireNonNull(url);
this.uri = toURI(Objects.requireNonNull(url));

this.httpProcessor = HttpProcessorBuilder.create()
.add(new RequestContent())
Expand Down Expand Up @@ -111,39 +113,50 @@ public AvaticaCommonsHttpClientImpl(URL url) {
}

public byte[] send(byte[] request) {
HttpClientContext context = HttpClientContext.create();
while (true) {
HttpClientContext context = HttpClientContext.create();

context.setTargetHost(host);
context.setTargetHost(host);

// Set the credentials if they were provided.
if (null != this.credentials) {
context.setCredentialsProvider(credentialsProvider);
context.setAuthSchemeRegistry(authRegistry);
context.setAuthCache(authCache);
}

ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);

// Create the client with the AuthSchemeRegistry and manager
HttpPost post = new HttpPost(toURI(url));
post.setEntity(entity);

try (CloseableHttpResponse response = client.execute(post, context)) {
final int statusCode = response.getStatusLine().getStatusCode();
if (HttpURLConnection.HTTP_OK == statusCode
|| HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
return EntityUtils.toByteArray(response.getEntity());
// Set the credentials if they were provided.
if (null != this.credentials) {
context.setCredentialsProvider(credentialsProvider);
context.setAuthSchemeRegistry(authRegistry);
context.setAuthCache(authCache);
}

throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
LOG.debug("Failed to execute HTTP request", e);
throw new RuntimeException(e);
ByteArrayEntity entity = new ByteArrayEntity(request, ContentType.APPLICATION_OCTET_STREAM);

// Create the client with the AuthSchemeRegistry and manager
HttpPost post = new HttpPost(uri);
post.setEntity(entity);

try (CloseableHttpResponse response = execute(post, context)) {
final int statusCode = response.getStatusLine().getStatusCode();
if (HttpURLConnection.HTTP_OK == statusCode
|| HttpURLConnection.HTTP_INTERNAL_ERROR == statusCode) {
return EntityUtils.toByteArray(response.getEntity());
} else if (HttpURLConnection.HTTP_UNAVAILABLE == statusCode) {
LOG.debug("Failed to connect to server (HTTP/503), retrying");
continue;
}

throw new RuntimeException("Failed to execute HTTP Request, got HTTP/" + statusCode);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
LOG.debug("Failed to execute HTTP request", e);
throw new RuntimeException(e);
}
}
}

// Visible for testing
CloseableHttpResponse execute(HttpPost post, HttpClientContext context)
throws IOException, ClientProtocolException {
return client.execute(post, context);
}

@Override public void setUsernamePassword(AuthenticationType authType, String username,
String password) {
this.credentials = new UsernamePasswordCredentials(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.avatica.remote;

import org.apache.http.StatusLine;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.entity.StringEntity;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.net.HttpURLConnection;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
* Test class for {@link AvaticaCommonsHttpClientImpl}
*/
public class AvaticaCommonsHttpClientImplTest {

@Test public void testRetryOnHttp503() throws Exception {
final byte[] requestBytes = "fake_request".getBytes(UTF_8);
final CloseableHttpResponse badResponse = mock(CloseableHttpResponse.class);
final CloseableHttpResponse goodResponse = mock(CloseableHttpResponse.class);
final StatusLine badStatusLine = mock(StatusLine.class);
final StatusLine goodStatusLine = mock(StatusLine.class);
final StringEntity responseEntity = new StringEntity("success");
final Answer<CloseableHttpResponse> failThenSucceed = new Answer<CloseableHttpResponse>() {
private int iteration = 0;
@Override public CloseableHttpResponse answer(InvocationOnMock invocation) throws Throwable {
iteration++;
if (1 == iteration) {
return badResponse;
} else {
return goodResponse;
}
}
};

final AvaticaCommonsHttpClientImpl client = mock(AvaticaCommonsHttpClientImpl.class);

when(client.send(any(byte[].class))).thenCallRealMethod();
when(client.execute(any(HttpPost.class), any(HttpClientContext.class))).then(failThenSucceed);

when(badResponse.getStatusLine()).thenReturn(badStatusLine);
when(badStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE);

when(goodResponse.getStatusLine()).thenReturn(goodStatusLine);
when(goodStatusLine.getStatusCode()).thenReturn(HttpURLConnection.HTTP_OK);
when(goodResponse.getEntity()).thenReturn(responseEntity);

byte[] responseBytes = client.send(requestBytes);
assertEquals("success", new String(responseBytes, UTF_8));
}
}

// End AvaticaCommonsHttpClientImplTest.java

0 comments on commit 3b5d88e

Please sign in to comment.