diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java index 454ec914841b8..a5c6ed1a9415a 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InstantiateViaFactory.java @@ -28,6 +28,9 @@ * backwards-compatibility with existing reflection-based configurations. * *

When an annotated reporter is configured to be used via reflection the given factory will be used instead. + * + *

Attention: This annotation does not work if the reporter is loaded as a plugin. For these cases, annotate the + * factory with {@link InterceptInstantiationViaReflection} instead. */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java new file mode 100644 index 0000000000000..6a6f05b6ce305 --- /dev/null +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/reporter/InterceptInstantiationViaReflection.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.metrics.reporter; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation for {@link MetricReporterFactory MetricReporterFactories} that want to maintain + * backwards-compatibility with existing reflection-based configurations. + * + *

When a reporter is configured to be used via reflection the annotated factory will be used instead. + * + * @see InstantiateViaFactory + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.TYPE) +public @interface InterceptInstantiationViaReflection { + String reporterClassName(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java index da15079d14bdf..042791803abc8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ReporterSetup.java @@ -26,6 +26,7 @@ import org.apache.flink.core.plugin.PluginManager; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.InstantiateViaFactory; +import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.runtime.metrics.scope.ScopeFormat; @@ -259,6 +260,17 @@ private static Optional loadReporter( } if (reporterClassName != null) { + final Optional interceptingFactory = reporterFactories.values().stream() + .filter(factory -> { + InterceptInstantiationViaReflection annotation = factory.getClass().getAnnotation(InterceptInstantiationViaReflection.class); + return annotation != null && annotation.reporterClassName().equals(reporterClassName); + }) + .findAny(); + + if (interceptingFactory.isPresent()) { + return loadViaFactory(reporterConfig, interceptingFactory.get()); + } + return loadViaReflection(reporterClassName, reporterName, reporterConfig, reporterFactories); } @@ -278,13 +290,20 @@ private static Optional loadViaFactory( LOG.warn("The reporter factory ({}) could not be found for reporter {}. Available factories: {}.", factoryClassName, reporterName, reporterFactories.keySet()); return Optional.empty(); } else { - final MetricConfig metricConfig = new MetricConfig(); - reporterConfig.addAllToProperties(metricConfig); - - return Optional.of(factory.createMetricReporter(metricConfig)); + return loadViaFactory(reporterConfig, factory); } } + private static Optional loadViaFactory( + final Configuration reporterConfig, + final MetricReporterFactory factory) { + + final MetricConfig metricConfig = new MetricConfig(); + reporterConfig.addAllToProperties(metricConfig); + + return Optional.of(factory.createMetricReporter(metricConfig)); + } + private static Optional loadViaReflection( final String reporterClassName, final String reporterName, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java index f687c66034c6a..728f78350a7da 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ReporterSetupTest.java @@ -21,8 +21,10 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.plugin.TestingPluginManager; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.reporter.InstantiateViaFactory; +import org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.runtime.metrics.scope.ScopeFormat; @@ -32,6 +34,7 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Properties; @@ -341,6 +344,26 @@ public void testFactoryAnnotation() { assertTrue(metricReporter.createdByFactory); } + /** + * Verifies that the factory approach is used if the factory is annotated with {@link org.apache.flink.metrics.reporter.InterceptInstantiationViaReflection}. + */ + @Test + public void testReflectionInterception() { + final Configuration config = new Configuration(); + config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, InstantiationTypeTrackingTestReporter.class.getName()); + + final List reporterSetups = ReporterSetup.fromConfiguration(config, new TestingPluginManager(Collections.singletonMap( + MetricReporterFactory.class, + Collections.singletonList(new InterceptingInstantiationTypeTrackingTestReporterFactory()).iterator()))); + + assertEquals(1, reporterSetups.size()); + + final ReporterSetup reporterSetup = reporterSetups.get(0); + final InstantiationTypeTrackingTestReporter metricReporter = (InstantiationTypeTrackingTestReporter) reporterSetup.getReporter(); + + assertTrue(metricReporter.createdByFactory); + } + /** * Factory that exposed the last provided metric config. */ @@ -390,6 +413,18 @@ public MetricReporter createMetricReporter(Properties config) { } } + /** + * Factory for {@link InstantiationTypeTrackingTestReporter} that intercepts reflection-based instantiation attempts. + */ + @InterceptInstantiationViaReflection(reporterClassName = "org.apache.flink.runtime.metrics.ReporterSetupTest$InstantiationTypeTrackingTestReporter") + public static class InterceptingInstantiationTypeTrackingTestReporterFactory implements MetricReporterFactory { + + @Override + public MetricReporter createMetricReporter(Properties config) { + return new InstantiationTypeTrackingTestReporter(true); + } + } + /** * Reporter that exposes which constructor was called. */