Skip to content

Commit

Permalink
Support multi-host for pulsar-admin (apache#4018)
Browse files Browse the repository at this point in the history
*Motivation*

Sometimes the user might have many pulsar brokers but doesn't user load balancer or DNS. So we need support multi-host for pulsar-admin.

*Modifications*

Use `PulsarServiceNameResolver` to resolve it.
  • Loading branch information
zymap authored and sijie committed May 9, 2019
1 parent 6f6cc93 commit 5d6b6dc
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
Expand All @@ -42,6 +45,7 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.AsyncCompletionHandler;
Expand All @@ -63,6 +67,7 @@ public class AsyncHttpConnector implements Connector {

@Getter
private final AsyncHttpClient httpClient;
private final PulsarServiceNameResolver serviceNameResolver;

public AsyncHttpConnector(Client client, ClientConfigurationData conf) {
this((int) client.getConfiguration().getProperty(ClientProperties.CONNECT_TIMEOUT),
Expand All @@ -76,6 +81,7 @@ public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs,
int requestTimeoutMs, ClientConfigurationData conf) {
DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder();
confBuilder.setFollowRedirect(true);
confBuilder.setRequestTimeout(conf.getRequestTimeoutMs());
confBuilder.setConnectTimeout(connectTimeoutMs);
confBuilder.setReadTimeout(readTimeoutMs);
confBuilder.setUserAgent(String.format("Pulsar-Java-v%s", PulsarVersion.getVersion()));
Expand All @@ -88,63 +94,103 @@ public boolean keepAlive(Request ahcRequest, HttpRequest request, HttpResponse r
}
});

if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())
&& conf.getServiceUrl().startsWith("https://")) {

SslContext sslCtx = null;

// Set client key and certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(),
authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(
conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath());
}
serviceNameResolver = new PulsarServiceNameResolver();
if (conf != null && StringUtils.isNotBlank(conf.getServiceUrl())) {
serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
if (conf.getServiceUrl().startsWith("https://")) {

SslContext sslCtx = null;

// Set client key and certificate if available
AuthenticationDataProvider authData = conf.getAuthentication().getAuthData();
if (authData.hasDataForTls()) {
sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath(), authData.getTlsCertificates(), authData.getTlsPrivateKey());
} else {
sslCtx = SecurityUtility.createNettySslContextForClient(conf.isTlsAllowInsecureConnection() || !conf.isTlsHostnameVerificationEnable(),
conf.getTlsTrustCertsFilePath());
}

confBuilder.setSslContext(sslCtx);
confBuilder.setSslContext(sslCtx);
}
}
httpClient = new DefaultAsyncHttpClient(confBuilder.build());
}

@Override
public ClientResponse apply(ClientRequest jerseyRequest) {
CompletableFuture<ClientResponse> future = new CompletableFuture<>();

try {
Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
@Override
public void response(ClientResponse response) {
future.complete(response);
CompletableFuture<ClientResponse> future = new CompletableFuture<>();
long startTime = System.currentTimeMillis();
while (true) {
InetSocketAddress address = serviceNameResolver.resolveHost();
URI requestUri = replaceWithNew(address, jerseyRequest.getUri());
jerseyRequest.setUri(requestUri);
CompletableFuture<ClientResponse> tempFuture = new CompletableFuture<>();
try {
resolveRequest(tempFuture, jerseyRequest);
if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
throw new ProcessingException(
"Request timeout, the last try service url is : " + jerseyRequest.getUri().toString());
}

@Override
public void failure(Throwable failure) {
future.completeExceptionally(failure);
} catch (ExecutionException ex) {
if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException((e.getMessage()), e);
}
continue;
} catch (Exception e) {
if (System.currentTimeMillis() - startTime > httpClient.getConfig().getRequestTimeout()) {
throw new ProcessingException(e.getMessage(), e);
}
});
continue;
}
future = tempFuture;
break;
}

Integer timeout = ClientProperties.getValue(
jerseyRequest.getConfiguration().getProperties(),
ClientProperties.READ_TIMEOUT, 0);
return future.join();
}

private URI replaceWithNew(InetSocketAddress address, URI uri) {
String originalUri = uri.toString();
String newUri = (originalUri.split(":")[0] + "://")
+ address.getHostName() + ":"
+ address.getPort()
+ uri.getRawPath();
if (uri.getRawQuery() != null) {
newUri += "?" + uri.getRawQuery();
}
return URI.create(newUri);
}

