Skip to content

Commit

Permalink
Make sure to properly count number of processed messages in python (a…
Browse files Browse the repository at this point in the history
…pache#3060)

* Make sure to properly count number of processed messages in python

* Removed total processed

* Fixed build

* Fixed buil

* Address feedback

* Fixed unittest

* Removed unused value

* Added licence headers

* Removed unnecessary changes

* Fix integration tests

* Added numReceived as part of function status

* Unnecessary change revert

* Enhance test
  • Loading branch information
srkukarni authored Nov 27, 2018
1 parent 08ef448 commit 7719b8e
Show file tree
Hide file tree
Showing 11 changed files with 54 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,10 @@ public void testPulsarFunctionStatus() throws Exception {

FunctionStatus stats = functionStatus.getFunctionStatusListList().get(0);

double count = stats.getNumProcessed();
double count = stats.getNumReceived();
double success = stats.getNumSuccessfullyProcessed();
String ownerWorkerId = stats.getWorkerId();
assertEquals((int) count, totalMsgs);
assertEquals((int)count, totalMsgs);
assertEquals((int) success, totalMsgs);
assertEquals(ownerWorkerId, workerId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,13 @@ public class FunctionStatsManager implements AutoCloseable {
public final static String USER_METRIC_PREFIX = "user_metric_";

/** Declare metric names **/
public static final String PROCESSED_TOTAL = "processed_total";
public static final String PROCESSED_SUCCESSFULLY_TOTAL = "processed_successfully_total";
public static final String SYSTEM_EXCEPTIONS_TOTAL = "system_exceptions_total";
public static final String USER_EXCEPTIONS_TOTAL = "user_exceptions_total";
public static final String PROCESS_LATENCY_MS = "process_latency_ms";
public static final String LAST_INVOCATION = "last_invocation";
public static final String RECEIVED_TOTAL = "received_total";

public static final String PROCESSED_TOTAL_1min = "processed_total_1min";
public static final String PROCESSED_SUCCESSFULLY_TOTAL_1min = "processed_successfully_total_1min";
public static final String SYSTEM_EXCEPTIONS_TOTAL_1min = "system_exceptions_total_1min";
public static final String USER_EXCEPTIONS_TOTAL_1min = "user_exceptions_total_1min";
Expand All @@ -63,8 +61,6 @@ public class FunctionStatsManager implements AutoCloseable {

/** Declare Prometheus stats **/

final Counter statTotalProcessed;

final Counter statTotalProcessedSuccessfully;

final Counter statTotalSysExceptions;
Expand All @@ -79,8 +75,6 @@ public class FunctionStatsManager implements AutoCloseable {

// windowed metrics

final Counter statTotalProcessed1min;

final Counter statTotalProcessedSuccessfully1min;

final Counter statTotalSysExceptions1min;
Expand All @@ -104,12 +98,6 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metric

this.metricsLabels = metricsLabels;

statTotalProcessed = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL)
.help("Total number of messages processed.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);

statTotalProcessedSuccessfully = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL)
.help("Total number of messages processed successfully.")
Expand Down Expand Up @@ -150,12 +138,6 @@ public FunctionStatsManager(CollectorRegistry collectorRegistry, String[] metric
.labelNames(metricsLabelNames)
.register(collectorRegistry);

statTotalProcessed1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_TOTAL_1min)
.help("Total number of messages processed in the last 1 minute.")
.labelNames(metricsLabelNames)
.register(collectorRegistry);

statTotalProcessedSuccessfully1min = Counter.build()
.name(PULSAR_FUNCTION_METRICS_PREFIX + PROCESSED_SUCCESSFULLY_TOTAL_1min)
.help("Total number of messages processed successfully in the last 1 minute.")
Expand Down Expand Up @@ -222,11 +204,6 @@ public void incrTotalReceived() {
statTotalRecordsRecieved1min.labels(metricsLabels).inc();
}

public void incrTotalProcessed() {
statTotalProcessed.labels(metricsLabels).inc();
statTotalProcessed1min.labels(metricsLabels).inc();
}

public void incrTotalProcessedSuccessfully() {
statTotalProcessedSuccessfully.labels(metricsLabels).inc();
statTotalProcessedSuccessfully1min.labels(metricsLabels).inc();
Expand Down Expand Up @@ -261,10 +238,6 @@ public void processTimeEnd() {
}
}

public double getTotalProcessed() {
return statTotalProcessed.labels(metricsLabels).get();
}

public double getTotalProcessedSuccessfully() {
return statTotalProcessedSuccessfully.labels(metricsLabels).get();
}
Expand Down Expand Up @@ -306,10 +279,6 @@ public double getProcessLatency99_9P() {
return statProcessLatency.labels(metricsLabels).get().quantiles.get(0.999);
}

public double getTotalProcessed1min() {
return statTotalProcessed1min.labels(metricsLabels).get();
}

public double getTotalProcessedSuccessfully1min() {
return statTotalProcessedSuccessfully1min.labels(metricsLabels).get();
}
Expand Down Expand Up @@ -348,7 +317,6 @@ public double getProcessLatency99_9P1min() {
}

public void reset() {
statTotalProcessed1min.clear();
statTotalProcessedSuccessfully1min.clear();
statTotalSysExceptions1min.clear();
statTotalUserExceptions1min.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,6 @@ public void run() {

// register end time
stats.processTimeEnd();
// increment total processed
stats.incrTotalProcessed();

removeLogTopicHandler();

Expand Down Expand Up @@ -520,15 +518,13 @@ public void resetMetrics() {
private Builder createMetricsDataBuilder() {
InstanceCommunication.MetricsData.Builder bldr = InstanceCommunication.MetricsData.newBuilder();

bldr.setProcessedTotal((long) stats.getTotalProcessed());
bldr.setProcessedSuccessfullyTotal((long) stats.getTotalProcessedSuccessfully());
bldr.setSystemExceptionsTotal((long) stats.getTotalSysExceptions());
bldr.setUserExceptionsTotal((long) stats.getTotalUserExceptions());
bldr.setReceivedTotal((long) stats.getTotalRecordsReceived());
bldr.setAvgProcessLatency(stats.getAvgProcessLatency());
bldr.setLastInvocation((long) stats.getLastInvocation());

bldr.setProcessedTotal1Min((long) stats.getTotalProcessed1min());
bldr.setProcessedSuccessfullyTotal1Min((long) stats.getTotalProcessedSuccessfully1min());
bldr.setSystemExceptionsTotal1Min((long) stats.getTotalSysExceptions1min());
bldr.setUserExceptionsTotal1Min((long) stats.getTotalUserExceptions1min());
Expand All @@ -540,7 +536,7 @@ private Builder createMetricsDataBuilder() {

public InstanceCommunication.FunctionStatus.Builder getFunctionStatus() {
InstanceCommunication.FunctionStatus.Builder functionStatusBuilder = InstanceCommunication.FunctionStatus.newBuilder();
functionStatusBuilder.setNumProcessed((long) stats.getTotalProcessed());
functionStatusBuilder.setNumReceived((long)stats.getTotalRecordsReceived());
functionStatusBuilder.setNumSuccessfullyProcessed((long) stats.getTotalProcessedSuccessfully());
functionStatusBuilder.setNumUserExceptions((long) stats.getTotalUserExceptions());
stats.getLatestUserExceptions().forEach(ex -> {
Expand Down
1 change: 0 additions & 1 deletion pulsar-functions/instance/src/main/python/Function_pb2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
# under the License.
#

# Generated by the protocol buffer compiler. DO NOT EDIT!
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: Function.proto

Expand Down
Loading

0 comments on commit 7719b8e

Please sign in to comment.