diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index e4fdf132896d8..8980dbdde076a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.URL; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -192,10 +191,7 @@ public PulsarAdminImpl(String serviceUrl, this.serviceUrl = serviceUrl; ServiceURI serviceUri = ServiceURI.create(serviceUrl); - root = client.target(String.format("%s://%s" - , serviceUri.getServiceScheme() - , serviceUri.getServiceHosts()[ThreadLocalRandom.current() - .nextInt(serviceUri.getServiceHosts().length)])); + root = client.target(serviceUri.selectOne()); this.asyncHttpConnector = asyncConnectorProvider.getConnector( Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)), diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java index 28667559e4485..16a070d41865c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.EqualsAndHashCode; @@ -237,4 +238,25 @@ private static int getServicePort(String serviceName, String[] serviceInfos) { return port; } + /** + * Create a new URI from the service URI which only specifies one of the hosts. + * @return a pulsar service URI with a single host specified + */ + public String selectOne() { + StringBuilder sb = new StringBuilder(); + if (serviceName != null) { + sb.append(serviceName); + + for (int i = 0; i < serviceInfos.length; i++) { + sb.append('+').append(serviceInfos[i]); + } + sb.append("://"); + } + if (serviceUser != null) { + sb.append(serviceUser).append('@'); + } + int hostIndex = ThreadLocalRandom.current().nextInt(serviceHosts.length); + sb.append(serviceHosts[hostIndex]); + return sb.append(servicePath).toString(); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java index 1935c88a827a8..87279a92df0f9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java @@ -278,4 +278,40 @@ public void testUserInfoWithMultipleHosts() { "/path/to/namespace"); } + @Test + public void testSelectOneSingleHost() { + String serviceUri = "https://host1:6650/path/to/namespace"; + assertEquals(ServiceURI.create(serviceUri).selectOne(), + serviceUri); + } + + @Test + public void testSelectOneMultipleHosts() { + String serviceUri = "https://host1:6650;host2/"; + for (int i = 0; i < 10; i++) { + String selected = ServiceURI.create(serviceUri).selectOne(); + boolean option1 = selected.equals("https://host1:6650/"); + boolean option2 = selected.equals("https://host2:443/"); + assertTrue(option1 || option2); + } + } + + @Test + public void testSelectOneAllBellsAndWhistles() { + String serviceUri = "https+blah://user1@host1:6650;host2;host3:4032/path/to/namespace"; + for (int i = 0; i < 10; i++) { + String selected = ServiceURI.create(serviceUri).selectOne(); + boolean option1 = selected.equals("https+blah://user1@host1:6650/path/to/namespace"); + boolean option2 = selected.equals("https+blah://user1@host2:443/path/to/namespace"); + boolean option3 = selected.equals("https+blah://user1@host3:4032/path/to/namespace"); + assertTrue(option1 || option2 || option3); + } + } + + @Test + public void testKubeProxyURI() { + String serviceUri = "http://localhost:57777/api/v1/namespaces/blah-blah/services/pulsar:8080/proxy"; + assertEquals(ServiceURI.create(serviceUri).selectOne(), + serviceUri); + } }