diff --git a/.github/workflows/ci-go-functions-style.yaml b/.github/workflows/ci-go-functions-style.yaml index b3dae4d01a9f1..55c04822dd24b 100644 --- a/.github/workflows/ci-go-functions-style.yaml +++ b/.github/workflows/ci-go-functions-style.yaml @@ -33,12 +33,14 @@ on: - 'pulsar-function-go/**' jobs: - build: - name: Build + check-style: + + name: Go ${{ matrix.go-version }} Functions style check runs-on: ubuntu-latest strategy: matrix: go-version: [1.11, 1.12, 1.13, 1.14] + steps: - name: Check out code into the Go module directory uses: actions/checkout@v2 @@ -53,7 +55,7 @@ jobs: args: site2 deployment .asf.yaml .ci ct.yaml - name: Set up Go - uses: actions/setup-go@v1 + uses: actions/setup-go@v2 if: steps.docs.outputs.changed_only == 'no' with: go-version: ${{ matrix.go-version }} diff --git a/.github/workflows/ci-go-functions-test.yaml b/.github/workflows/ci-go-functions-test.yaml index 076bca57efaff..5cd478542bbbe 100644 --- a/.github/workflows/ci-go-functions-test.yaml +++ b/.github/workflows/ci-go-functions-test.yaml @@ -44,7 +44,7 @@ jobs: timeout-minutes: 120 steps: - - name: checkout + - name: Check out code into the Go module directory uses: actions/checkout@v2 with: fetch-depth: 0 @@ -57,13 +57,13 @@ jobs: args: site2 deployment .asf.yaml .ci ct.yaml - name: Set up Go - uses: actions/setup-go@v1 + uses: actions/setup-go@v2 if: steps.docs.outputs.changed_only == 'no' with: go-version: ${{ matrix.go-version }} id: go - - name: run tests + - name: Run tests if: steps.docs.outputs.changed_only == 'no' run: | cd pulsar-function-go diff --git a/pulsar-function-go/pf/stats.go b/pulsar-function-go/pf/stats.go index 2d8f15baa85f5..f7952fbd7ab5b 100644 --- a/pulsar-function-go/pf/stats.go +++ b/pulsar-function-go/pf/stats.go @@ -20,7 +20,6 @@ package pf import ( - "strconv" "time" "github.com/prometheus/client_golang/prometheus" @@ -30,7 +29,7 @@ import ( var ( metricsLabelNames = []string{"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"} - exceptionLabelNames = []string{"error", "ts"} + exceptionLabelNames = []string{"error"} exceptionMetricsLabelNames = append(metricsLabelNames, exceptionLabelNames...) ) @@ -254,12 +253,12 @@ func (stat *StatWithLabelValues) addUserException(err error) { stat.latestUserException = stat.latestUserException[1:] } // report exception via prometheus - stat.reportUserExceptionPrometheus(err, ts) + stat.reportUserExceptionPrometheus(err) } //@limits(calls=5, period=60) -func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error, ts int64) { - errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)} +func (stat *StatWithLabelValues) reportUserExceptionPrometheus(exception error) { + errorTs := []string{exception.Error()} exceptionMetricLabels := append(stat.metricsLabels, errorTs...) userExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0) } @@ -284,12 +283,12 @@ func (stat *StatWithLabelValues) addSysException(exception error) { stat.latestSysException = stat.latestSysException[1:] } // report exception via prometheus - stat.reportSystemExceptionPrometheus(exception, ts) + stat.reportSystemExceptionPrometheus(exception) } //@limits(calls=5, period=60) -func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error, ts int64) { - errorTs := []string{exception.Error(), strconv.FormatInt(ts, 10)} +func (stat *StatWithLabelValues) reportSystemExceptionPrometheus(exception error) { + errorTs := []string{exception.Error()} exceptionMetricLabels := append(stat.metricsLabels, errorTs...) systemExceptions.WithLabelValues(exceptionMetricLabels...).Set(1.0) } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index daa51b7f1ba5b..cbdcc0fc27bd6 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -49,9 +49,8 @@ public abstract class ComponentStatsManager implements AutoCloseable { protected static final String[] exceptionMetricsLabelNames; static { - exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 2); + exceptionMetricsLabelNames = Arrays.copyOf(metricsLabelNames, metricsLabelNames.length + 1); exceptionMetricsLabelNames[metricsLabelNames.length] = "error"; - exceptionMetricsLabelNames[metricsLabelNames.length + 1] = "ts"; } public static ComponentStatsManager getStatsManager(CollectorRegistry collectorRegistry, diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java index fdedb7452d12c..f02b8505ebba4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/FunctionStatsManager.java @@ -243,7 +243,7 @@ public void addUserException(Throwable ex) { // report exception throw prometheus if (userExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); userExceptions.labels(exceptionMetricsLabels).set(1.0); } } @@ -255,15 +255,14 @@ public void addSystemException(Throwable ex) { // report exception throw prometheus if (sysExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); sysExceptions.labels(exceptionMetricsLabels).set(1.0); } } - private String[] getExceptionMetricsLabels(Throwable ex, long ts) { - String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); - exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; - exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + private String[] getExceptionMetricsLabels(Throwable ex) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1); + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : ""; return exceptionMetricsLabels; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java index c913225c09677..401aa34be697c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SinkStatsManager.java @@ -213,7 +213,7 @@ public void incrSysExceptions(Throwable ex) { // report exception throw prometheus if (sysExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); sysExceptions.labels(exceptionMetricsLabels).set(1.0); } } @@ -236,15 +236,14 @@ public void incrSinkExceptions(Throwable ex) { // report exception throw prometheus if (sinkExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); sinkExceptions.labels(exceptionMetricsLabels).set(1.0); } } - private String[] getExceptionMetricsLabels(Throwable ex, long ts) { - String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); - exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; - exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + private String[] getExceptionMetricsLabels(Throwable ex) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1); + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : ""; return exceptionMetricsLabels; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java index 0ec73523be67d..287240c04e70c 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/SourceStatsManager.java @@ -212,7 +212,7 @@ public void incrSysExceptions(Throwable ex) { // report exception throw prometheus if (sysExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); sysExceptions.labels(exceptionMetricsLabels).set(1.0); } } @@ -230,15 +230,14 @@ public void incrSourceExceptions(Throwable ex) { // report exception throw prometheus if (sourceExceptionRateLimiter.tryAcquire()) { - String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex, ts); + String[] exceptionMetricsLabels = getExceptionMetricsLabels(ex); sourceExceptions.labels(exceptionMetricsLabels).set(1.0); } } - private String[] getExceptionMetricsLabels(Throwable ex, long ts) { - String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 2); - exceptionMetricsLabels[exceptionMetricsLabels.length - 2] = ex.getMessage() != null ? ex.getMessage() : ""; - exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = String.valueOf(ts); + private String[] getExceptionMetricsLabels(Throwable ex) { + String[] exceptionMetricsLabels = Arrays.copyOf(metricsLabels, metricsLabels.length + 1); + exceptionMetricsLabels[exceptionMetricsLabels.length - 1] = ex.getMessage() != null ? ex.getMessage() : ""; return exceptionMetricsLabels; } diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index 2fd7839713eae..8b54f75044de2 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -29,7 +29,7 @@ class Stats(object): metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn'] - exception_metrics_label_names = metrics_label_names + ['error', 'ts'] + exception_metrics_label_names = metrics_label_names + ['error'] PULSAR_FUNCTION_METRICS_PREFIX = "pulsar_function_" USER_METRIC_PREFIX = "user_metric_" @@ -185,13 +185,13 @@ def add_user_exception(self, exception): # report exception via prometheus try: - self.report_user_exception_prometheus(exception, ts) + self.report_user_exception_prometheus(exception) except RateLimitException: pass @limits(calls=5, period=60) - def report_user_exception_prometheus(self, exception, ts): - exception_metric_labels = self.metrics_labels + [str(exception), str(ts)] + def report_user_exception_prometheus(self, exception): + exception_metric_labels = self.metrics_labels + [str(exception)] self.user_exceptions.labels(*exception_metric_labels).set(1.0) def add_sys_exception(self, exception): @@ -203,13 +203,13 @@ def add_sys_exception(self, exception): # report exception via prometheus try: - self.report_system_exception_prometheus(exception, ts) + self.report_system_exception_prometheus(exception) except RateLimitException: pass @limits(calls=5, period=60) - def report_system_exception_prometheus(self, exception, ts): - exception_metric_labels = self.metrics_labels + [str(exception), str(ts)] + def report_system_exception_prometheus(self, exception): + exception_metric_labels = self.metrics_labels + [str(exception)] self.system_exceptions.labels(*exception_metric_labels).set(1.0) def reset(self): @@ -218,4 +218,4 @@ def reset(self): self._stat_total_sys_exceptions_1min._value.set(0.0) self._stat_process_latency_ms_1min._sum.set(0.0) self._stat_process_latency_ms_1min._count.set(0.0) - self._stat_total_received_1min._value.set(0.0) \ No newline at end of file + self._stat_total_received_1min._value.set(0.0)