Skip to content

Commit

Permalink
[FLINK-5082] Pull ExecutorService lifecycle management out of the Job…
Browse files Browse the repository at this point in the history
…Manager

The provided ExecutorService will no longer be closed by the JobManager. Instead the
lifecycle is managed outside of it where it was created. This will give a nicer behaviour,
because it better seperates responsibilities.

This closes apache#2820.
  • Loading branch information
tillrohrmann committed Nov 22, 2016
1 parent 698e53e commit ae4b274
Show file tree
Hide file tree
Showing 25 changed files with 229 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@
import org.apache.flink.runtime.process.ProcessReaper;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.JvmShutdownSafeguard;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.util.NamedThreadFactory;
import org.apache.flink.runtime.util.SignalHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;

Expand All @@ -66,6 +68,8 @@
import java.security.PrivilegedAction;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.mesos.Utils.uri;
Expand All @@ -75,7 +79,7 @@

/**
* This class is the executable entry point for the Mesos Application Master.
* It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager}
* It starts actor system and the actors for {@link JobManager}
* and {@link MesosFlinkResourceManager}.
*
* The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container
Expand Down Expand Up @@ -168,6 +172,12 @@ protected int runPrivileged() {
WebMonitor webMonitor = null;
MesosArtifactServer artifactServer = null;

int numberProcessors = Hardware.getNumberCPUCores();

final ExecutorService executor = Executors.newFixedThreadPool(
numberProcessors,
new NamedThreadFactory("mesos-jobmanager-future-", "-thread-"));

try {
// ------- (1) load and parse / validate all configurations -------

Expand Down Expand Up @@ -281,7 +291,9 @@ protected int runPrivileged() {

// we start the JobManager with its standard name
ActorRef jobManager = JobManager.startJobManagerActors(
config, actorSystem,
config,
actorSystem,
executor,
new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
scala.Option.<String>empty(),
getJobManagerClass(),
Expand Down Expand Up @@ -387,6 +399,8 @@ protected int runPrivileged() {
LOG.error("Failed to stop the artifact server", t);
}

executor.shutdownNow();

return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.mesos.runtime.clusterframework

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executor

import akka.actor.ActorRef
import org.apache.flink.configuration.{Configuration => FlinkConfiguration}
Expand All @@ -37,7 +37,7 @@ import scala.concurrent.duration._
/** JobManager actor for execution on Mesos. .
*
* @param flinkConfiguration Configuration object for the actor
* @param executorService Execution context which is used to execute concurrent tasks in the
* @param executor Execution context which is used to execute concurrent tasks in the
* [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
* @param instanceManager Instance manager to manage the registered
* [[org.apache.flink.runtime.taskmanager.TaskManager]]
Expand All @@ -49,7 +49,7 @@ import scala.concurrent.duration._
* @param leaderElectionService LeaderElectionService to participate in the leader election
*/
class MesosJobManager(flinkConfiguration: FlinkConfiguration,
executorService: ExecutorService,
executor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
libraryCacheManager: BlobLibraryCacheManager,
Expand All @@ -63,7 +63,7 @@ class MesosJobManager(flinkConfiguration: FlinkConfiguration,
metricsRegistry: Option[FlinkMetricRegistry])
extends ContaineredJobManager(
flinkConfiguration,
executorService,
executor,
instanceManager,
scheduler,
libraryCacheManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void testBackPressuredProducer() throws Exception {
}

try {
jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());

final Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public void testTaskClearedWhileSampling() throws Exception {
ActorGateway taskManager = null;

try {
jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
jobManger = TestingUtils.createJobManager(testActorSystem, testActorSystem.dispatcher(), new Configuration());

final Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ public void testRedirectToLeader() throws Exception {
jobManager[i] = JobManager.startJobManagerActors(
jmConfig,
jobManagerSystem[i],
jobManagerSystem[i].dispatcher(),
JobManager.class,
MemoryArchivist.class)._1();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.util;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Thread factory which allows to specify a thread pool name and a thread name.
*
* The code is based on {@link java.util.concurrent.Executors.DefaultThreadFactory}.
*/
public class NamedThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

public NamedThreadFactory(final String poolName, final String threadName) {
SecurityManager securityManager = System.getSecurityManager();
group = (securityManager != null) ? securityManager.getThreadGroup() :
Thread.currentThread().getThreadGroup();

namePrefix = poolName +
poolNumber.getAndIncrement() +
threadName;
}

@Override
public Thread newThread(Runnable runnable) {
Thread t = new Thread(group, runnable,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon()) {
t.setDaemon(false);
}
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.runtime.clusterframework

import java.util.concurrent.ExecutorService
import java.util.concurrent.Executor

import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
Expand All @@ -45,7 +45,7 @@ import scala.language.postfixOps
* to start/administer/stop the session.
*
* @param flinkConfiguration Configuration object for the actor
* @param executorService Execution context which is used to execute concurrent tasks in the
* @param executor Execution context which is used to execute concurrent tasks in the
* [[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
* @param instanceManager Instance manager to manage the registered
* [[org.apache.flink.runtime.taskmanager.TaskManager]]
Expand All @@ -58,7 +58,7 @@ import scala.language.postfixOps
*/
abstract class ContaineredJobManager(
flinkConfiguration: Configuration,
executorService: ExecutorService,
executor: Executor,
instanceManager: InstanceManager,
scheduler: FlinkScheduler,
libraryCacheManager: BlobLibraryCacheManager,
Expand All @@ -72,7 +72,7 @@ abstract class ContaineredJobManager(
metricsRegistry: Option[FlinkMetricRegistry])
extends JobManager(
flinkConfiguration,
executorService,
executor,
instanceManager,
scheduler,
libraryCacheManager,
Expand Down
Loading

0 comments on commit ae4b274

Please sign in to comment.