Skip to content

Commit

Permalink
[Issue 7489] Remove timestamp from metrics (apache#7539)
Browse files Browse the repository at this point in the history
* [Issue 7489] Remove timestamp from exception metrics for functions and connectors

* [Issue 7489] Remove timestamp from exception metrics for Go functions

* [Issue 7489] Remove unused import in go stats

* [Issue 7489] Remove timestamp from metrics in python stats

* Change to v2 of go github actions

* Update go github actions

* Remove the version from the go test command

* Rename github jobs for go

Co-authored-by: Matteo Merli <[email protected]>
  • Loading branch information
vzhikserg and merlimat authored Jul 25, 2020
1 parent ee39e40 commit 777ed16
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 42 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/ci-go-functions-style.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ on:
- 'pulsar-function-go/**'

jobs:
build:
name: Build
check-style:

name: Go ${{ matrix.go-version }} Functions style check
runs-on: ubuntu-latest
strategy:
matrix:
go-version: [1.11, 1.12, 1.13, 1.14]

steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v2
Expand All @@ -53,7 +55,7 @@ jobs:
args: site2 deployment .asf.yaml .ci ct.yaml

- name: Set up Go
uses: actions/setup-go@v1
uses: actions/setup-go@v2
if: steps.docs.outputs.changed_only == 'no'
with:
go-version: ${{ matrix.go-version }}
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci-go-functions-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
timeout-minutes: 120

steps:
- name: checkout
- name: Check out code into the Go module directory
uses: actions/checkout@v2
with:
fetch-depth: 0
Expand All @@ -57,13 +57,13 @@ jobs:
args: site2 deployment .asf.yaml .ci ct.yaml

- name: Set up Go
uses: actions/setup-go@v1
uses: actions/setup-go@v2
if: steps.docs.outputs.changed_only == 'no'
with:
go-version: ${{ matrix.go-version }}
id: go

- name: run tests
- name: Run tests
if: steps.docs.outputs.changed_only == 'no'
run: |
cd pulsar-function-go
Expand Down
15 changes: 7 additions & 8 deletions pulsar-function-go/pf/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package pf

import (
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -30,7 +29,7 @@ import (

var (
metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}
exceptionLabelNames = []string{"error", "ts"}
exceptionLabelNames = []string{"error"}
exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...)
)

Expand Down Expand Up @@ -254,12 +253,12 @@ func (stat *StatWithLabelValues) addUserException(err error) {
stat.latestUserException = stat.latestUserException[1:]
}
// report exception via prometheus
stat.reportUserExceptionPrometheus(err, ts)
stat.reportUserExceptionPrometheus(err)
}

//@limits(calls=5, period=60)
func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) {
errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error) {
errorTs := []string{exception.Error()}
exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
}
Expand All @@ -284,12 +283,12 @@ func (stat *StatWithLabelValues) addSysException(exception error) {
stat.latestSysException = stat.latestSysException[1:]
}
// report exception via prometheus
stat.reportSystemExceptionPrometheus(exception, ts)
stat.reportSystemExceptionPrometheus(exception)
}

//@limits(calls=5, period=60)
func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) {
errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)}
func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error) {
errorTs := []string{exception.Error()}
exceptionMetricLabels := append(stat.metricsLabels, errorTs...)
systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ public abstract class ComponentStatsManager implements AutoCloseable {
protected static final String[] exceptionMetricsLabelNames;

static {
exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2);
exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 1);
exceptionMetricsLabelNames[metricsLabelNames.length] = "error";
exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts";
}

public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public void addUserException(Throwable ex) {

// report exception throw prometheus
if (userExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
userExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
Expand All @@ -255,15 +255,14 @@ public void addSystemException(Throwable ex) {

// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}

private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
private String[] getExceptionMetricsLabels(Throwable ex) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void incrSysExceptions(Throwable ex) {

// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
Expand All @@ -236,15 +236,14 @@ public void incrSinkExceptions(Throwable ex) {

// report exception throw prometheus
if (sinkExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sinkExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}

private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
private String[] getExceptionMetricsLabels(Throwable ex) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void incrSysExceptions(Throwable ex) {

// report exception throw prometheus
if (sysExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sysExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}
Expand All @@ -230,15 +230,14 @@ public void incrSourceExceptions(Throwable ex) {

// report exception throw prometheus
if (sourceExceptionRateLimiter.tryAcquire()) {
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts);
String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex);
sourceExceptions.labels(exceptionMetricsLabels).set(1.0);
}
}

private String[] getExceptionMetricsLabels(Throwable ex, long ts) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2);
exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : "";
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts);
private String[] getExceptionMetricsLabels(Throwable ex) {
String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1);
exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : "";
return exceptionMetricsLabels;
}

Expand Down
16 changes: 8 additions & 8 deletions pulsar-functions/instance/src/main/python/function_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
class Stats(object):
metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn']

exception_metrics_label_names = metrics_label_names + ['error', 'ts']
exception_metrics_label_names = metrics_label_names + ['error']

PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_"
USER_METRIC_PREFIX = "user_metric_"
Expand Down Expand Up @@ -185,13 +185,13 @@ def add_user_exception(self, exception):

# report exception via prometheus
try:
self.report_user_exception_prometheus(exception, ts)
self.report_user_exception_prometheus(exception)
except RateLimitException:
pass

@limits(calls=5, period=60)
def report_user_exception_prometheus(self, exception, ts):
exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
def report_user_exception_prometheus(self, exception):
exception_metric_labels = self.metrics_labels + [str(exception)]
self.user_exceptions.labels(*exception_metric_labels).set(1.0)

def add_sys_exception(self, exception):
Expand All @@ -203,13 +203,13 @@ def add_sys_exception(self, exception):

# report exception via prometheus
try:
self.report_system_exception_prometheus(exception, ts)
self.report_system_exception_prometheus(exception)
except RateLimitException:
pass

@limits(calls=5, period=60)
def report_system_exception_prometheus(self, exception, ts):
exception_metric_labels = self.metrics_labels + [str(exception), str(ts)]
def report_system_exception_prometheus(self, exception):
exception_metric_labels = self.metrics_labels + [str(exception)]
self.system_exceptions.labels(*exception_metric_labels).set(1.0)

def reset(self):
Expand All @@ -218,4 +218,4 @@ def reset(self):
self._stat_total_sys_exceptions_1min._value.set(0.0)
self._stat_process_latency_ms_1min._sum.set(0.0)
self._stat_process_latency_ms_1min._count.set(0.0)
self._stat_total_received_1min._value.set(0.0)
self._stat_total_received_1min._value.set(0.0)

0 comments on commit 777ed16

Please sign in to comment.