diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index fa1177d408f53..94e7bfabbe193 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -392,63 +392,63 @@ public void testPulsarSinkStats() throws Exception { Metric m = metrics.get("pulsar_sink_received_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_received_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_written_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_written_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_sink_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_sink_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_system_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_system_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_last_invocation"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); @@ -475,63 +475,63 @@ public void testPulsarSinkStats() throws Exception { m = metrics.get("pulsar_sink_received_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_sink_received_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_sink_written_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_sink_written_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_sink_sink_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_sink_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_system_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_system_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_sink_last_invocation"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); @@ -569,63 +569,63 @@ public void testPulsarSourceStats() throws Exception { Metric m = metrics.get("pulsar_source_received_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_source_received_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_source_written_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_source_written_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_source_source_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_source_source_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_source_system_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_source_system_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_source_last_invocation"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); @@ -713,77 +713,77 @@ public void testPulsarFunctionStats() throws Exception { Metric m = metrics.get("pulsar_function_received_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_received_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_user_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_user_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_process_latency_ms"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, Double.NaN); m = metrics.get("pulsar_function_process_latency_ms_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, Double.NaN); m = metrics.get("pulsar_function_system_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_system_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_last_invocation"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_processed_successfully_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_processed_successfully_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); @@ -871,77 +871,77 @@ public void testPulsarFunctionStats() throws Exception { m = metrics.get("pulsar_function_received_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_function_received_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_function_user_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_user_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_process_latency_ms"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_function_process_latency_ms_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_function_system_exceptions_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_system_exceptions_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, 0.0); m = metrics.get("pulsar_function_last_invocation"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertTrue(m.value > 0.0); m = metrics.get("pulsar_function_processed_successfully_total"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); m = metrics.get("pulsar_function_processed_successfully_total_1min"); assertEquals(m.tags.get("cluster"), config.getClusterName()); assertEquals(m.tags.get("instance_id"), "0"); - assertEquals(m.tags.get("function"), functionName); + assertEquals(m.tags.get("name"), functionName); assertEquals(m.tags.get("namespace"), String.format("%s/%s", tenant, namespacePortion)); assertEquals(m.tags.get("fqfn"), FunctionDetailsUtils.getFullyQualifiedName(tenant, namespacePortion, functionName)); assertEquals(m.value, (double) totalMsgs); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java index 3cb654b4a0dfe..7c81018767fbf 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/stats/ComponentStatsManager.java @@ -46,7 +46,7 @@ public abstract class ComponentStatsManager implements AutoCloseable { public final static String USER_METRIC_PREFIX = "user_metric_"; - public static final String[] metricsLabelNames = {"tenant", "namespace", "function", "instance_id", "cluster", "fqfn"}; + public static final String[] metricsLabelNames = {"tenant", "namespace", "name", "instance_id", "cluster", "fqfn"}; protected static final String[] exceptionMetricsLabelNames; diff --git a/pulsar-functions/instance/src/main/python/function_stats.py b/pulsar-functions/instance/src/main/python/function_stats.py index b9a09c4f936df..d3dc32294fed0 100644 --- a/pulsar-functions/instance/src/main/python/function_stats.py +++ b/pulsar-functions/instance/src/main/python/function_stats.py @@ -27,7 +27,7 @@ # We keep track of the following metrics class Stats(object): - metrics_label_names = ['tenant', 'namespace', 'function', 'instance_id', 'cluster', 'fqfn'] + metrics_label_names = ['tenant', 'namespace', 'name', 'instance_id', 'cluster', 'fqfn'] exception_metrics_label_names = metrics_label_names + ['error', 'ts'] diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 9ae3bfc047947..a3a006a427ab8 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -526,7 +526,7 @@ private Map getLabels(Function.FunctionDetails functionDetails) final Map labels = new HashMap<>(); labels.put("namespace", functionDetails.getNamespace()); labels.put("tenant", functionDetails.getTenant()); - labels.put("function", functionDetails.getName()); + labels.put("name", functionDetails.getName()); if (customLabels != null && !customLabels.isEmpty()) { labels.putAll(customLabels); }