Skip to content

Commit

Permalink
Cleaning up and improving worker endpoints (apache#3191)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and srkukarni committed Dec 14, 2018
1 parent 018fb04 commit 23a6622
Show file tree
Hide file tree
Showing 10 changed files with 228 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,24 @@
*/
package org.apache.pulsar.broker.admin.v2;

import java.util.function.Supplier;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.WorkerService;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

@Slf4j
@Path("/worker")
public class Worker extends AdminResource implements Supplier<WorkerService> {
Expand All @@ -53,42 +53,47 @@ public WorkerService get() {

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions"
value = "Fetches information about the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public Response getCluster() {
public List<WorkerInfo> getCluster() {
return worker.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterLeader() {
public WorkerInfo getClusterLeader() {
return worker.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Function.Assignment.class,
responseContainer = "Map"
response = Map.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Path("/assignments")
public Response getAssignments() {
@Produces(MediaType.APPLICATION_JSON)
public Map<String, Collection<String>> getAssignments() {
return worker.getAssignments();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,17 @@
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

@Slf4j
Expand All @@ -51,18 +53,33 @@ public WorkerService get() {

@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
@ApiOperation(
value = "Gets the metrics for Monitoring",
notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics",
response = org.apache.pulsar.common.stats.Metrics.class,
responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Produces(MediaType.APPLICATION_JSON)
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return worker.getWorkerMetrics(clientAppId());
}

@GET
@Path("/functionsmetrics")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getStats() throws IOException {
@ApiOperation(
value = "Get metrics for all functions owned by worker",
notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics",
response = WorkerFunctionInstanceStats.class,
responseContainer = "List")
@ApiResponses(value = {
@ApiResponse(code = 401, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running")
})
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerFunctionInstanceStats> getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.util.List;
import java.util.Map;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;

/**
* Admin interface for worker stats management.
Expand All @@ -36,8 +36,8 @@ public interface Worker {
* @return
* @throws PulsarAdminException
*/
Metrics getFunctionsStats() throws PulsarAdminException;
List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException;

/**
* Get worker metrics.
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,20 @@
*/
package org.apache.pulsar.client.admin.internal;

import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;

import java.lang.reflect.Type;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.WorkerFunctionInstanceStats;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.Worker;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.WorkerInfo;
import java.util.Collection;
import java.util.List;
import java.util.Map;

@Slf4j
public class WorkerImpl extends BaseResource implements Worker {
Expand All @@ -53,27 +46,28 @@ public WorkerImpl(WebTarget web, Authentication auth) {
}

@Override
public Metrics getFunctionsStats() throws PulsarAdminException {
public List<WorkerFunctionInstanceStats> getFunctionsStats() throws PulsarAdminException {
try {
Response response = request(workerStats.path("functionsmetrics")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
Metrics.Builder metricsBuilder = Metrics.newBuilder();
mergeJson(jsonResponse, metricsBuilder);
return metricsBuilder.build();
} catch (Exception e) {
throw getApiException(e);
}
}
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
List<WorkerFunctionInstanceStats> metricsList
= response.readEntity(new GenericType<List<WorkerFunctionInstanceStats>>() {});
return metricsList;
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException {
try {
return request(workerStats.path("metrics"))
.get(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {
});
Response response = request(workerStats.path("metrics")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<List<org.apache.pulsar.common.stats.Metrics>>() {});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -82,9 +76,11 @@ public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Pu
@Override
public List<WorkerInfo> getCluster() throws PulsarAdminException {
try {
return request(worker.path("cluster"))
.get(new GenericType<List<WorkerInfo>>() {
});
Response response = request(worker.path("cluster")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<List<WorkerInfo>>() {});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -93,8 +89,11 @@ public List<WorkerInfo> getCluster() throws PulsarAdminException {
@Override
public WorkerInfo getClusterLeader() throws PulsarAdminException {
try {
return request(worker.path("cluster").path("leader"))
.get(new GenericType<WorkerInfo>(){});
Response response = request(worker.path("cluster").path("leader")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
return response.readEntity(new GenericType<WorkerInfo>(){});
} catch (Exception e) {
throw getApiException(e);
}
Expand All @@ -107,9 +106,8 @@ public Map<String, Collection<String>> getAssignments() throws PulsarAdminExcept
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
Type type = new TypeToken<Map<String, Collection<String>>>(){}.getType();
Map<String, Collection<String>> assignments = new Gson().fromJson(jsonResponse, type);
Map<String, Collection<String>> assignments
= response.readEntity(new GenericType<Map<String, Collection<String>>>() {});
return assignments;
} catch (Exception e) {
throw getApiException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,14 @@ <T> void print(List<T> items) {
}
}

<T> void printList(T item) {
try {
System.out.println(writer.writeValueAsString(item));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

<T> void print(T item) {
try {
System.out.println(writer.writeValueAsString(item));
Expand Down
Loading

0 comments on commit 23a6622

Please sign in to comment.