From a15b6f073e6a1752c5f1169c072756b7efa7f762 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Thu, 4 Apr 2019 13:11:45 -0500 Subject: [PATCH] Fixed issue with Authorization header missing after client gets redirected (#3869) * Fixed issue with Authorization header missing after client gets redirected * Use AsyncHttpClient instead * Fixed merge issue * Convert headers after body to not break multipart encoding * Fixed tests * Added missing property file for AsyncHttpClient in the shaded jar * Moved ahc.properties into org.asynchttpclient.config * One value was missing from properties file * Also add default file for non-shaded scenario * Another missing AHC config key * Print topic load exception * Removed constraint on unit tests executor --- .../pulsar/broker/service/BrokerService.java | 4 +- .../auth/SameThreadOrderedSafeExecutor.java | 2 +- pulsar-client-admin-shaded/pom.xml | 6 + .../pulsar/client/admin/PulsarAdmin.java | 68 ++---- .../internal/http/AsyncHttpConnector.java | 201 ++++++++++++++++++ .../http/AsyncHttpConnectorProvider.java | 40 ++++ .../config/ahc-default.properties | 73 +++++++ .../org/asynchttpclient/config/ahc.properties | 73 +++++++ .../PulsarTokenAuthenticationBaseSuite.java | 48 ++++- 9 files changed, 456 insertions(+), 59 deletions(-) create mode 100644 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java create mode 100644 pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java create mode 100644 pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties create mode 100644 pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 99a8930348074..58ddcdcc7e5da 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -300,7 +300,7 @@ public void start() throws Exception { } log.info("Started Pulsar Broker service on port {}", port.get()); } - + Optional tlsPort = serviceConfig.getBrokerServicePortTls(); if (tlsPort.isPresent()) { ServerBootstrap tlsBootstrap = bootstrap.clone(); @@ -491,7 +491,7 @@ private CompletableFuture> getTopic(final String topic, boolean if (cause instanceof ServiceUnitNotReadyException) { log.warn("[{}] Service unit is not ready when loading the topic", topic); } else { - log.warn("[{}] Unexpected exception when loading topic: {}", topic, cause); + log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e); } return failedFuture(cause); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java index 3f7480437906d..0df2b23b8b75a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/SameThreadOrderedSafeExecutor.java @@ -35,7 +35,7 @@ public SameThreadOrderedSafeExecutor() { false, false, 100000, - 10, + -1, false); } diff --git a/pulsar-client-admin-shaded/pom.xml b/pulsar-client-admin-shaded/pom.xml index c641c2b83bef2..4b367365b73b7 100644 --- a/pulsar-client-admin-shaded/pom.xml +++ b/pulsar-client-admin-shaded/pom.xml @@ -104,6 +104,12 @@ ** + + org.apache.pulsar:pulsar-client-admin-original + + ** + + diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java index deb972226eab3..4731879119af0 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/PulsarAdmin.java @@ -18,9 +18,16 @@ */ package org.apache.pulsar.client.admin; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.conn.ssl.DefaultHostnameVerifier; -import org.apache.http.conn.ssl.NoopHostnameVerifier; +import java.io.Closeable; +import java.io.IOException; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; + import org.apache.pulsar.client.admin.internal.BookiesImpl; import org.apache.pulsar.client.admin.internal.BrokerStatsImpl; import org.apache.pulsar.client.admin.internal.BrokersImpl; @@ -38,13 +45,12 @@ import org.apache.pulsar.client.admin.internal.TenantsImpl; import org.apache.pulsar.client.admin.internal.TopicsImpl; import org.apache.pulsar.client.admin.internal.WorkerImpl; +import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider; import org.apache.pulsar.client.api.Authentication; -import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; -import org.apache.pulsar.common.util.SecurityUtility; import org.glassfish.jersey.client.ClientConfig; import org.glassfish.jersey.client.ClientProperties; import org.glassfish.jersey.jackson.JacksonFeature; @@ -53,17 +59,6 @@ import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; -import javax.net.ssl.SSLContext; -import javax.ws.rs.client.Client; -import javax.ws.rs.client.ClientBuilder; -import javax.ws.rs.client.WebTarget; -import java.io.Closeable; -import java.io.IOException; -import java.net.URL; -import java.security.cert.X509Certificate; -import java.util.Map; -import java.util.concurrent.TimeUnit; - /** * Pulsar client admin API client. */ @@ -155,6 +150,7 @@ public PulsarAdmin(String serviceUrl, httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); httpConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 8); httpConfig.register(MultiPartFeature.class); + httpConfig.connectorProvider(new AsyncHttpConnectorProvider(clientConfigData)); ClientBuilder clientBuilder = ClientBuilder.newBuilder() .withConfig(httpConfig) @@ -162,45 +158,7 @@ public PulsarAdmin(String serviceUrl, .readTimeout(this.readTimeout, this.readTimeoutUnit) .register(JacksonConfigurator.class).register(JacksonFeature.class); - boolean useTls = false; - - if (clientConfigData != null && StringUtils.isNotBlank(clientConfigData.getServiceUrl()) - && clientConfigData.getServiceUrl().startsWith("https://")) { - useTls = true; - try { - SSLContext sslCtx = null; - - X509Certificate trustCertificates[] = SecurityUtility - .loadCertificatesFromPemFile(clientConfigData.getTlsTrustCertsFilePath()); - - // Set private key and certificate if available - AuthenticationDataProvider authData = auth.getAuthData(); - if (authData.hasDataForTls()) { - sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(), - trustCertificates, authData.getTlsCertificates(), authData.getTlsPrivateKey()); - } else { - sslCtx = SecurityUtility.createSslContext(clientConfigData.isTlsAllowInsecureConnection(), - trustCertificates); - } - - clientBuilder.sslContext(sslCtx); - if (clientConfigData.isTlsHostnameVerificationEnable()) { - clientBuilder.hostnameVerifier(new DefaultHostnameVerifier()); - } else { - // Disable hostname verification - clientBuilder.hostnameVerifier(NoopHostnameVerifier.INSTANCE); - } - } catch (Exception e) { - try { - if (auth != null) { - auth.close(); - } - } catch (IOException ioe) { - LOG.error("Failed to close the authentication service", ioe); - } - throw new PulsarClientException.InvalidConfigurationException(e.getMessage()); - } - } + boolean useTls = clientConfigData.getServiceUrl().startsWith("https://"); this.client = clientBuilder.build(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java new file mode 100644 index 0000000000000..fd89108a6fff0 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal.http; + +import io.netty.handler.codec.http.HttpRequest; +import io.netty.handler.codec.http.HttpResponse; +import io.netty.handler.ssl.SslContext; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import javax.ws.rs.ProcessingException; +import javax.ws.rs.client.Client; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.Response.Status; + +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.PulsarVersion; +import org.apache.pulsar.client.api.AuthenticationDataProvider; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.common.util.SecurityUtility; +import org.asynchttpclient.AsyncCompletionHandler; +import org.asynchttpclient.AsyncHttpClient; +import org.asynchttpclient.BoundRequestBuilder; +import org.asynchttpclient.DefaultAsyncHttpClient; +import org.asynchttpclient.DefaultAsyncHttpClientConfig; +import org.asynchttpclient.Request; +import org.asynchttpclient.Response; +import org.asynchttpclient.channel.DefaultKeepAliveStrategy; +import org.glassfish.jersey.client.ClientProperties; +import org.glassfish.jersey.client.ClientRequest; +import org.glassfish.jersey.client.ClientResponse; +import org.glassfish.jersey.client.spi.AsyncConnectorCallback; +import org.glassfish.jersey.client.spi.Connector; + +@Slf4j +public class AsyncHttpConnector implements Connector { + + private final AsyncHttpClient httpClient; + + @SneakyThrows + public AsyncHttpConnector(Client client, ClientConfigurationData conf) { + DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); + confBuilder.setFollowRedirect(true); + confBuilder.setConnectTimeout((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT)); + confBuilder.setReadTimeout((int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT)); + confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion())); + confBuilder.setKeepAliveStrategy(new DefaultKeepAliveStrategy() { + @Override + public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse response) { + // Close connection upon a server error or per HTTP spec + return (response.status().code() / 100 != 5) && super.keepAlive(ahcRequest, request, response); + } + }); + + if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl()) + && conf.getServiceUrl().startsWith("https://")) { + + SslContext sslCtx = null; + + // Set client key and certificate if available + AuthenticationDataProvider authData = conf.getAuthentication().getAuthData(); + if (authData.hasDataForTls()) { + sslCtx = SecurityUtility.createNettySslContextForClient( + conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), + conf.getTlsTrustCertsFilePath(), + authData.getTlsCertificates(), authData.getTlsPrivateKey()); + } else { + sslCtx = SecurityUtility.createNettySslContextForClient( + conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(), + conf.getTlsTrustCertsFilePath()); + } + + confBuilder.setSslContext(sslCtx); + } + httpClient = new DefaultAsyncHttpClient(confBuilder.build()); + } + + @Override + public ClientResponse apply(ClientRequest jerseyRequest) { + CompletableFuture future = new CompletableFuture<>(); + + try { + Future resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() { + @Override + public void response(ClientResponse response) { + future.complete(response); + } + + @Override + public void failure(Throwable failure) { + future.completeExceptionally(failure); + } + }); + + Integer timeout = ClientProperties.getValue( + jerseyRequest.getConfiguration().getProperties(), + ClientProperties.READ_TIMEOUT, 0); + + if (timeout != null && timeout > 0) { + resultFuture.get(timeout, TimeUnit.MILLISECONDS); + } else { + resultFuture.get(); + } + } catch (ExecutionException ex) { + Throwable e = ex.getCause() == null ? ex : ex.getCause(); + throw new ProcessingException(e.getMessage(), e); + } catch (Exception ex) { + throw new ProcessingException(ex.getMessage(), ex); + } + + return future.join(); + } + + @Override + public Future apply(ClientRequest jerseyRequest, AsyncConnectorCallback callback) { + final CompletableFuture future = new CompletableFuture<>(); + + BoundRequestBuilder builder = httpClient.prepare(jerseyRequest.getMethod(), jerseyRequest.getUri().toString()); + + if (jerseyRequest.hasEntity()) { + ByteArrayOutputStream outStream = new ByteArrayOutputStream(); + jerseyRequest.setStreamProvider(contentLength -> outStream); + try { + jerseyRequest.writeEntity(); + } catch (IOException e) { + future.completeExceptionally(e); + return future; + } + + builder.setBody(outStream.toByteArray()); + } + + jerseyRequest.getHeaders().forEach((key, headers) -> { + if (!HttpHeaders.USER_AGENT.equals(key)) { + builder.addHeader(key, headers); + } + }); + + builder.execute(new AsyncCompletionHandler() { + @Override + public Response onCompleted(Response response) throws Exception { + ClientResponse jerseyResponse = new ClientResponse(Status.fromStatusCode(response.getStatusCode()), + jerseyRequest); + response.getHeaders().forEach(e -> jerseyResponse.header(e.getKey(), e.getValue())); + if (response.hasResponseBody()) { + jerseyResponse.setEntityStream(response.getResponseBodyAsStream()); + } + callback.response(jerseyResponse); + future.complete(jerseyResponse); + return response; + } + + @Override + public void onThrowable(Throwable t) { + callback.failure(t); + future.completeExceptionally(t); + } + }); + + return future; + } + + @Override + public String getName() { + return "Pulsar-Admin"; + } + + @Override + public void close() { + try { + httpClient.close(); + } catch (IOException e) { + log.warn("Failed to close http client", e); + } + } + +} diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java new file mode 100644 index 0000000000000..2f24089340d59 --- /dev/null +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.admin.internal.http; + +import javax.ws.rs.client.Client; +import javax.ws.rs.core.Configuration; + +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.glassfish.jersey.client.spi.Connector; +import org.glassfish.jersey.client.spi.ConnectorProvider; + +public class AsyncHttpConnectorProvider implements ConnectorProvider { + + private final ClientConfigurationData conf; + + public AsyncHttpConnectorProvider(ClientConfigurationData conf) { + this.conf = conf; + } + + @Override + public Connector getConnector(Client client, Configuration runtimeConfig) { + return new AsyncHttpConnector(client, conf); + } +} diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties new file mode 100644 index 0000000000000..fa74eb61514e3 --- /dev/null +++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc-default.properties @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar. +# For more details please refer https://github.com/apache/pulsar/issues/389 + +org.asynchttpclient.threadPoolName=AsyncHttpClient +org.asynchttpclient.maxConnections=-1 +org.asynchttpclient.maxConnectionsPerHost=-1 +org.asynchttpclient.connectTimeout=5000 +org.asynchttpclient.pooledConnectionIdleTimeout=60000 +org.asynchttpclient.connectionPoolCleanerPeriod=1000 +org.asynchttpclient.readTimeout=60000 +org.asynchttpclient.requestTimeout=60000 +org.asynchttpclient.connectionTtl=-1 +org.asynchttpclient.followRedirect=false +org.asynchttpclient.maxRedirects=5 +org.asynchttpclient.compressionEnforced=false +org.asynchttpclient.userAgent=AHC/2.1 +org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1 +org.asynchttpclient.enabledCipherSuites= +org.asynchttpclient.useProxySelector=false +org.asynchttpclient.useProxyProperties=false +org.asynchttpclient.validateResponseHeaders=true +org.asynchttpclient.aggregateWebSocketFrameFragments=true +org.asynchttpclient.strict302Handling=false +org.asynchttpclient.keepAlive=true +org.asynchttpclient.maxRequestRetry=5 +org.asynchttpclient.disableUrlEncodingForBoundRequests=false +org.asynchttpclient.useLaxCookieEncoder=false +org.asynchttpclient.removeQueryParamOnRedirect=true +org.asynchttpclient.useOpenSsl=false +org.asynchttpclient.useInsecureTrustManager=false +org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false +org.asynchttpclient.sslSessionCacheSize=0 +org.asynchttpclient.sslSessionTimeout=0 +org.asynchttpclient.tcpNoDelay=true +org.asynchttpclient.soReuseAddress=false +org.asynchttpclient.soLinger=-1 +org.asynchttpclient.soSndBuf=-1 +org.asynchttpclient.soRcvBuf=-1 +org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096 +org.asynchttpclient.httpClientCodecMaxHeaderSize=8192 +org.asynchttpclient.httpClientCodecMaxChunkSize=8192 +org.asynchttpclient.httpClientCodecInitialBufferSize=128 +org.asynchttpclient.disableZeroCopy=false +org.asynchttpclient.handshakeTimeout=10000 +org.asynchttpclient.chunkedFileChunkSize=8192 +org.asynchttpclient.webSocketMaxBufferSize=128000000 +org.asynchttpclient.webSocketMaxFrameSize=10240 +org.asynchttpclient.keepEncodingHeader=false +org.asynchttpclient.shutdownQuietPeriod=2000 +org.asynchttpclient.shutdownTimeout=15000 +org.asynchttpclient.useNativeTransport=false +org.asynchttpclient.ioThreadsCount=0 +org.asynchttpclient.acquireFreeChannelTimeout=0 +org.asynchttpclient.enableWebSocketCompression=true \ No newline at end of file diff --git a/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties new file mode 100644 index 0000000000000..914456a9a0f49 --- /dev/null +++ b/pulsar-client-admin/src/main/resources/org/asynchttpclient/config/ahc.properties @@ -0,0 +1,73 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# This file is used by the shaded asynchttpclient packaged in pulsar-client-${VERSION}-shaded.jar. +# For more details please refer https://github.com/apache/pulsar/issues/389 + +org.apache.pulsar.shade.org.asynchttpclient.threadPoolName=AsyncHttpClient +org.apache.pulsar.shade.org.asynchttpclient.maxConnections=-1 +org.apache.pulsar.shade.org.asynchttpclient.maxConnectionsPerHost=-1 +org.apache.pulsar.shade.org.asynchttpclient.connectTimeout=5000 +org.apache.pulsar.shade.org.asynchttpclient.pooledConnectionIdleTimeout=60000 +org.apache.pulsar.shade.org.asynchttpclient.connectionPoolCleanerPeriod=1000 +org.apache.pulsar.shade.org.asynchttpclient.readTimeout=60000 +org.apache.pulsar.shade.org.asynchttpclient.requestTimeout=60000 +org.apache.pulsar.shade.org.asynchttpclient.connectionTtl=-1 +org.apache.pulsar.shade.org.asynchttpclient.followRedirect=false +org.apache.pulsar.shade.org.asynchttpclient.maxRedirects=5 +org.apache.pulsar.shade.org.asynchttpclient.compressionEnforced=false +org.apache.pulsar.shade.org.asynchttpclient.userAgent=AHC/2.1 +org.apache.pulsar.shade.org.asynchttpclient.enabledProtocols=TLSv1.2, TLSv1.1, TLSv1 +org.apache.pulsar.shade.org.asynchttpclient.enabledCipherSuites= +org.apache.pulsar.shade.org.asynchttpclient.useProxySelector=false +org.apache.pulsar.shade.org.asynchttpclient.useProxyProperties=false +org.apache.pulsar.shade.org.asynchttpclient.validateResponseHeaders=true +org.apache.pulsar.shade.org.asynchttpclient.aggregateWebSocketFrameFragments=true +org.apache.pulsar.shade.org.asynchttpclient.strict302Handling=false +org.apache.pulsar.shade.org.asynchttpclient.keepAlive=true +org.apache.pulsar.shade.org.asynchttpclient.maxRequestRetry=5 +org.apache.pulsar.shade.org.asynchttpclient.disableUrlEncodingForBoundRequests=false +org.apache.pulsar.shade.org.asynchttpclient.useLaxCookieEncoder=false +org.apache.pulsar.shade.org.asynchttpclient.removeQueryParamOnRedirect=true +org.apache.pulsar.shade.org.asynchttpclient.useOpenSsl=false +org.apache.pulsar.shade.org.asynchttpclient.useInsecureTrustManager=false +org.apache.pulsar.shade.org.asynchttpclient.disableHttpsEndpointIdentificationAlgorithm=false +org.apache.pulsar.shade.org.asynchttpclient.sslSessionCacheSize=0 +org.apache.pulsar.shade.org.asynchttpclient.sslSessionTimeout=0 +org.apache.pulsar.shade.org.asynchttpclient.tcpNoDelay=true +org.apache.pulsar.shade.org.asynchttpclient.soReuseAddress=false +org.apache.pulsar.shade.org.asynchttpclient.soLinger=-1 +org.apache.pulsar.shade.org.asynchttpclient.soSndBuf=-1 +org.apache.pulsar.shade.org.asynchttpclient.soRcvBuf=-1 +org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxInitialLineLength=4096 +org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxHeaderSize=8192 +org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecMaxChunkSize=8192 +org.apache.pulsar.shade.org.asynchttpclient.httpClientCodecInitialBufferSize=128 +org.apache.pulsar.shade.org.asynchttpclient.disableZeroCopy=false +org.apache.pulsar.shade.org.asynchttpclient.handshakeTimeout=10000 +org.apache.pulsar.shade.org.asynchttpclient.chunkedFileChunkSize=8192 +org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxBufferSize=128000000 +org.apache.pulsar.shade.org.asynchttpclient.webSocketMaxFrameSize=10240 +org.apache.pulsar.shade.org.asynchttpclient.keepEncodingHeader=false +org.apache.pulsar.shade.org.asynchttpclient.shutdownQuietPeriod=2000 +org.apache.pulsar.shade.org.asynchttpclient.shutdownTimeout=15000 +org.apache.pulsar.shade.org.asynchttpclient.useNativeTransport=false +org.apache.pulsar.shade.org.asynchttpclient.ioThreadsCount=0 +org.apache.pulsar.shade.org.asynchttpclient.acquireFreeChannelTimeout=0 +org.apache.pulsar.shade.org.org.asynchttpclient.enableWebSocketCompression=true diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java index 56068118d4cfa..a0064ae1c2dd8 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/auth/token/PulsarTokenAuthenticationBaseSuite.java @@ -89,7 +89,7 @@ public void setupCluster() throws Exception { PulsarClusterSpec spec = PulsarClusterSpec.builder() .numBookies(2) - .numBrokers(1) + .numBrokers(2) .numProxies(1) .clusterName(clusterName) .build(); @@ -106,6 +106,8 @@ public void setupCluster() throws Exception { "org.apache.pulsar.broker.authentication.AuthenticationProviderToken"); brokerContainer.withEnv("authorizationEnabled", "true"); brokerContainer.withEnv("superUserRoles", SUPER_USER_ROLE + "," + PROXY_ROLE); + brokerContainer.withEnv("brokerClientAuthenticationPlugin", AuthenticationToken.class.getName()); + brokerContainer.withEnv("brokerClientAuthenticationParameters", "token:" + superUserAuthToken); } ProxyContainer proxyContainer = pulsarCluster.getProxy(); @@ -201,4 +203,48 @@ public void testPublishWithTokenAuth() throws Exception { // Expected } } + + @Test + public void testProxyRedirectWithTokenAuth() throws Exception { + + final String tenant = "token-test-tenant" + randomName(4); + final String namespace = tenant + "/ns-1"; + final String topic = namespace + "/my-topic-1"; + + @Cleanup + PulsarAdmin admin = PulsarAdmin.builder() + .serviceHttpUrl(pulsarCluster.getHttpServiceUrl()) + .authentication(AuthenticationFactory.token(superUserAuthToken)) + .build(); + + try { + admin.tenants().createTenant(tenant, + new TenantInfo(Collections.singleton(REGULAR_USER_ROLE), + Collections.singleton(pulsarCluster.getClusterName()))); + + } catch (Exception e) { + e.printStackTrace(); + } + + admin.namespaces().createNamespace(namespace, Collections.singleton(pulsarCluster.getClusterName())); + admin.namespaces().grantPermissionOnNamespace(namespace, REGULAR_USER_ROLE, + EnumSet.allOf(AuthAction.class)); + + admin.topics().createPartitionedTopic(topic, 16); + + // Create the partitions + @Cleanup + PulsarClient client = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .authentication(AuthenticationFactory.token(REGULAR_USER_ROLE)) + .build(); + + // Force the topics to be created + client.newProducer() + .topic(topic) + .create() + .close(); + + admin.topics().getList(namespace); + } }