Skip to content

Commit

Permalink
adding windowed metrics for functions (apache#3021)
Browse files Browse the repository at this point in the history
* adding windowed metrics for functions

* adding license headers and cleaning up

* remove unnecessary import

* add RestException

* fixing bugs and refactoring code

* fix bug in instanceCache

* fix bug

* add test for stats and fix minor bug
  • Loading branch information
jerrypeng authored Nov 21, 2018
1 parent aaf224b commit 859c914
Show file tree
Hide file tree
Showing 27 changed files with 925 additions and 224 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/stats")
public Response getFunctionStats(final @PathParam("tenant") String tenant,
public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName) throws IOException {
return functions.getFunctionStats(tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
Expand All @@ -206,7 +206,7 @@ public Response getFunctionStats(final @PathParam("tenant") String tenant,
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
public Response getFunctionInstanceStats(final @PathParam("tenant") String tenant,
public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace,
final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.*;
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -45,6 +46,7 @@
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
Expand Down Expand Up @@ -357,13 +359,102 @@ public void testPulsarFunctionStats() throws Exception {
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStatusList functionStats = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
FunctionStats functionStats = functionRuntimeManager.getFunctionStats(tenant, namespacePortion,
functionName, null);

FunctionStats functionStatsFromAdmin = admin.functions().getFunctionStats(tenant, namespacePortion,
functionName);

assertEquals(functionStats, functionStatsFromAdmin);

assertEquals(functionStats.getReceivedTotal(), totalMsgs);
assertEquals(functionStats.getProcessedSuccessfullyTotal(), totalMsgs);
assertEquals(functionStats.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.getUserExceptionsTotal(), 0);
assertTrue(functionStats.avgProcessLatency > 0);
assertEquals(functionStats.oneMin.getReceivedTotal(), totalMsgs);
assertEquals(functionStats.oneMin.getProcessedSuccessfullyTotal(), totalMsgs);
assertEquals(functionStats.oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.oneMin.getUserExceptionsTotal(), 0);
assertTrue(functionStats.oneMin.getAvgProcessLatency() > 0);
assertEquals(functionStats.getAvgProcessLatency(), functionStats.oneMin.getAvgProcessLatency());
assertTrue(functionStats.getLastInvocation() > 0);

assertEquals(functionStats.instances.size(), 1);
assertEquals(functionStats.instances.get(0).getInstanceId(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getReceivedTotal(), totalMsgs);
assertEquals(functionStats.instances.get(0).getMetrics().getProcessedSuccessfullyTotal(), totalMsgs);
assertEquals(functionStats.instances.get(0).getMetrics().getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().getUserExceptionsTotal(), 0);
assertTrue(functionStats.instances.get(0).getMetrics().avgProcessLatency > 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getReceivedTotal(), totalMsgs);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getProcessedSuccessfullyTotal(), totalMsgs);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getSystemExceptionsTotal(), 0);
assertEquals(functionStats.instances.get(0).getMetrics().oneMin.getUserExceptionsTotal(), 0);
assertTrue(functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency() > 0);

assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.instances.get(0).getMetrics().oneMin.getAvgProcessLatency());
assertEquals(functionStats.instances.get(0).getMetrics().getAvgProcessLatency(), functionStats.getAvgProcessLatency());
}

@Test(timeOut = 20000)
public void testPulsarFunctionStatus() throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopic = "persistent://" + replNamespace + "/my-topic1";
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String propertyKey = "key";
final String propertyValue = "value";
final String functionName = "PulsarSink-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

// create a producer that creates a topic at broker
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create();

String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-functions-api-examples.jar").getFile();
FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);

// try to update function to test: update-function functionality
admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);

retryStrategically((test) -> {
try {
return admin.topics().getStats(sourceTopic).subscriptions.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);
// validate pulsar sink consumer has started on the topic
assertEquals(admin.topics().getStats(sourceTopic).subscriptions.size(), 1);

int totalMsgs = 10;
for (int i = 0; i < totalMsgs; i++) {
String data = "my-message-" + i;
producer.newMessage().property(propertyKey, propertyValue).value(data).send();
}
retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats.unackedMessages == 0 && subStats.msgThroughputOut == totalMsgs;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStatusList functionStatus = functionRuntimeManager.getAllFunctionStatus(tenant, namespacePortion,
functionName, null);

int numInstances = functionStats.getFunctionStatusListCount();
int numInstances = functionStatus.getFunctionStatusListCount();
assertEquals(numInstances, 1);

FunctionStatus stats = functionStats.getFunctionStatusListList().get(0);
FunctionStatus stats = functionStatus.getFunctionStatusListList().get(0);

double count = stats.getNumProcessed();
double success = stats.getNumSuccessfullyProcessed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,10 @@ class GetFunctionStats extends FunctionCommand {
@Override
void runCmd() throws Exception {

Gson gson = new GsonBuilder().setPrettyPrinting().create();
if (isBlank(instanceId)) {
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName)));
print(admin.functions().getFunctionStats(tenant, namespace, functionName));
} else {
System.out.println(gson.toJson(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId))));
print(admin.functions().getFunctionStats(tenant, namespace, functionName, Integer.parseInt(instanceId)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.common.policies.data;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import lombok.Data;

import java.util.HashMap;
Expand All @@ -27,6 +29,7 @@
import java.util.function.Consumer;

@Data
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "instances" })
public class FunctionStats {

/**
Expand Down Expand Up @@ -54,20 +57,24 @@ public class FunctionStats {
**/
public double avgProcessLatency;

@JsonProperty("1min")
public FunctionInstanceStats.FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStats.FunctionInstanceStatsDataBase();

/**
* Timestamp of when the function was last invoked by any instance
**/
public long lastInvocation;

@Data
@JsonPropertyOrder({ "instanceId", "metrics" })
public static class FunctionInstanceStats {

/** Instance Id of function instance **/
public int instanceId;

@Data
public static class FunctionInstanceStatsData {

@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency" })
public static class FunctionInstanceStatsDataBase {
/**
* Total number of records function received from source for instance
**/
Expand All @@ -92,6 +99,14 @@ public static class FunctionInstanceStatsData {
* Average process latency for function for instance
**/
public double avgProcessLatency;
}

@Data
@JsonPropertyOrder({ "receivedTotal", "processedSuccessfullyTotal", "systemExceptionsTotal", "userExceptionsTotal", "avgProcessLatency", "1min", "lastInvocation", "userMetrics" })
public static class FunctionInstanceStatsData extends FunctionInstanceStatsDataBase {

@JsonProperty("1min")
public FunctionInstanceStatsDataBase oneMin = new FunctionInstanceStatsDataBase();

/**
* Timestamp of when the function was last invoked for instance
Expand Down Expand Up @@ -125,6 +140,13 @@ public void accept(FunctionInstanceStats functionInstanceStats) {
systemExceptionsTotal += functionInstanceStatsData.systemExceptionsTotal;
userExceptionsTotal += functionInstanceStatsData.userExceptionsTotal;
avgProcessLatency += functionInstanceStatsData.avgProcessLatency;

oneMin.receivedTotal += functionInstanceStatsData.oneMin.receivedTotal;
oneMin.processedSuccessfullyTotal += functionInstanceStatsData.oneMin.processedSuccessfullyTotal;
oneMin.systemExceptionsTotal += functionInstanceStatsData.oneMin.systemExceptionsTotal;
oneMin.userExceptionsTotal += functionInstanceStatsData.oneMin.userExceptionsTotal;
oneMin.avgProcessLatency += functionInstanceStatsData.oneMin.avgProcessLatency;

if (functionInstanceStatsData.lastInvocation > lastInvocation) {
lastInvocation = functionInstanceStatsData.lastInvocation;
}
Expand All @@ -133,6 +155,10 @@ public void accept(FunctionInstanceStats functionInstanceStats) {
});
// calculate average from sum
avgProcessLatency = avgProcessLatency / instances.size();

// calculate 1min average from sum
oneMin.avgProcessLatency = oneMin.avgProcessLatency / instances.size();

return this;
}
}
Loading

0 comments on commit 859c914

Please sign in to comment.