if (timeout != null && timeout > 0) {
resultFuture.get(timeout, TimeUnit.MILLISECONDS);
} else {
resultFuture.get();


private void resolveRequest(CompletableFuture<ClientResponse> future,
ClientRequest jerseyRequest)
throws InterruptedException, ExecutionException, TimeoutException {
Future<?> resultFuture = apply(jerseyRequest, new AsyncConnectorCallback() {
@Override
public void response(ClientResponse response) {
future.complete(response);
}
} catch (ExecutionException ex) {
Throwable e = ex.getCause() == null ? ex : ex.getCause();
throw new ProcessingException(e.getMessage(), e);
} catch (Exception ex) {
throw new ProcessingException(ex.getMessage(), ex);
@Override
public void failure(Throwable failure) {
future.completeExceptionally(failure);
}
});

Integer timeout = httpClient.getConfig().getRequestTimeout() / 3;

Object result = null;
if (timeout != null && timeout > 0) {
result = resultFuture.get(timeout, TimeUnit.MILLISECONDS);
} else {
result = resultFuture.get();
}

return future.join();
if (result != null && result instanceof Throwable) {
throw new ExecutionException((Throwable) result);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
* The default implementation of {@link ServiceNameResolver}.
*/
@Slf4j
class PulsarServiceNameResolver implements ServiceNameResolver {
public class PulsarServiceNameResolver implements ServiceNameResolver {

private volatile ServiceURI serviceUri;
private volatile String serviceUrl;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public class ClientConfigurationData implements Serializable, Cloneable {
private int maxNumberOfRejectedRequestPerConnection = 50;
private int keepAliveIntervalSeconds = 30;
private int connectionTimeoutMs = 10000;
private int requestTimeoutMs = 60000;
private long defaultBackoffIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100);
private long maxBackoffIntervalNanos = TimeUnit.SECONDS.toNanos(30);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.cli;

import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/**
* Test for admin service url is multi host.
*/
public class AdminMultiHostTest {

private final String clusterName = "MultiHostTest-" + UUID.randomUUID();
private final PulsarClusterSpec spec = PulsarClusterSpec.builder().clusterName(clusterName).numBrokers(3).build();
private PulsarCluster pulsarCluster = null;

@BeforeMethod
public void setupCluster() throws Exception {
pulsarCluster = PulsarCluster.forSpec(spec);
pulsarCluster.start();
}

@AfterMethod
public void tearDownCluster() {
if (pulsarCluster != null) {
pulsarCluster.stop();
pulsarCluster = null;
}
}

@Test
public void testAdminMultiHost() throws Exception {
String hosts = pulsarCluster.getAllBrokersHttpServiceUrl();
PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(hosts).build();
// all brokers alive
Assert.assertEquals(admin.brokers().getActiveBrokers(clusterName).size(), 3);

// kill one broker admin should be usable
BrokerContainer one = pulsarCluster.getBroker(0);
// admin.brokers().
one.stop();
waitBrokerDown(admin, 2, 60);
Assert.assertEquals(admin.brokers().getActiveBrokers(clusterName).size(), 2);

// kill another broker
BrokerContainer two = pulsarCluster.getBroker(1);
two.stop();
waitBrokerDown(admin, 1, 60);
Assert.assertEquals(admin.brokers().getActiveBrokers(clusterName).size(), 1);
}

// Because zookeeper session timeout is 30ms and ticktime is 2ms, so we need wait more than 32ms
private void waitBrokerDown(PulsarAdmin admin, int expectBrokers, int timeout)
throws InterruptedException, ExecutionException, TimeoutException {
FutureTask<Boolean> futureTask = new FutureTask<>(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
while (admin.brokers().getActiveBrokers(clusterName).size() != expectBrokers) {
admin.brokers().healthcheck();
TimeUnit.MILLISECONDS.sleep(1000);
}
return true;
}
});
new Thread(futureTask).start();
futureTask.get(timeout, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public class BrokerContainer extends PulsarContainer<BrokerContainer> {

public BrokerContainer(String clusterName, String hostName) {
super(
clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, INVALID_PORT);
clusterName, hostName, hostName, "bin/run-broker.sh", BROKER_PORT, BROKER_HTTP_PORT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.tests.integration.topologies;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_PORT;

Expand All @@ -28,6 +29,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -177,6 +179,19 @@ public String getHttpServiceUrl() {
return proxyContainer.getHttpServiceUrl();
}

public String getAllBrokersHttpServiceUrl() {
String multiUrl = "http://";
Iterator<BrokerContainer> brokers = getBrokers().iterator();
while (brokers.hasNext()) {
BrokerContainer broker = brokers.next();
multiUrl += broker.getContainerIpAddress() + ":" + broker.getMappedPort(BROKER_HTTP_PORT);
if (brokers.hasNext()) {
multiUrl += ",";
}
}
return multiUrl;
}

public String getZKConnString() {
return zkContainer.getContainerIpAddress() + ":" + zkContainer.getMappedPort(ZK_PORT);
}
Expand Down
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-cli.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
<class name="org.apache.pulsar.tests.integration.cli.CLITest" />
<class name="org.apache.pulsar.tests.integration.cli.HealthcheckTest" />
<class name="org.apache.pulsar.tests.integration.compaction.TestCompaction" />
<class name="org.apache.pulsar.tests.integration.cli.AdminMultiHostTest"/>
</classes>
</test>
</suite>

0 comments on commit 5d6b6dc

Please sign in to comment.