Skip to content

Commit

Permalink
[FLINK-10461][runtime] Refactor direct executor service
Browse files Browse the repository at this point in the history
Co-authored-by: Andrey Zagrebin <[email protected]>
Co-authored-by: klion26 <[email protected]>
  • Loading branch information
2 people authored and zentol committed Dec 20, 2018
1 parent 42f5f77 commit 1708260
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.runtime.util;
package org.apache.flink.runtime.concurrent;

import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -31,37 +33,42 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DirectExecutorService implements ExecutorService {
private boolean _shutdown = false;
/** The direct executor service directly executes the runnables and the callables in the calling thread. */
class DirectExecutorService implements ExecutorService {
static final DirectExecutorService INSTANCE = new DirectExecutorService();

private boolean isShutdown = false;

@Override
public void shutdown() {
_shutdown = true;
isShutdown = true;
}

@Override
@Nonnull
public List<Runnable> shutdownNow() {
_shutdown = true;
isShutdown = true;
return Collections.emptyList();
}

@Override
public boolean isShutdown() {
return _shutdown;
return isShutdown;
}

@Override
public boolean isTerminated() {
return _shutdown;
return isShutdown;
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return _shutdown;
public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) {
return isShutdown;
}

@Override
public <T> Future<T> submit(Callable<T> task) {
@Nonnull
public <T> Future<T> submit(@Nonnull Callable<T> task) {
try {
T result = task.call();

Expand All @@ -72,34 +79,40 @@ public <T> Future<T> submit(Callable<T> task) {
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
@Nonnull
public <T> Future<T> submit(@Nonnull Runnable task, T result) {
task.run();

return new CompletedFuture<>(result, null);
}

@Override
public Future<?> submit(Runnable task) {
@Nonnull
public Future<?> submit(@Nonnull Runnable task) {
task.run();
return new CompletedFuture<>(null, null);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
@Nonnull
public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> tasks) {
ArrayList<Future<T>> result = new ArrayList<>();

for (Callable<T> task : tasks) {
try {
result.add(new CompletedFuture<T>(task.call(), null));
result.add(new CompletedFuture<>(task.call(), null));
} catch (Exception e) {
result.add(new CompletedFuture<T>(null, e));
result.add(new CompletedFuture<>(null, e));
}
}
return result;
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
@Nonnull
public <T> List<Future<T>> invokeAll(
@Nonnull Collection<? extends Callable<T>> tasks, long timeout, @Nonnull TimeUnit unit) {

long end = System.currentTimeMillis() + unit.toMillis(timeout);
Iterator<? extends Callable<T>> iterator = tasks.iterator();
ArrayList<Future<T>> result = new ArrayList<>();
Expand All @@ -108,13 +121,13 @@ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, lo
Callable<T> callable = iterator.next();

try {
result.add(new CompletedFuture<T>(callable.call(), null));
result.add(new CompletedFuture<>(callable.call(), null));
} catch (Exception e) {
result.add(new CompletedFuture<T>(null, e));
result.add(new CompletedFuture<>(null, e));
}
}

while(iterator.hasNext()) {
while (iterator.hasNext()) {
iterator.next();
result.add(new Future<T>() {
@Override
Expand All @@ -133,12 +146,12 @@ public boolean isDone() {
}

@Override
public T get() throws InterruptedException, ExecutionException {
public T get() {
throw new CancellationException("Task has been cancelled.");
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public T get(long timeout, @Nonnull TimeUnit unit) {
throw new CancellationException("Task has been cancelled.");
}
});
Expand All @@ -148,7 +161,8 @@ public T get(long timeout, TimeUnit unit) throws InterruptedException, Execution
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
@Nonnull
public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> tasks) throws ExecutionException {
Exception exception = null;

for (Callable<T> task : tasks) {
Expand All @@ -164,7 +178,11 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws Interrupt
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public <T> T invokeAny(
@Nonnull Collection<? extends Callable<T>> tasks,
long timeout,
@Nonnull TimeUnit unit) throws ExecutionException, TimeoutException {

long end = System.currentTimeMillis() + unit.toMillis(timeout);
Exception exception = null;

Expand All @@ -189,15 +207,15 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, Ti
}

@Override
public void execute(Runnable command) {
public void execute(@Nonnull Runnable command) {
command.run();
}

public static class CompletedFuture<V> implements Future<V> {
static class CompletedFuture<V> implements Future<V> {
private final V value;
private final Exception exception;

public CompletedFuture(V value, Exception exception) {
CompletedFuture(V value, Exception exception) {
this.value = value;
this.exception = exception;
}
Expand All @@ -218,7 +236,7 @@ public boolean isDone() {
}

@Override
public V get() throws InterruptedException, ExecutionException {
public V get() throws ExecutionException {
if (exception != null) {
throw new ExecutionException(exception);
} else {
Expand All @@ -227,7 +245,7 @@ public V get() throws InterruptedException, ExecutionException {
}

@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
public V get(long timeout, @Nonnull TimeUnit unit) throws ExecutionException {
return get();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,36 @@

package org.apache.flink.runtime.concurrent;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import scala.concurrent.ExecutionContext;

/**
* Collection of {@link Executor} implementations.
* Collection of {@link Executor}, {@link ExecutorService} and {@link ExecutionContext} implementations.
*/
public class Executors {

private static final Logger LOG = LoggerFactory.getLogger(Executors.class);

/**
* Return a direct executor. The direct executor directly executes the runnable in the calling
* thread.
*
* @return Direct executor
*/
public static Executor directExecutor() {
return DirectExecutor.INSTANCE;
return DirectExecutorService.INSTANCE;
}

/**
* Direct executor implementation.
* Return a new direct executor service.
*
* <p>The direct executor service directly executes the runnables and the callables in the calling
* thread.
*
* @return New direct executor service
*/
private static class DirectExecutor implements Executor {

static final DirectExecutor INSTANCE = new DirectExecutor();

private DirectExecutor() {}

@Override
public void execute(@Nonnull Runnable command) {
command.run();
}
public static ExecutorService newDirectExecutorService() {
return new DirectExecutorService();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -79,7 +78,7 @@ public void testRegularHeartbeat() {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
new DirectExecutorService(),
Executors.directExecutor(),
scheduledExecutor,
LOG);

Expand Down Expand Up @@ -122,7 +121,7 @@ public void testHeartbeatMonitorUpdate() {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
new DirectExecutorService(),
Executors.directExecutor(),
scheduledExecutor,
LOG);

Expand Down Expand Up @@ -163,7 +162,7 @@ public void testHeartbeatTimeout() throws Exception {
heartbeatTimeout,
ownResourceID,
heartbeatListener,
new DirectExecutorService(),
Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);

Expand Down Expand Up @@ -215,7 +214,7 @@ public void testHeartbeatCluster() throws Exception {
heartbeatTimeout,
resourceID,
heartbeatListener,
new DirectExecutorService(),
Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);

Expand All @@ -224,7 +223,7 @@ public void testHeartbeatCluster() throws Exception {
heartbeatTimeout,
resourceID2,
heartbeatListener2,
new DirectExecutorService(),
Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);

Expand Down Expand Up @@ -264,7 +263,7 @@ public void testTargetUnmonitoring() throws InterruptedException, ExecutionExcep
heartbeatTimeout,
resourceID,
heartbeatListener,
new DirectExecutorService(),
Executors.directExecutor(),
new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)),
LOG);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
Expand Down Expand Up @@ -137,6 +136,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.concurrent.Executors.newDirectExecutorService;
import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -423,7 +423,7 @@ public void testFailingAsyncCheckpointRunnable() throws Exception {
Whitebox.setInternalState(streamTask, "lock", new Object());
Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", newDirectExecutorService());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));

Expand Down

0 comments on commit 1708260

Please sign in to comment.