Skip to content

Commit

Permalink
Fixed issue with Authorization header missing after client gets redir…
Browse files Browse the repository at this point in the history
…ected (apache#3869)

* Fixed issue with Authorization header missing after client gets redirected

* Use AsyncHttpClient instead

* Fixed merge issue

* Convert headers after body to not break multipart encoding

* Fixed tests

* Added missing property file for AsyncHttpClient in the shaded jar

* Moved ahc.properties into org.asynchttpclient.config

* One value was missing from properties file

* Also add default file for non-shaded scenario

* Another missing AHC config key

* Print topic load exception

* Removed constraint on unit tests executor
  • Loading branch information
merlimat committed Apr 4, 2019
1 parent 04099e4 commit a15b6f0
Show file tree
Hide file tree
Showing 9 changed files with 456 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ public void start() throws Exception {
}
log.info("Started Pulsar Broker service on port {}", port.get());
}

Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls();
if (tlsPort.isPresent()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
Expand Down Expand Up @@ -491,7 +491,7 @@ private CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean
if (cause instanceof ServiceUnitNotReadyException) {
log.warn("[{}] Service unit is not ready when loading the topic", topic);
} else {
log.warn("[{}] Unexpected exception when loading topic: {}", topic, cause);
log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e);
}

return failedFuture(cause);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public SameThreadOrderedSafeExecutor() {
false,
false,
100000,
10,
-1,
false);
}

Expand Down
6 changes: 6 additions & 0 deletions pulsar-client-admin-shaded/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@
<include>**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.pulsar:pulsar-client-admin-original</artifact>
<includes>
<include>**</include>
</includes>
</filter>
</filters>
<relocations>
<relocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pulsar.client.admin;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;

import org.apache.pulsar.client.admin.internal.BookiesImpl;
import org.apache.pulsar.client.admin.internal.BrokerStatsImpl;
import org.apache.pulsar.client.admin.internal.BrokersImpl;
Expand All @@ -38,13 +45,12 @@
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerImpl;
import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.SecurityUtility;
import org.glassfish.jersey.client.ClientConfig;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.jackson.JacksonFeature;
Expand All @@ -53,17 +59,6 @@
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import java.io.Closeable;
import java.io.IOException;
import java.net.URL;
import java.security.cert.X509Certificate;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Pulsar client admin API client.
*/
Expand Down Expand Up @@ -155,52 +150,15 @@ public PulsarAdmin(String serviceUrl,
httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true);
httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8);
httpConfig.register(MultiPartFeature.class);
httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData));

ClientBuilder clientBuilder = ClientBuilder.newBuilder()
.withConfig(httpConfig)
.connectTimeout(this.connectTimeout, this.connectTimeoutUnit)
.readTimeout(this.readTimeout, this.readTimeoutUnit)
.register(JacksonConfigurator.class).register(JacksonFeature.class);

boolean useTls = false;

if (clientConfigData != null && StringUtils.isNotBlank(clientConfigData.getServiceUrl())
&& clientConfigData.getServiceUrl().startsWith("https://")) {
useTls = true;
try {
SSLContext sslCtx = null;

X509Certificate trustCertificates[] = SecurityUtility
.loadCertificatesFromPemFile(clientConfigData.getTlsTrustCertsFilePath());

// Set private key and certificate if available
AuthenticationDataProvider authData = auth.getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(),
trustCertificates);
}

clientBuilder.sslContext(sslCtx);
if (clientConfigData.isTlsHostnameVerificationEnable()) {
clientBuilder.hostnameVerifier(new DefaultHostnameVerifier());
} else {
// Disable hostname verification
clientBuilder.hostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
} catch (Exception e) {
try {
if (auth != null) {
auth.close();
}
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
boolean useTls = clientConfigData.getServiceUrl().startsWith("https://");

this.client = clientBuilder.build();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;

import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;

@Slf4j
public class AsyncHttpConnector implements Connector {

private final AsyncHttpClient httpClient;

@SneakyThrows
public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT));
confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT));
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() {
@Override
public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) {
// Close connection upon a server error or per HTTP spec
return (response.status().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response);
}
});

if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())
&& conf.getServiceUrl().startsWith("https://")) {

SslContext sslCtx = null;

// Set client key and certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(),
authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath());
}

confBuilder.setSslContext(sslCtx);
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
}

@Override
public ClientResponse apply(ClientRequest jerseyRequest) {
CompletableFuture<ClientResponse> future = new CompletableFuture<>();

try {
Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
@Override
public void response(ClientResponse response) {
future.complete(response);
}

@Override
public void failure(Throwable failure) {
future.completeExceptionally(failure);
}
});

Integer timeout = ClientProperties.getValue(
jerseyRequest.getConfiguration().getProperties(),
ClientProperties.READ_TIMEOUT, 0);

if (timeout != null && timeout > 0) {
resultFuture.get(timeout, TimeUnit.MILLISECONDS);
} else {
resultFuture.get();
}
} catch (ExecutionException ex) {
Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException(e.getMessage(), e);
} catch (Exception ex) {
throw new ProcessingException(ex.getMessage(), ex);
}

return future.join();
}

@Override
public Future<?> apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) {
final CompletableFuture<Object> future = new CompletableFuture<>();

BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString());

if (jerseyRequest.hasEntity()) {
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
jerseyRequest.setStreamProvider(contentLength -> outStream);
try {
jerseyRequest.writeEntity();
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}

builder.setBody(outStream.toByteArray());
}

jerseyRequest.getHeaders().forEach((key, headers) -> {
if (!HttpHeaders.USER_AGENT.equals(key)) {
builder.addHeader(key, headers);
}
});

builder.execute(new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response response) throws Exception {
ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()),
jerseyRequest);
response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue()));
if (response.hasResponseBody()) {
jerseyResponse.setEntityStream(response.getResponseBodyAsStream());
}
callback.response(jerseyResponse);
future.complete(jerseyResponse);
return response;
}

@Override
public void onThrowable(Throwable t) {
callback.failure(t);
future.completeExceptionally(t);
}
});

return future;
}

@Override
public String getName() {
return "Pulsar-Admin";
}

@Override
public void close() {
try {
httpClient.close();
} catch (IOException e) {
log.warn("Failed to close http client", e);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal.http;

import javax.ws.rs.client.Client;
import javax.ws.rs.core.Configuration;

import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.client.spi.ConnectorProvider;

public class AsyncHttpConnectorProvider implements ConnectorProvider {

private final ClientConfigurationData conf;

public AsyncHttpConnectorProvider(ClientConfigurationData conf) {
this.conf = conf;
}

@Override
public Connector getConnector(Client client, Configuration runtimeConfig) {
return new AsyncHttpConnector(client, conf);
}
}
Loading

0 comments on commit a15b6f0

Please sign in to comment.