Skip to content

Commit

Permalink
Single threaded executor for image decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
Michał Gregorczyk authored and tyronen committed Apr 15, 2015
1 parent 0a277ca commit 3e7003a
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
package com.facebook.common.executors;

import javax.annotation.concurrent.GuardedBy;

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Executor;

import com.facebook.common.internal.Preconditions;
import com.facebook.common.internal.VisibleForTesting;

/**
* Simple implementation of delegating Executor that limits concurrency of execution to single
* thread.
*/
public class SerialDelegatingExecutor implements Executor {

private final Executor mDelegate;
@VisibleForTesting
final Runnable mRunnable;

/**
* True if and only if runnable has been passed to mDelegate for execution, but the execution
* itself has not completed yet.
*/
@GuardedBy("this")
@VisibleForTesting
boolean mExecutionInProgress;
@GuardedBy("this")
final private Queue<Runnable> mCommands;

public SerialDelegatingExecutor(Executor delegate) {
mDelegate = Preconditions.checkNotNull(delegate);
mExecutionInProgress = false;
mCommands = new LinkedList<Runnable>();
mRunnable = new Runnable() {
@Override
public void run() {
executeSingleCommand();
}
};
}

/**
* Submits another command for execution
*/
@Override
public void execute(Runnable command) {
synchronized (this) {
mCommands.add(command);
}
maybeSubmitRunnable();
}

private void maybeSubmitRunnable() {
synchronized (this) {
if (mExecutionInProgress || mCommands.isEmpty()) {
return;
}
mExecutionInProgress = true;
}
mDelegate.execute(mRunnable);
}

private void executeSingleCommand() {
Runnable command;
try {
removeNextCommand().run();
} finally {
clearExecutionInProgress();
maybeSubmitRunnable();
}
}

private synchronized Runnable removeNextCommand() {
return mCommands.remove();
}

private synchronized void clearExecutionInProgress() {
mExecutionInProgress = false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2015-present, Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree. An additional grant
* of patent rights can be found in the PATENTS file in the same directory.
*/
package com.facebook.common.executors;

import java.util.concurrent.Executor;

import org.junit.Before;
import org.junit.Test;

import static org.mockito.Mockito.*;

public class SerialDelegatingExecutorTest {
private SerialDelegatingExecutor mSerialDelegatingExecutor;
private Executor mExecutor;
private Runnable mRunnable;

@Before
public void setUp() {
mExecutor = mock(Executor.class);
mSerialDelegatingExecutor = new SerialDelegatingExecutor(mExecutor);
mRunnable = mock(Runnable.class);
}

@Test
public void testSubmitsTask() {
mSerialDelegatingExecutor.execute(mRunnable);

verify(mExecutor).execute(mSerialDelegatingExecutor.mRunnable);
}

@Test
public void testExecutesTask() {
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.mRunnable.run();

verify(mRunnable).run();
}

@Test
public void testDoesNotSubmitMultipleRunnables() {
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.execute(mRunnable);

verify(mExecutor).execute(mSerialDelegatingExecutor.mRunnable);
}

@Test
public void testDoesSubmitNextRunnable() {
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.mRunnable.run();

verify(mExecutor, times(2)).execute(mSerialDelegatingExecutor.mRunnable);
}

@Test
public void testExecutesMultipleTasks() {
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.mRunnable.run();
mSerialDelegatingExecutor.mRunnable.run();

verify(mRunnable, times(2)).run();
}

@Test
public void testDoesNotSubmitRunnableTooManyTimes() {
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.execute(mRunnable);
mSerialDelegatingExecutor.mRunnable.run();
mSerialDelegatingExecutor.mRunnable.run();

verify(mExecutor, times(2)).execute(mSerialDelegatingExecutor.mRunnable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,37 @@

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.facebook.common.executors.SerialDelegatingExecutor;

/**
* Basic implementation of {@link ExecutorSupplier} that provides one thread pool for the
* CPU-bound operations and another thread pool for the IO-bound operations.
* Basic implementation of {@link ExecutorSupplier}.
*
* <p> Provides one thread pool for the CPU-bound operations and another thread pool for the
* IO-bound operations. Decoding, a CPU-intensive operation, is limited to one thread.
*/
public class DefaultExecutorSupplier implements ExecutorSupplier {
// Allows for simultaneous reads and writes.
private static final int NUM_IO_BOUND_THREADS = 2;
private static final int NUM_CPU_BOUND_THREADS = Runtime.getRuntime().availableProcessors();
private static final int KEEP_ALIVE_SECONDS = 60;

private final Executor mIoBoundExecutor;
private final Executor mCpuBoundExecutor;
private final Executor mDecodeExecutor;

public DefaultExecutorSupplier() {
mIoBoundExecutor = Executors.newFixedThreadPool(NUM_IO_BOUND_THREADS);
mCpuBoundExecutor = Executors.newFixedThreadPool(NUM_CPU_BOUND_THREADS);
mCpuBoundExecutor = new ThreadPoolExecutor(
1, // keep at least that many threads alive
NUM_CPU_BOUND_THREADS, // maximum number of allowed threads
KEEP_ALIVE_SECONDS, // amount of seconds each cached thread waits before being terminated
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>());
mDecodeExecutor = new SerialDelegatingExecutor(mCpuBoundExecutor);
}

@Override
Expand All @@ -41,7 +56,7 @@ public Executor forLocalStorageWrite() {

@Override
public Executor forDecode() {
return mCpuBoundExecutor;
return mDecodeExecutor;
}

@Override
Expand Down

0 comments on commit 3e7003a

Please sign in to comment.