Skip to content

Commit

Permalink
REST and CLI to get function metrics in json for monitoring (apache#2296
Browse files Browse the repository at this point in the history
)

* REST and CLI to get function metrics in json for monitoring

* add worker-stats end-point
  • Loading branch information
rdhabalia authored Aug 8, 2018
1 parent 3b3f541 commit 20b41de
Show file tree
Hide file tree
Showing 15 changed files with 386 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;

import java.io.IOException;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Path("/worker-stats")
public class WorkerStats extends FunctionApiResource {

@GET
@Path("/functions")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getMetrics() throws IOException {
return functions.getFunctionsMetrcis(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerInfo;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
Expand Down Expand Up @@ -84,6 +85,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
private final WorkerStats workerStats;
private final Schemas schemas;
protected final WebTarget root;
protected final Authentication auth;
Expand Down Expand Up @@ -187,6 +189,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
this.workerStats = new WorkerStatsImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
}
Expand Down Expand Up @@ -354,6 +357,14 @@ public Functions functions() {
return functions;
}

/**
*
* @return the Worker stats
*/
public WorkerStats workerStats() {
return workerStats;
}

/**
* @return the broker statics
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;

/**
* Admin interface for worker stats management.
*/
public interface WorkerStats {


/**
* Get all functions stats on a worker
* @return
* @throws PulsarAdminException
*/
Metrics getFunctionsStats() throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public List<WorkerInfo> getCluster() throws PulsarAdminException {
throw getApiException(e);
}
}

public static void mergeJson(String json, Builder builder) throws IOException {
JsonFormat.parser().merge(json, builder);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin.internal;

import javax.ws.rs.ClientErrorException;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.WorkerStats;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import static org.apache.pulsar.client.admin.internal.FunctionsImpl.mergeJson;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class WorkerStatsImpl extends BaseResource implements WorkerStats {

private final WebTarget workerStats;

public WorkerStatsImpl(WebTarget web, Authentication auth) {
super(auth);
this.workerStats = web.path("/admin/worker-stats");
}

@Override
public Metrics getFunctionsStats() throws PulsarAdminException {
try {
Response response = request(workerStats.path("functions")).get();
if (!response.getStatusInfo().equals(Response.Status.OK)) {
throw new ClientErrorException(response);
}
String jsonResponse = response.readEntity(String.class);
Metrics.Builder metricsBuilder = Metrics.newBuilder();
mergeJson(jsonResponse, metricsBuilder);
return metricsBuilder.build();
} catch (Exception e) {
throw getApiException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.admin.cli;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.utils.Utils;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Parameters(commandDescription = "Operations to collect function-worker statistics")
public class CmdFunctionWorkerStats extends CmdBase {

private final FunctionsStats functionsStats;

/**
* Base command
*/
@Getter
abstract class BaseCommand extends CliCommand {
@Override
void run() throws Exception {
processArguments();
runCmd();
}

void processArguments() throws Exception {
}

abstract void runCmd() throws Exception;
}

@Parameters(commandDescription = "dump all functions stats")
class FunctionsStats extends BaseCommand {

@Parameter(names = { "-i", "--indent" }, description = "Indent JSON output", required = false)
boolean indent = false;

@Override
void runCmd() throws Exception {
String json = Utils.printJson(admin.workerStats().getFunctionsStats());
GsonBuilder gsonBuilder = new GsonBuilder();
if (indent) {
gsonBuilder.setPrettyPrinting();
}
System.out.println(gsonBuilder.create().toJson(new JsonParser().parse(json)));
}
}

public CmdFunctionWorkerStats(PulsarAdmin admin) throws PulsarClientException {
super("functions", admin);
functionsStats = new FunctionsStats();
jcommander.addCommand("functions", functionsStats);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ void runCmd() throws Exception {
System.out.println(gson.toJson(new JsonParser().parse(json)));
}
}

public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
super("functions", admin);
localRunner = new LocalRunner();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class PulsarAdminTool {

commandMap.put("resource-quotas", CmdResourceQuotas.class);
commandMap.put("functions", CmdFunctions.class);
commandMap.put("functions-worker-stats", CmdFunctionWorkerStats.class);
commandMap.put("source", CmdSources.class);
commandMap.put("sink", CmdSinks.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,12 @@ service InstanceControl {
rpc GetMetrics(google.protobuf.Empty) returns (MetricsData) {}
rpc HealthCheck(google.protobuf.Empty) returns (HealthCheckResult) {}
}

message Metrics {
message InstanceMetrics {
string name = 1;
int32 instanceId = 2;
MetricsData metricsData = 3;
}
repeated InstanceMetrics metrics = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerStats;
import org.glassfish.jersey.media.multipart.MultiPartFeature;

import java.util.Arrays;
Expand All @@ -36,6 +37,7 @@ public static Set<Class<?>> getApiResources() {
return new HashSet<>(
Arrays.asList(
FunctionApiV2Resource.class,
WorkerStats.class,
MultiPartFeature.class
));
}
Expand Down
Loading

0 comments on commit 20b41de

Please sign in to comment.