Skip to content

Commit

Permalink
[FLINK-29993][conf] Add MetricOptions#forReporter
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Nov 21, 2022
1 parent 4ad7a95 commit c814348
Show file tree
Hide file tree
Showing 9 changed files with 124 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.MetricOptions;
Expand Down Expand Up @@ -131,11 +130,8 @@ public static void shutDownServices() throws Exception {
public static Configuration getFlinkConfiguration() {
Configuration flinkConfig = new Configuration();
flinkConfig.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("16m"));
flinkConfig.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "my_reporter."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
JMXReporterFactory.class.getName());
MetricOptions.forReporter(flinkConfig, "my_reporter")
.set(MetricOptions.REPORTER_FACTORY_CLASS, JMXReporterFactory.class.getName());
return flinkConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.configuration;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.docs.Documentation;
import org.apache.flink.configuration.description.Description;
Expand Down Expand Up @@ -67,6 +68,27 @@ public class MetricOptions {
+ " any of the names in the list will be started. Otherwise, all reporters that could be found in"
+ " the configuration will be started.");

/**
* Returns a view over the given configuration via which options can be set/retrieved for the
* given reporter.
*
* <pre>
* Configuration config = ...
* MetricOptions.forReporter(config, "my_reporter")
* .set(MetricOptions.REPORTER_INTERVAL, Duration.ofSeconds(10))
* ...
* </pre>
*
* @param configuration backing configuration
* @param reporterName reporter name
* @return view over configuration
*/
@Experimental
public static Configuration forReporter(Configuration configuration, String reporterName) {
return new DelegatingConfiguration(
configuration, ConfigConstants.METRICS_REPORTER_PREFIX + reporterName + ".");
}

/** @deprecated use {@link MetricOptions#REPORTER_FACTORY_CLASS} instead. */
@Deprecated
public static final ConfigOption<String> REPORTER_CLASS =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.configuration;

import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;

class MetricOptionsTest {
private static final ConfigOption<String> SUB_OPTION =
ConfigOptions.key("option").stringType().noDefaultValue();
private static final ConfigOption<String> FULL_OPTION =
ConfigOptions.key(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "my_reporter."
+ SUB_OPTION.key())
.stringType()
.noDefaultValue();

@Test
void testForReporterWrite() {
Configuration configuration = new Configuration();

MetricOptions.forReporter(configuration, "my_reporter").set(SUB_OPTION, "value");

assertThat(configuration.get(FULL_OPTION)).isEqualTo("value");
}

@Test
void testForReporterRead() {
Configuration configuration = new Configuration();
configuration.set(FULL_OPTION, "value");

assertThat(MetricOptions.forReporter(configuration, "my_reporter").get(SUB_OPTION))
.isEqualTo("value");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.metrics.prometheus.tests;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.prometheus.PrometheusReporterFactory;
Expand Down Expand Up @@ -154,13 +153,12 @@ public PrometheusReporterEndToEndITCase(TestParams params) {
private static Configuration getFlinkConfig() {
final Configuration config = new Configuration();

config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "prom."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
PrometheusReporterFactory.class.getName());
MetricOptions.forReporter(config, "prom")
.set(
MetricOptions.REPORTER_FACTORY_CLASS,
PrometheusReporterFactory.class.getName())
.setString("port", "9000-9100");

config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "prom.port", "9000-9100");
return config;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.OneShotLatch;
Expand Down Expand Up @@ -74,11 +73,8 @@ class JMXJobManagerMetricTest {
private static Configuration getConfiguration() {
Configuration flinkConfiguration = new Configuration();

flinkConfiguration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
JMXReporterFactory.class.getName());
MetricOptions.forReporter(flinkConfiguration, "test")
.set(MetricOptions.REPORTER_FACTORY_CLASS, JMXReporterFactory.class.getName());
flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");

return flinkConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,8 @@ void testActivateOneReporterAmongTwoDeclared() {
void testReporterSetupSupplier() throws Exception {
final Configuration config = new Configuration();

config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "reporter1."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter1.class.getName());
MetricOptions.forReporter(config, "reporter1")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter1.class.getName());

final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

Expand All @@ -156,21 +153,12 @@ void testReporterSetupSupplier() throws Exception {
void testMultipleReporterInstantiation() throws Exception {
Configuration config = new Configuration();

config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test1."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter11.class.getName());
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test2."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter12.class.getName());
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test3."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter13.class.getName());
MetricOptions.forReporter(config, "test1")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter11.class.getName());
MetricOptions.forReporter(config, "test2")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter12.class.getName());
MetricOptions.forReporter(config, "test3")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter13.class.getName());

List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

Expand Down Expand Up @@ -212,13 +200,11 @@ public void open(MetricConfig config) {
}

private static void configureReporter1(Configuration config) {
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "reporter1."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter1.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1.arg1", "value1");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter1.arg2", "value2");
Configuration reporterConfig =
MetricOptions.forReporter(config, "reporter1")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter1.class.getName());
reporterConfig.setString("arg1", "value1");
reporterConfig.setString("arg2", "value2");
}

