Skip to content

Commit

Permalink
[FLINK-7876] Register TaskManagerMetricGroup under ResourceID
Browse files Browse the repository at this point in the history
This commit changes that TaskManagerMetricGroups are now registered under the
TaskManager's ResourceID instead of the InstanceID. This allows to create the
TaskManagerMetricGroup at startup of the TaskManager.

Moreover, it pulls the MetricRegistry out of JobManager and TaskManager. This
allows to reuse the same MetricRegistry across multiple instances (e.g. in the
FlinkMiniCluster case). Moreover, it ensures proper cleanup of a potentially
started MetricyQueryServiceActor.

Change TaskManagersHandler to work with ResourceID instead of InstanceID

Adapt MetricFetcher to use ResourceID instead of InstanceID

This closes apache#4872.
  • Loading branch information
tillrohrmann committed Nov 1, 2017
1 parent a7e0a27 commit d45b941
Show file tree
Hide file tree
Showing 100 changed files with 1,020 additions and 834 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
Expand Down Expand Up @@ -112,7 +112,7 @@ protected void initializeServices(Configuration config) throws Exception {
}

@Override
protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception {
super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
}

Expand All @@ -123,7 +123,7 @@ protected ResourceManager<?> createResourceManager(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
MetricRegistryImpl metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices;
Expand Down Expand Up @@ -102,7 +102,7 @@ protected void initializeServices(Configuration config) throws Exception {
}

@Override
protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception {
protected void startClusterComponents(Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistryImpl metricRegistry) throws Exception {
super.startClusterComponents(configuration, rpcService, highAvailabilityServices, blobServer, heartbeatServices, metricRegistry);
}

Expand All @@ -113,7 +113,7 @@ protected ResourceManager<?> createResourceManager(
RpcService rpcService,
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
MetricRegistry metricRegistry,
MetricRegistryImpl metricRegistry,
FatalErrorHandler fatalErrorHandler) throws Exception {
final ResourceManagerConfiguration rmConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.fromConfiguration(configuration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
Expand Down Expand Up @@ -200,6 +202,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
ExecutorService ioExecutor = null;
MesosServices mesosServices = null;
HighAvailabilityServices highAvailabilityServices = null;
MetricRegistryImpl metricRegistry = null;

try {
// ------- (1) load and parse / validate all configurations -------
Expand Down Expand Up @@ -304,13 +307,19 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
// 2: the JobManager
LOG.debug("Starting JobManager actor");

metricRegistry = new MetricRegistryImpl(
MetricRegistryConfiguration.fromConfiguration(config));

metricRegistry.startQueryService(actorSystem, null);

// we start the JobManager with its standard name
ActorRef jobManager = JobManager.startJobManagerActors(
config,
actorSystem,
futureExecutor,
ioExecutor,
highAvailabilityServices,
metricRegistry,
webMonitor != null ? Option.apply(webMonitor.getRestAddress()) : Option.empty(),
Option.apply(JobMaster.JOB_MANAGER_NAME),
Option.apply(JobMaster.ARCHIVE_NAME),
Expand Down Expand Up @@ -422,6 +431,10 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
}
}

if (metricRegistry != null) {
metricRegistry.shutdown();
}

org.apache.flink.runtime.concurrent.Executors.gracefulShutdown(
AkkaUtils.getTimeout(config).toMillis(),
TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManager;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
Expand Down Expand Up @@ -145,7 +145,7 @@ public MesosResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
MetricRegistry metricRegistry,
MetricRegistryImpl metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,
// Mesos specifics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import org.apache.flink.runtime.instance.InstanceManager
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore
import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
import org.apache.flink.runtime.metrics.{MetricRegistryImpl => FlinkMetricRegistry}

import scala.concurrent.duration._

Expand Down Expand Up @@ -66,7 +67,7 @@ class MesosJobManager(
submittedJobGraphs : SubmittedJobGraphStore,
checkpointRecoveryFactory : CheckpointRecoveryFactory,
jobRecoveryTimeout: FiniteDuration,
metricsRegistry: Option[FlinkMetricRegistry],
jobManagerMetricGroup: JobManagerMetricGroup,
optRestAddress: Option[String])
extends ContaineredJobManager(
flinkConfiguration,
Expand All @@ -83,7 +84,7 @@ class MesosJobManager(
submittedJobGraphs,
checkpointRecoveryFactory,
jobRecoveryTimeout,
metricsRegistry,
jobManagerMetricGroup,
optRestAddress) {

val jobPollingInterval: FiniteDuration = 5 seconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
import org.apache.flink.runtime.io.network.NetworkEnvironment
import org.apache.flink.runtime.memory.MemoryManager
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration
import org.apache.flink.runtime.taskmanager.{TaskManager, TaskManagerLocation}

Expand All @@ -39,7 +39,7 @@ class MesosTaskManager(
network: NetworkEnvironment,
numberOfSlots: Int,
highAvailabilityServices: HighAvailabilityServices,
metricRegistry : MetricRegistry)
taskManagerMetricGroup : TaskManagerMetricGroup)
extends TaskManager(
config,
resourceID,
Expand All @@ -49,7 +49,7 @@ class MesosTaskManager(
network,
numberOfSlots,
highAvailabilityServices,
metricRegistry) {
taskManagerMetricGroup) {

override def handleMessage: Receive = {
super.handleMessage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
Expand Down Expand Up @@ -160,7 +160,7 @@ public TestingMesosResourceManager(
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
SlotManager slotManager,
MetricRegistry metricRegistry,
MetricRegistryImpl metricRegistry,
JobLeaderIdService jobLeaderIdService,
FatalErrorHandler fatalErrorHandler,

Expand Down Expand Up @@ -306,7 +306,7 @@ class MockResourceManagerRuntimeServices {
public final ScheduledExecutor scheduledExecutor;
public final TestingHighAvailabilityServices highAvailabilityServices;
public final HeartbeatServices heartbeatServices;
public final MetricRegistry metricRegistry;
public final MetricRegistryImpl metricRegistry;
public final TestingLeaderElectionService rmLeaderElectionService;
public final JobLeaderIdService jobLeaderIdService;
public final SlotManager slotManager;
Expand All @@ -321,7 +321,7 @@ class MockResourceManagerRuntimeServices {
rmLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
metricRegistry = mock(MetricRegistry.class);
metricRegistry = mock(MetricRegistryImpl.class);
slotManager = mock(SlotManager.class);
slotManagerStarted = new CompletableFuture<>();
jobLeaderIdService = new JobLeaderIdService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
Expand Down Expand Up @@ -92,7 +92,7 @@ public void testAddingMetrics() throws NoSuchFieldException, IllegalAccessExcept

MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(configuration);

MetricRegistry metricRegistry = new MetricRegistry(metricRegistryConfiguration);
MetricRegistryImpl metricRegistry = new MetricRegistryImpl(metricRegistryConfiguration);

char delimiter = metricRegistry.getDelimiter();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.dropwizard.ScheduledDropwizardReporter;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.util.TestLogger;
Expand Down Expand Up @@ -105,12 +105,12 @@ public void testDropwizardHistogramWrapperReporting() throws Exception {
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestingReporter.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "my_reporter." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, reportingInterval + " MILLISECONDS");

MetricRegistry registry = null;
MetricRegistryImpl registry = null;

MetricRegistryConfiguration metricRegistryConfiguration = MetricRegistryConfiguration.fromConfiguration(config);

try {
registry = new MetricRegistry(metricRegistryConfiguration);
registry = new MetricRegistryImpl(metricRegistryConfiguration);
DropwizardHistogramWrapper histogramWrapper = new DropwizardHistogramWrapper(
new com.codahale.metrics.Histogram(new SlidingWindowReservoir(size)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.groups.FrontMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
Expand Down Expand Up @@ -108,7 +108,7 @@ public void testPortConflictHandling() throws Exception {
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9020-9035");

MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));

TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

Expand Down Expand Up @@ -168,7 +168,7 @@ public void testJMXAvailability() throws Exception {
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
cfg.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2.port", "9040-9055");

MetricRegistry reg = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
MetricRegistryImpl reg = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));

TaskManagerMetricGroup mg = new TaskManagerMetricGroup(reg, "host", "tm");

Expand Down Expand Up @@ -231,15 +231,15 @@ public Integer getValue() {
*/
@Test
public void testHistogramReporting() throws Exception {
MetricRegistry registry = null;
MetricRegistryImpl registry = null;
String histogramName = "histogram";

try {
Configuration config = new Configuration();
config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));

TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");

Expand Down Expand Up @@ -281,15 +281,15 @@ public void testHistogramReporting() throws Exception {
*/
@Test
public void testMeterReporting() throws Exception {
MetricRegistry registry = null;
MetricRegistryImpl registry = null;
String meterName = "meter";

try {
Configuration config = new Configuration();
config.setString(MetricOptions.REPORTERS_LIST, "jmx_test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "jmx_test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());

registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));

TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
Expand Down Expand Up @@ -75,12 +75,12 @@ public class PrometheusReporterTaskScopeTest {
private TaskMetricGroup taskMetricGroup1;
private TaskMetricGroup taskMetricGroup2;

private MetricRegistry registry;
private MetricRegistryImpl registry;
private PrometheusReporter reporter;

@Before
public void setupReporter() {
registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(createConfigWithOneReporter("test1", "9400-9500")));
reporter = (PrometheusReporter) registry.getReporters().get(0);

TaskManagerMetricGroup tmMetricGroup = new TaskManagerMetricGroup(registry, TASK_MANAGER_HOST, TASK_MANAGER_ID);
Expand Down
Loading

0 comments on commit d45b941

Please sign in to comment.