Skip to content

Commit

Permalink
[PulsarAdmin] BrokerStats to async (apache#6549)
Browse files Browse the repository at this point in the history
  • Loading branch information
yjshen authored Mar 18, 2020
1 parent 254e54b commit b77d3c6
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

import java.util.concurrent.CompletableFuture;

/**
* Admin interface for brokers management.
*/
public interface BrokerStats {

/**
* Returns Monitoring metrics
* Returns Monitoring metrics.
*
* @return
* @throws PulsarAdminException
Expand All @@ -39,7 +41,15 @@ public interface BrokerStats {
JsonArray getMetrics() throws PulsarAdminException;

/**
* Requests JSON string server mbean dump
* Returns Monitoring metrics asynchronously.
*
* @return
*/

CompletableFuture<JsonArray> getMetricsAsync();

/**
* Requests JSON string server mbean dump.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
Expand All @@ -49,7 +59,16 @@ public interface BrokerStats {
JsonArray getMBeans() throws PulsarAdminException;

/**
* Returns JSON string topics stats
* Requests JSON string server mbean dump asynchronously.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
* @return
*/
CompletableFuture<JsonArray> getMBeansAsync();

/**
* Returns JSON string topics stats.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
Expand All @@ -58,9 +77,63 @@ public interface BrokerStats {
*/
JsonObject getTopics() throws PulsarAdminException;

/**
* Returns JSON string topics stats asynchronously.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
* @return
*/
CompletableFuture<JsonObject> getTopicsAsync();

/**
* Get pending bookie client op stats by namespace.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
* @return
* @throws PulsarAdminException
*/
JsonObject getPendingBookieOpsStats() throws PulsarAdminException;

/**
* Get pending bookie client op stats by namespace asynchronously.
* <p>
* Notes: since we don't plan to introspect the response we avoid converting the response into POJO.
*
* @return
*/
CompletableFuture<JsonObject> getPendingBookieOpsStatsAsync();

/**
* Get the stats for the Netty allocator.
*
* @param allocatorName
* @return
* @throws PulsarAdminException
*/
AllocatorStats getAllocatorStats(String allocatorName) throws PulsarAdminException;

/**
* Get the stats for the Netty allocator asynchronously.
*
* @param allocatorName
* @return
*/
CompletableFuture<AllocatorStats> getAllocatorStatsAsync(String allocatorName);

/**
* Get load for this broker.
*
* @return
* @throws PulsarAdminException
*/
LoadManagerReport getLoadReport() throws PulsarAdminException;

/**
* Get load for this broker asynchronously.
*
* @return
*/
CompletableFuture<LoadManagerReport> getLoadReportAsync();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.admin.internal;

import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;

import org.apache.pulsar.client.admin.BrokerStats;
Expand All @@ -26,12 +27,16 @@
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.policies.data.loadbalancer.LoadManagerReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* Pulsar Admin API client.
*
Expand All @@ -51,61 +56,201 @@ public BrokerStatsImpl(WebTarget target, Authentication auth, long readTimeoutMs
@Override
public JsonArray getMetrics() throws PulsarAdminException {
try {
String json = request(adminV2BrokerStats.path("/metrics")).get(String.class);
return new Gson().fromJson(json, JsonArray.class);
} catch (Exception e) {
throw getApiException(e);
return getMetricsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<JsonArray> getMetricsAsync() {
WebTarget path = adminV2BrokerStats.path("/metrics");
final CompletableFuture<JsonArray> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {
@Override
public void completed(String s) {
future.complete(new Gson().fromJson(s, JsonArray.class));
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public AllocatorStats getAllocatorStats(String allocatorName) throws PulsarAdminException {
try {
return request(adminV2BrokerStats.path("/allocator-stats").path(allocatorName)).get(AllocatorStats.class);
} catch (Exception e) {
throw getApiException(e);
return getAllocatorStatsAsync(allocatorName).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<AllocatorStats> getAllocatorStatsAsync(String allocatorName) {
WebTarget path = adminV2BrokerStats.path("/allocator-stats").path(allocatorName);
final CompletableFuture<AllocatorStats> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<AllocatorStats>() {
@Override
public void completed(AllocatorStats allocatorStats) {
future.complete(allocatorStats);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public JsonArray getMBeans() throws PulsarAdminException {
try {
String json = request(adminV2BrokerStats.path("/mbeans")).get(String.class);
return new Gson().fromJson(json, JsonArray.class);
} catch (Exception e) {
throw getApiException(e);
return getMBeansAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<JsonArray> getMBeansAsync() {
WebTarget path = adminV2BrokerStats.path("/mbeans");
final CompletableFuture<JsonArray> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {
@Override
public void completed(String s) {
future.complete(new Gson().fromJson(s, JsonArray.class));
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public JsonObject getTopics() throws PulsarAdminException {
try {
String json = request(adminV2BrokerStats.path("/topics")).get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
throw getApiException(e);
return getTopicsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<JsonObject> getTopicsAsync() {
WebTarget path = adminV2BrokerStats.path("/topics");
final CompletableFuture<JsonObject> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {
@Override
public void completed(String s) {
future.complete(new Gson().fromJson(s, JsonObject.class));
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public LoadManagerReport getLoadReport() throws PulsarAdminException {
try {
return request(adminV2BrokerStats.path("/load-report")).get(LocalBrokerData.class);
} catch (Exception e) {
throw getApiException(e);
return getLoadReportAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<LoadManagerReport> getLoadReportAsync() {
WebTarget path = adminV2BrokerStats.path("/load-report");
final CompletableFuture<LoadManagerReport> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<LoadManagerReport>() {
@Override
public void completed(LoadManagerReport loadManagerReport) {
future.complete(loadManagerReport);
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

@Override
public JsonObject getPendingBookieOpsStats() throws PulsarAdminException {
try {
String json = request(adminV2BrokerStats.path("/bookieops")).get(String.class);
return new Gson().fromJson(json, JsonObject.class);
} catch (Exception e) {
throw getApiException(e);
return getPendingBookieOpsStatsAsync().get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarAdminException(e);
} catch (TimeoutException e) {
throw new PulsarAdminException.TimeoutException(e);
}
}

@Override
public CompletableFuture<JsonObject> getPendingBookieOpsStatsAsync() {
WebTarget path = adminV2BrokerStats.path("/bookieops");
final CompletableFuture<JsonObject> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<String>() {
@Override
public void completed(String s) {
future.complete(new Gson().fromJson(s, JsonObject.class));
}

@Override
public void failed(Throwable throwable) {
future.completeExceptionally(getApiException(throwable.getCause()));
}
});
return future;
}

public JsonObject getBrokerResourceAvailability(String namespace) throws PulsarAdminException {
try {
NamespaceName ns = NamespaceName.get(namespace);
Expand Down

0 comments on commit b77d3c6

Please sign in to comment.