private static void assertReporter1Configured(ReporterSetup setup) {
Expand All @@ -232,13 +218,11 @@ private static void assertReporter1Configured(ReporterSetup setup) {
}

private static void configureReporter2(Configuration config) {
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "reporter2."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter2.arg1", "value1");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "reporter2.arg3", "value3");
Configuration reporterConfig =
MetricOptions.forReporter(config, "reporter2")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter2.class.getName());
reporterConfig.setString("arg1", "value1");
reporterConfig.setString("arg3", "value3");
}

private static void assertReporter2Configured(ReporterSetup setup) {
Expand All @@ -256,16 +240,11 @@ void testVariableExclusionParsing() throws Exception {
final String excludedVariable1 = "foo";
final String excludedVariable2 = "foo";
final Configuration config = new Configuration();
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporterFactory.class.getName());
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_EXCLUDED_VARIABLES.key(),
excludedVariable1 + ";" + excludedVariable2);
MetricOptions.forReporter(config, "test")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporterFactory.class.getName())
.set(
MetricOptions.REPORTER_EXCLUDED_VARIABLES,
excludedVariable1 + ";" + excludedVariable2);

final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

Expand All @@ -284,11 +263,8 @@ void testVariableExclusionParsing() throws Exception {
@Test
void testFactoryParsing() throws Exception {
final Configuration config = new Configuration();
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporterFactory.class.getName());
MetricOptions.forReporter(config, "test")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporterFactory.class.getName());

final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

Expand Down Expand Up @@ -342,16 +318,12 @@ void testAdditionalVariablesParsing() {
final String tag2 = "fizz";
final String tagValue2 = "buzz";
final Configuration config = new Configuration();
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
TestReporterFactory.class.getName());
config.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_ADDITIONAL_VARIABLES.key(),
String.join(",", tag1 + ":" + tagValue1, tag2 + ":" + tagValue2));

MetricOptions.forReporter(config, "test")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporterFactory.class.getName())
.setString(
MetricOptions.REPORTER_ADDITIONAL_VARIABLES.key(),
String.join(",", tag1 + ":" + tagValue1, tag2 + ":" + tagValue2));

final List<ReporterSetup> reporterSetups = ReporterSetup.fromConfiguration(config, null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.LogicalScopeProvider;
Expand Down Expand Up @@ -243,14 +242,10 @@ private MetricGroup unwrap(MetricGroup group) {
}

public Configuration addToConfiguration(Configuration configuration) {
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "mini_cluster_resource_reporter."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
InMemoryReporter.Factory.class.getName());
configuration.setString(
ConfigConstants.METRICS_REPORTER_PREFIX + "mini_cluster_resource_reporter." + ID,
id.toString());
MetricOptions.forReporter(configuration, "mini_cluster_resource_reporter")
.set(MetricOptions.REPORTER_FACTORY_CLASS, InMemoryReporter.Factory.class.getName())
.setString("ID", id.toString());

return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.metrics;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.testutils.BlockerSync;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.metrics.Gauge;
Expand Down Expand Up @@ -149,8 +150,8 @@ void testJobManagerMetrics() throws Exception {

private static Configuration getConfiguration() {
Configuration configuration = new Configuration();
configuration.setString(
"metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
MetricOptions.forReporter(configuration, "test_reporter")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter.class.getName());
return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ private static Configuration getConfiguration() {
configuration.setString(REPORTERS_LIST, "test_reporter");
configuration.setString(MetricOptions.SCOPE_NAMING_JM, "jobmanager");
configuration.setString(MetricOptions.SCOPE_NAMING_TM, "taskmanager");
configuration.setString(
"metrics.reporter.test_reporter.factory.class", TestReporter.class.getName());
MetricOptions.forReporter(configuration, "test_reporter")
.set(MetricOptions.REPORTER_FACTORY_CLASS, TestReporter.class.getName());
return configuration;
}

Expand Down

0 comments on commit c814348

Please sign in to comment.