From b9651a9ff40461730f5985d6d2888f8c91e087f1 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 18 Mar 2020 21:36:56 +0800 Subject: [PATCH] [PulsarAdmin] Brokers to async (#6541) * brokers to async * retrigger * remove slash --- .../apache/pulsar/client/admin/Brokers.java | 112 ++++++- .../client/admin/internal/BrokersImpl.java | 274 +++++++++++++++--- 2 files changed, 334 insertions(+), 52 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java index 83ccac5fc1d51..003ffc3bcd6f4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Brokers.java @@ -20,6 +20,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException; @@ -53,9 +54,25 @@ public interface Brokers { */ List getActiveBrokers(String cluster) throws PulsarAdminException; + /** + * Get the list of active brokers in the cluster asynchronously. + *

+ * Get the list of active brokers (web service addresses) in the cluster. + *

+ * Response Example: + * + *

+	 * ["prod1-broker1.messaging.use.example.com:8080", "prod1-broker2.messaging.use.example.com:8080", "prod1-broker3.messaging.use.example.com:8080"]
+	 * 
+ * + * @param cluster + * Cluster name + * @return a list of (host:port) + */ + CompletableFuture> getActiveBrokersAsync(String cluster); /** - * Get the map of owned namespaces and their status from a single broker in the cluster + * Get the map of owned namespaces and their status from a single broker in the cluster. *

* The map is returned in a JSON object format below *

@@ -71,36 +88,85 @@ public interface Brokers { * @throws PulsarAdminException */ Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException; - - /** + + /** + * Get the map of owned namespaces and their status from a single broker in the cluster asynchronously. + *

+ * The map is returned in a JSON object format below + *

+ * Response Example: + * + *

+	 * {"ns-1":{"broker_assignment":"shared","is_active":"true","is_controlled":"false"}, "ns-2":{"broker_assignment":"primary","is_active":"true","is_controlled":"true"}}
+	 * 
+ * + * @param cluster + * @param brokerUrl + * @return + */ + CompletableFuture> getOwnedNamespacesAsync(String cluster, String brokerUrl); + + /** + * Update a dynamic configuration value into ZooKeeper. + *

* It updates dynamic configuration value in to Zk that triggers watch on * brokers and all brokers can update {@link ServiceConfiguration} value * locally * - * @param key - * @param value + * @param configName + * @param configValue * @throws PulsarAdminException */ void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException; + + /** + * Update a dynamic configuration value into ZooKeeper asynchronously. + *

+ * It updates dynamic configuration value in to Zk that triggers watch on + * brokers and all brokers can update {@link ServiceConfiguration} value + * locally + * + * @param configName + * @param configValue + */ + CompletableFuture updateDynamicConfigurationAsync(String configName, String configValue); /** - * It deletes dynamic configuration value in to Zk. It will not impact current value in broker but next time when + * It deletes dynamic configuration value into ZooKeeper. + *

+ * It will not impact current value in broker but next time when * broker restarts, it applies value from configuration file only. * - * @param key - * @param value + * @param configName * @throws PulsarAdminException */ void deleteDynamicConfiguration(String configName) throws PulsarAdminException; + /** + * It deletes dynamic configuration value into ZooKeeper asynchronously. + *

+ * It will not impact current value in broker but next time when + * broker restarts, it applies value from configuration file only. + * + * @param configName + */ + CompletableFuture deleteDynamicConfigurationAsync(String configName); + /** - * Get list of updatable configuration name + * Get list of updatable configuration name. * * @return * @throws PulsarAdminException */ List getDynamicConfigurationNames() throws PulsarAdminException; + /** + * Get list of updatable configuration name asynchronously. + * + * @return + */ + CompletableFuture> getDynamicConfigurationNamesAsync(); + /** * Get values of runtime configuration * @@ -109,6 +175,13 @@ public interface Brokers { */ Map getRuntimeConfigurations() throws PulsarAdminException; + /** + * Get values of runtime configuration asynchronously. + * + * @return + */ + CompletableFuture> getRuntimeConfigurationsAsync(); + /** * Get values of all overridden dynamic-configs * @@ -117,6 +190,13 @@ public interface Brokers { */ Map getAllDynamicConfigurations() throws PulsarAdminException; + /** + * Get values of all overridden dynamic-configs asynchronously. + * + * @return + */ + CompletableFuture> getAllDynamicConfigurationsAsync(); + /** * Get the internal configuration data. * @@ -124,10 +204,22 @@ public interface Brokers { */ InternalConfigurationData getInternalConfigurationData() throws PulsarAdminException; + /** + * Get the internal configuration data asynchronously. + * + * @return internal configuration data. + */ + CompletableFuture getInternalConfigurationDataAsync(); + /** * Run a healthcheck on the broker. * - * @throws an exception if the healthcheck fails. + * @throws PulsarAdminException if the healthcheck fails. */ void healthcheck() throws PulsarAdminException; + + /** + * Run a healthcheck on the broker asynchronously. + */ + CompletableFuture healthcheckAsync(); } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java index aceb3364a9bab..26131c8eb9faf 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/BrokersImpl.java @@ -20,16 +20,20 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.ws.rs.client.Entity; +import javax.ws.rs.client.InvocationCallback; import javax.ws.rs.client.WebTarget; -import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; import org.apache.pulsar.client.admin.Brokers; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus; import org.apache.pulsar.common.util.Codec; @@ -38,99 +42,285 @@ public class BrokersImpl extends BaseResource implements Brokers { public BrokersImpl(WebTarget web, Authentication auth, long readTimeoutMs) { super(auth, readTimeoutMs); - adminBrokers = web.path("/admin/v2/brokers"); + adminBrokers = web.path("admin/v2/brokers"); } @Override public List getActiveBrokers(String cluster) throws PulsarAdminException { try { - return request(adminBrokers.path(cluster)).get(new GenericType>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getActiveBrokersAsync(cluster).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture> getActiveBrokersAsync(String cluster) { + WebTarget path = adminBrokers.path(cluster); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(List brokers) { + future.complete(brokers); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public Map getOwnedNamespaces(String cluster, String brokerUrl) throws PulsarAdminException { try { - return request(adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces")).get( - new GenericType>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getOwnedNamespacesAsync(cluster, brokerUrl).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture> getOwnedNamespacesAsync(String cluster, String brokerUrl) { + WebTarget path = adminBrokers.path(cluster).path(brokerUrl).path("ownedNamespaces"); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(Map ownedNamespaces) { + future.complete(ownedNamespaces); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public void updateDynamicConfiguration(String configName, String configValue) throws PulsarAdminException { try { - String value = Codec.encode(configValue); - request(adminBrokers.path("/configuration/").path(configName).path(value)).post(Entity.json(""), - ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + updateDynamicConfigurationAsync(configName, configValue). + get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture updateDynamicConfigurationAsync(String configName, String configValue) { + String value = Codec.encode(configValue); + WebTarget path = adminBrokers.path("configuration").path(configName).path(value); + return asyncPostRequest(path, Entity.entity("", MediaType.APPLICATION_JSON)); + } + @Override public void deleteDynamicConfiguration(String configName) throws PulsarAdminException { try { - request(adminBrokers.path("/configuration/").path(configName)).delete(ErrorData.class); - } catch (Exception e) { - throw getApiException(e); + deleteDynamicConfigurationAsync(configName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } - + + @Override + public CompletableFuture deleteDynamicConfigurationAsync(String configName) { + WebTarget path = adminBrokers.path("configuration").path(configName); + return asyncDeleteRequest(path); + } + @Override public Map getAllDynamicConfigurations() throws PulsarAdminException { try { - return request(adminBrokers.path("/configuration/").path("values")).get(new GenericType>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getAllDynamicConfigurationsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture> getAllDynamicConfigurationsAsync() { + WebTarget path = adminBrokers.path("configuration").path("values"); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(Map allConfs) { + future.complete(allConfs); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public List getDynamicConfigurationNames() throws PulsarAdminException { try { - return request(adminBrokers.path("/configuration")).get(new GenericType>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getDynamicConfigurationNamesAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture> getDynamicConfigurationNamesAsync() { + WebTarget path = adminBrokers.path("configuration"); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(List confNames) { + future.complete(confNames); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public Map getRuntimeConfigurations() throws PulsarAdminException { try { - return request(adminBrokers.path("/configuration").path("runtime")).get(new GenericType>() { - }); - } catch (Exception e) { - throw getApiException(e); + return getRuntimeConfigurationsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture> getRuntimeConfigurationsAsync() { + WebTarget path = adminBrokers.path("configuration").path("runtime"); + final CompletableFuture> future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback>() { + @Override + public void completed(Map runtimeConfs) { + future.complete(runtimeConfs); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public InternalConfigurationData getInternalConfigurationData() throws PulsarAdminException { try { - return request(adminBrokers.path("/internal-configuration")).get(InternalConfigurationData.class); - } catch (Exception e) { - throw getApiException(e); + return getInternalConfigurationDataAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + @Override + public CompletableFuture getInternalConfigurationDataAsync() { + WebTarget path = adminBrokers.path("internal-configuration"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(InternalConfigurationData internalConfigurationData) { + future.complete(internalConfigurationData); + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } + @Override public void healthcheck() throws PulsarAdminException { try { - String result = request(adminBrokers.path("/health")).get(String.class); - if (!result.trim().toLowerCase().equals("ok")) { - throw new PulsarAdminException("Healthcheck returned unexpected result: " + result); - } - } catch (Exception e) { - throw getApiException(e); + healthcheckAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + throw (PulsarAdminException) e.getCause(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarAdminException(e); + } catch (TimeoutException e) { + throw new PulsarAdminException.TimeoutException(e); } } + + @Override + public CompletableFuture healthcheckAsync() { + WebTarget path = adminBrokers.path("health"); + final CompletableFuture future = new CompletableFuture<>(); + asyncGetRequest(path, + new InvocationCallback() { + @Override + public void completed(String result) { + if (!"ok".equalsIgnoreCase(result.trim())) { + future.completeExceptionally( + new PulsarAdminException("Healthcheck returned unexpected result: " + result)); + } else { + future.complete(null); + } + } + + @Override + public void failed(Throwable throwable) { + future.completeExceptionally(getApiException(throwable.getCause())); + } + }); + return future; + } }