Skip to content

Commit

Permalink
Fix admin-api-brokers list failed (apache#9191)
Browse files Browse the repository at this point in the history
Fixes apache#9128

### Motivation
There have bug in parses cluster service url. Current address resolution does not support multiple addresses.
Such as `http://host1:8080,host2:8080,host3:8080`

### Modifications
Let URI resolution support multiple addresses

### Verifying this change
unit test:
activeBrokerParse
  • Loading branch information
315157973 authored Jan 13, 2021
1 parent 118d64e commit 908dae9
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.namespace.LookupOptions;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
Expand Down Expand Up @@ -356,14 +358,19 @@ protected void validateClusterOwnership(String cluster) throws WebApplicationExc
}

private URI getRedirectionUrl(ClusterData differentClusterData) throws MalformedURLException {
URL webUrl = null;
if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
&& StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
webUrl = new URL(differentClusterData.getServiceUrlTls());
} else {
webUrl = new URL(differentClusterData.getServiceUrl());
try {
PulsarServiceNameResolver serviceNameResolver = new PulsarServiceNameResolver();
if (isRequestHttps() && pulsar.getConfiguration().getWebServicePortTls().isPresent()
&& StringUtils.isNotBlank(differentClusterData.getServiceUrlTls())) {
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrlTls());
} else {
serviceNameResolver.updateServiceUrl(differentClusterData.getServiceUrl());
}
URL webUrl = new URL(serviceNameResolver.resolveHostUri().toString());
return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
} catch (PulsarClientException.InvalidServiceURL exception) {
throw new MalformedURLException(exception.getMessage());
}
return UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.getHost()).port(webUrl.getPort()).build();
}

protected static CompletableFuture<ClusterData> getClusterDataIfDifferentCluster(PulsarService pulsar,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -198,6 +200,24 @@ public Void call() throws Exception {
// Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
}

@Test(timeOut = 10000)
public void activeBrokerParse() throws Exception {
pulsar1.getConfiguration().setAuthorizationEnabled(true);
//init clusterData
ClusterData cluster2Data = new ClusterData();
String cluster2ServiceUrls = String.format("%s,localhost:1234,localhost:5678", pulsar2.getWebServiceAddress());
cluster2Data.setServiceUrl(cluster2ServiceUrls);
String cluster2 = "activeCLuster2";
admin2.clusters().createCluster(cluster2, cluster2Data);
Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
-> admin2.clusters().getCluster(cluster2) != null);

List<String> list = admin1.brokers().getActiveBrokers(cluster2);
assertEquals(list.get(0), url2.toString().replace("http://", ""));
//restore configuration
pulsar1.getConfiguration().setAuthorizationEnabled(false);
}

@SuppressWarnings("unchecked")
@Test(timeOut = 30000)
public void testConcurrentReplicator() throws Exception {
Expand Down

0 comments on commit 908dae9

Please sign in to comment.