Skip to content

Commit

Permalink
Added cli commands to get function cluster related information (apach…
Browse files Browse the repository at this point in the history
…e#2426)

* Added command line to get cluster/cluster leader/function assignment information.
Also refactored such kind of meta requests to a seperate endpoint

* Removed left-over debug statements

* Added back /functionsmetrics

* Added /functionsmetrics back to the broker worker

* Removed leftover references of getcluster

* Fix integration tests

* Fixed instantiation of service in worker only mode

* Removed log statement

* Seperated stats calls to a seperate endpoint to mimic broker
  • Loading branch information
srkukarni authored Aug 25, 2018
1 parent d4b214c commit f44d367
Show file tree
Hide file tree
Showing 21 changed files with 640 additions and 296 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,50 +198,6 @@ public Response listFunctions(final @PathParam("tenant") String tenant,

}

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class,
responseContainer = "List"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public List<WorkerInfo> getCluster() {
return functions.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions",
response = WorkerInfo.class
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster/leader")
public WorkerInfo getClusterLeader() {
return functions.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Assignment.class,
responseContainer = "Map"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/assignments")
public Response getAssignments() {
return functions.getAssignments();
}

@POST
@ApiOperation(
value = "Triggers a Pulsar Function with a user-specified value or file data",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v2;

import java.util.function.Supplier;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.WorkerService;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

@Slf4j
@Path("/worker")
public class Worker extends AdminResource implements Supplier<WorkerService> {

private final WorkerImpl worker;

public Worker() {
this.worker = new WorkerImpl(this);
}

@Override
public WorkerService get() {
return pulsar().getWorkerService();
}

@GET
@ApiOperation(
value = "Fetches information about the Pulsar cluster running Pulsar Functions"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster")
@Produces(MediaType.APPLICATION_JSON)
public Response getCluster() {
return worker.getCluster();
}

@GET
@ApiOperation(
value = "Fetches info about the leader node of the Pulsar cluster running Pulsar Functions")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")

})
@Path("/cluster/leader")
@Produces(MediaType.APPLICATION_JSON)
public Response getClusterLeader() {
return worker.getClusterLeader();
}

@GET
@ApiOperation(
value = "Fetches information about which Pulsar Functions are assigned to which Pulsar clusters",
response = Function.Assignment.class,
responseContainer = "Map"
)
@ApiResponses(value = {
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/assignments")
public Response getAssignments() {
return worker.getAssignments();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,51 @@
*/
package org.apache.pulsar.broker.admin.v2;

import java.io.IOException;
import java.util.Collection;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;

import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.api.WorkerImpl;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collection;
import java.util.function.Supplier;

@Slf4j
@Path("/worker-stats")
public class WorkerStats extends FunctionApiResource {
public class WorkerStats extends AdminResource implements Supplier<WorkerService> {

@GET
@Path("/functions")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getStats() throws IOException {
return functions.getFunctionsMetrcis(clientAppId());
private final WorkerImpl worker;

public WorkerStats() {
this.worker = new WorkerImpl(this);
}

@Override
public WorkerService get() {
return pulsar().getWorkerService();
}

@GET
@Path("/metrics")
@ApiOperation(value = "Gets the metrics for Monitoring", notes = "Request should be executed by Monitoring agent on each worker to fetch the worker-metrics", response = org.apache.pulsar.common.stats.Metrics.class, responseContainer = "List")
@ApiResponses(value = { @ApiResponse(code = 401, message = "Don't have admin permission") })
public Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws Exception {
return functions.getWorkerMetrcis(clientAppId());
return worker.getWorkerMetrcis(clientAppId());
}

@GET
@Path("/functionsmetrics")
@ApiOperation(value = "Get metrics for all functions owned by worker", notes = "Requested should be executed by Monitoring agent on each worker to fetch the metrics", response = Metrics.class)
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 503, message = "Worker service is not running") })
public Response getStats() throws IOException {
return worker.getFunctionsMetrics(clientAppId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,4 @@ private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) {

return new WorkerService(workerConfig);
}

@Test
public void testGetWokersApi() throws Exception {
List<WorkerInfo> workers = admin.functions().getCluster();
Assert.assertEquals(workers.size(), 1);
Assert.assertEquals(workers.get(0).getPort(), workerServicePort);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,4 @@ public interface Functions {
*
*/
Set<String> getSinks() throws PulsarAdminException;

/**
* Get list of workers present under a cluster
* @return
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.pulsar.client.admin.internal.NonPersistentTopicsImpl;
import org.apache.pulsar.client.admin.internal.SchemasImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.admin.internal.WorkerStatsImpl;
import org.apache.pulsar.client.admin.internal.WorkerImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.PulsarAdminBuilderImpl;
import org.apache.pulsar.client.admin.internal.ResourceQuotasImpl;
Expand Down Expand Up @@ -85,7 +85,7 @@ public class PulsarAdmin implements Closeable {
private final String serviceUrl;
private final Lookup lookups;
private final Functions functions;
private final WorkerStats workerStats;
private final Worker worker;
private final Schemas schemas;
protected final WebTarget root;
protected final Authentication auth;
Expand Down Expand Up @@ -189,7 +189,7 @@ public PulsarAdmin(String serviceUrl, ClientConfigurationData clientConfigData)
this.resourceQuotas = new ResourceQuotasImpl(root, auth);
this.lookups = new LookupImpl(root, auth, useTls);
this.functions = new FunctionsImpl(root, auth);
this.workerStats = new WorkerStatsImpl(root, auth);
this.worker = new WorkerImpl(root, auth);
this.schemas = new SchemasImpl(root, auth);
this.bookies = new BookiesImpl(root, auth);
}
Expand Down Expand Up @@ -361,8 +361,8 @@ public Functions functions() {
*
* @return the Worker stats
*/
public WorkerStats workerStats() {
return workerStats;
public Worker worker() {
return worker;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package org.apache.pulsar.client.admin;

import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.pulsar.functions.proto.InstanceCommunication.Metrics;
import org.apache.pulsar.functions.worker.WorkerInfo;

/**
* Admin interface for worker stats management.
*/
public interface WorkerStats {
public interface Worker {


/**
Expand All @@ -41,4 +44,25 @@ public interface WorkerStats {
* @throws PulsarAdminException
*/
Collection<org.apache.pulsar.common.stats.Metrics> getMetrics() throws PulsarAdminException;

/**
* Get List of all workers belonging to this cluster
* @return
* @throws PulsarAdminException
*/
List<WorkerInfo> getCluster() throws PulsarAdminException;

/**
* Get the worker who is the leader of the cluster
* @return
* @throws PulsarAdminException
*/
WorkerInfo getClusterLeader() throws PulsarAdminException;

/**
* Get the function assignment among the cluster
* @return
* @throws PulsarAdminException
*/
Map<String, Collection<String>> getAssignments() throws PulsarAdminException;
}
Loading

0 comments on commit f44d367

Please sign in to comment.