Skip to content

Commit

Permalink
[FLINK-2339] Prevent asynchronous checkpoint calls from overtaking each other
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jul 9, 2015
1 parent c7ec74e commit cbde2c2
Show file tree
Hide file tree
Showing 5 changed files with 370 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public class TaskEventHandler {

// Listeners for each event type
/** Listeners for each event type */
private final Multimap<Class<? extends TaskEvent>, EventListener<TaskEvent>> listeners = HashMultimap.create();

public void subscribe(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
Expand All @@ -45,7 +45,7 @@ public void unsubscribe(EventListener<TaskEvent> listener, Class<? extends TaskE
}

/**
* Publishes the task event to all subscribed event listeners..
* Publishes the task event to all subscribed event listeners.
*
* @param event The event to publish.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskmanager;

import java.util.concurrent.ThreadFactory;

/**
 * A {@link ThreadFactory} that gives every thread it creates the same fixed name, places it
 * in the same fixed thread group, and marks it as a daemon thread.
 */
public class DispatherThreadFactory implements ThreadFactory {

    /** Thread group that all created threads are placed in. */
    private final ThreadGroup threadGroup;

    /** Name assigned to all created threads. */
    private final String name;

    /**
     * Creates a new thread factory.
     *
     * @param group The thread group that every created thread is associated with.
     * @param threadName The name given to every created thread.
     */
    public DispatherThreadFactory(ThreadGroup group, String threadName) {
        this.name = threadName;
        this.threadGroup = group;
    }

    /**
     * Creates a new, not yet started, daemon thread for the given runnable.
     *
     * @param r The runnable the thread will execute.
     * @return The newly created daemon thread.
     */
    @Override
    public Thread newThread(Runnable r) {
        final Thread daemonThread = new Thread(threadGroup, r, name);
        daemonThread.setDaemon(true);
        return daemonThread;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,16 @@ public class MemoryLogger extends Thread {
private final ActorSystem monitored;

private volatile boolean running = true;


/**
 * Creates a new memory logger that logs in the given interval and is not given an actor
 * system. Delegates to the three-argument constructor, passing {@code null} as the
 * monitored actor system.
 *
 * @param logger The logger to use for outputting the memory statistics.
 * @param interval The interval in which the thread logs.
 */
public MemoryLogger(Logger logger, long interval) {
this(logger, interval, null);
}

/**
* Creates a new memory logger that logs in the given interval and lives as long as the
* given actor system.
*
* @param logger The logger to use for outputting the memory statistics.
* @param interval The interval in which the thread logs.
* @param monitored The actor system to whose life the thread is bound. The thread terminates
* once the actor system terminates.
*/
public MemoryLogger(Logger logger, long interval, ActorSystem monitored) {
super("Memory Logger");
setDaemon(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import akka.actor.ActorRef;
import akka.util.Timeout;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -64,7 +65,10 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

Expand Down Expand Up @@ -188,6 +192,7 @@ public class Task implements Runnable {
// proper happens-before semantics on parallel modification
// ------------------------------------------------------------------------

/** atomic flag that makes sure the invokable is canceled exactly once upon error */
private final AtomicBoolean invokableHasBeenCanceled;

/** The invokable of this task, if initialized */
Expand All @@ -199,6 +204,9 @@ public class Task implements Runnable {
/** The observed exception, in case the task execution failed */
private volatile Throwable failureCause;

/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */
private volatile ExecutorService asyncCallDispatcher;

/** The handle to the state that the operator was initialized with. Will be set to null after the
* initialization, to be memory friendly */
private volatile SerializedValue<StateHandle<?>> operatorState;
Expand Down Expand Up @@ -290,11 +298,11 @@ public Task(TaskDeploymentDescriptor tdd,
this.inputGates[i] = gate;
inputGatesById.put(gate.getConsumedResultId(), gate);
}

invokableHasBeenCanceled = new AtomicBoolean(false);

// finally, create the executing thread, but do not start it
executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);

invokableHasBeenCanceled = new AtomicBoolean(false);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -646,9 +654,17 @@ else if (STATE_UPDATER.compareAndSet(this, current, ExecutionState.FAILED)) {
try {
LOG.info("Freeing task resources for " + taskNameWithSubtask);

// stop the async dispatcher.
// copy dispatcher reference to stack, against concurrent release
ExecutorService dispatcher = this.asyncCallDispatcher;
if (dispatcher != null && !dispatcher.isShutdown()) {
dispatcher.shutdownNow();
}

// free the network resources
network.unregisterTask(this);

// free memory resources
if (invokable != null) {
memoryManager.releaseAll(invokable);
}
Expand Down Expand Up @@ -797,6 +813,7 @@ else if (current == ExecutionState.RUNNING) {
Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
"Canceler for " + taskNameWithSubtask);
cancelThread.setDaemon(true);
cancelThread.start();
}
return;
Expand Down Expand Up @@ -955,11 +972,49 @@ else if (partitionState == ExecutionState.CANCELED
LOG.debug("Ignoring partition state notification for not running task.");
}
}


/**
 * Utility method to dispatch an asynchronous call on the invokable.
 *
 * <p>The call is handed to a lazily created single-thread executor, so that asynchronous
 * calls are executed one at a time, in the order in which they were submitted, and cannot
 * overtake each other. The runnable is submitted exactly once; no additional thread is
 * spawned for it.
 *
 * <p>If the task is already canceled or failed, the call is silently dropped.
 *
 * @param runnable The async call runnable.
 * @param callName The name of the call, for logging purposes.
 *
 * @throws RuntimeException Thrown, if the dispatcher rejects the call even though the task
 *                          is neither canceled nor failed.
 */
private void executeAsyncCallRunnable(Runnable runnable, String callName) {
    // make sure the executor is initialized. lock against concurrent calls to this function
    synchronized (this) {
        if (isCanceledOrFailed()) {
            return;
        }

        // get ourselves a reference on the stack that cannot be concurrently modified
        ExecutorService executor = this.asyncCallDispatcher;
        if (executor == null) {
            // first time use, initialize
            executor = Executors.newSingleThreadExecutor(
                    new DispatherThreadFactory(TASK_THREADS_GROUP, "Async calls on " + taskNameWithSubtask));
            this.asyncCallDispatcher = executor;

            // double-check for execution state, and make sure we clean up after ourselves
            // if we created the dispatcher while the task was concurrently canceled
            if (isCanceledOrFailed()) {
                executor.shutdown();
                asyncCallDispatcher = null;
                return;
            }
        }

        LOG.debug("Invoking async call {} on task {}", callName, taskNameWithSubtask);

        try {
            // the only place where the runnable is scheduled: going through the serial
            // executor is what guarantees the ordering between async calls
            executor.submit(runnable);
        }
        catch (RejectedExecutionException e) {
            // may be that we are concurrently canceled. if not, report that something is fishy
            if (!isCanceledOrFailed()) {
                throw new RuntimeException("Async call was rejected, even though the task was not canceled.", e);
            }
        }
    }
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1051,7 +1106,7 @@ public void run() {

executer.interrupt();
try {
executer.join(5000);
executer.join(10000);
}
catch (InterruptedException e) {
// we can ignore this
Expand Down
Loading

0 comments on commit cbde2c2

Please sign in to comment.