From 85d4c237d0e2caa450eb63e6ae3156f77e1c389d Mon Sep 17 00:00:00 2001 From: Ivan Kelly Date: Mon, 30 Aug 2021 16:39:07 +0100 Subject: [PATCH] [admin] Allow http service URIs with path (#11841) e943d1b2 changed how the root URI was generated for PulsarAdmin so that if multiple hosts had been selected in the URI, only one would be used. However, the change in question discarded the path from the URI which is necessary for some usecases, such as using PulsarAdmin over kube-proxy. This change adds a method to ServiceURI to generate a correct fully formed URI with a single host, and uses this to generate the URI for PulsarAdmin. --- .../admin/internal/PulsarAdminImpl.java | 6 +--- .../apache/pulsar/common/net/ServiceURI.java | 22 ++++++++++++ .../pulsar/common/net/ServiceURITest.java | 36 +++++++++++++++++++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index e4fdf132896d8..8980dbdde076a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.URL; import java.util.Map; -import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; @@ -192,10 +191,7 @@ public PulsarAdminImpl(String serviceUrl, this.serviceUrl = serviceUrl; ServiceURI serviceUri = ServiceURI.create(serviceUrl); - root = client.target(String.format("%s://%s" - , serviceUri.getServiceScheme() - , serviceUri.getServiceHosts()[ThreadLocalRandom.current() - .nextInt(serviceUri.getServiceHosts().length)])); + root = client.target(serviceUri.selectOne()); this.asyncHttpConnector = asyncConnectorProvider.getConnector( Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)), diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java index 28667559e4485..16a070d41865c 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/net/ServiceURI.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Collectors; import lombok.AccessLevel; import lombok.EqualsAndHashCode; @@ -237,4 +238,25 @@ private static int getServicePort(String serviceName, String[] serviceInfos) { return port; } + /** + * Create a new URI from the service URI which only specifies one of the hosts. + * @return a pulsar service URI with a single host specified + */ + public String selectOne() { + StringBuilder sb = new StringBuilder(); + if (serviceName != null) { + sb.append(serviceName); + + for (int i = 0; i < serviceInfos.length; i++) { + sb.append('+').append(serviceInfos[i]); + } + sb.append("://"); + } + if (serviceUser != null) { + sb.append(serviceUser).append('@'); + } + int hostIndex = ThreadLocalRandom.current().nextInt(serviceHosts.length); + sb.append(serviceHosts[hostIndex]); + return sb.append(servicePath).toString(); + } } diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java index 1935c88a827a8..87279a92df0f9 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/net/ServiceURITest.java @@ -278,4 +278,40 @@ public void testUserInfoWithMultipleHosts() { "/path/to/namespace"); } + @Test + public void testSelectOneSingleHost() { + String serviceUri = "https://host1:6650/path/to/namespace"; + assertEquals(ServiceURI.create(serviceUri).selectOne(), + serviceUri); + } + + @Test + public void testSelectOneMultipleHosts() { + String serviceUri = "https://host1:6650;host2/"; + for (int i = 0; i < 10; i++) { + String selected = ServiceURI.create(serviceUri).selectOne(); + boolean option1 = selected.equals("https://host1:6650/"); + boolean option2 = selected.equals("https://host2:443/"); + assertTrue(option1 || option2); + } + } + + @Test + public void testSelectOneAllBellsAndWhistles() { + String serviceUri = "https+blah://user1@host1:6650;host2;host3:4032/path/to/namespace"; + for (int i = 0; i < 10; i++) { + String selected = ServiceURI.create(serviceUri).selectOne(); + boolean option1 = selected.equals("https+blah://user1@host1:6650/path/to/namespace"); + boolean option2 = selected.equals("https+blah://user1@host2:443/path/to/namespace"); + boolean option3 = selected.equals("https+blah://user1@host3:4032/path/to/namespace"); + assertTrue(option1 || option2 || option3); + } + } + + @Test + public void testKubeProxyURI() { + String serviceUri = "http://localhost:57777/api/v1/namespaces/blah-blah/services/pulsar:8080/proxy"; + assertEquals(ServiceURI.create(serviceUri).selectOne(), + serviceUri); + } }