Skip to content

Commit

Permalink
Timeout support
Browse files Browse the repository at this point in the history
mnadeem committed Apr 17, 2021
1 parent ef15be4 commit e197f36
Showing 15 changed files with 669 additions and 193 deletions.
13 changes: 11 additions & 2 deletions src/main/java/com/github/dexecutor/core/DefaultDexecutor.java
Original file line number Diff line number Diff line change
@@ -58,6 +58,7 @@ public class DefaultDexecutor <T, R> implements Dexecutor<T, R> {
private final ExecutionEngine<T, R> executionEngine;
private final ExecutorService immediatelyRetryExecutor;
private final ScheduledExecutorService scheduledRetryExecutor;
private final ScheduledExecutorService timeoutExecutor;

private final DexecutorState<T, R> state;

@@ -70,11 +71,13 @@ public DefaultDexecutor(final DexecutorConfig<T, R> config) {

this.immediatelyRetryExecutor = Executors.newFixedThreadPool(config.getImmediateRetryPoolThreadsCount());
this.scheduledRetryExecutor = Executors.newScheduledThreadPool(config.getScheduledRetryPoolThreadsCount());
this.timeoutExecutor = Executors.newScheduledThreadPool(config.getTimeoutSchedulerPoolThreadsCount());

this.executionEngine = config.getExecutorEngine();
this.validator = config.getValidator();
this.taskProvider = config.getTaskProvider();
this.state = config.getDexecutorState();
this.executionEngine.setTimeoutScheduler(timeoutExecutor);
}

public void print(final Traversar<T, R> traversar, final TraversarAction<T, R> action) {
@@ -143,9 +146,12 @@ public ExecutionResults<T, R> execute(final ExecutionConfig config) {
private void shutdownExecutors() {
this.immediatelyRetryExecutor.shutdown();
this.scheduledRetryExecutor.shutdown();
this.timeoutExecutor.shutdown();

try {
this.immediatelyRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
this.scheduledRetryExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
this.timeoutExecutor.awaitTermination(1, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
logger.error("Error Shuting down Executor", e);
}
@@ -227,7 +233,7 @@ private void doAfterExecutionDone(final ExecutionConfig config, final ExecutionR
final Node<T, R> processedNode = state.getGraphNode(executionResult.getId());
updateNode(executionResult, processedNode);

if (executionResult.isSuccess()) {
if (executionResult.isSuccess() || executionResult.isCancelled()) {
state.markProcessingDone(processedNode);
}

@@ -316,8 +322,10 @@ private Integer getExecutionCount(final Node<T, R> node) {
private void updateNode(final ExecutionResult<T, R> executionResult, final Node<T, R> processedNode) {
updateExecutionCount(processedNode);
processedNode.setResult(executionResult.getResult());
if(executionResult.isErrored()) {
if (executionResult.isErrored()) {
processedNode.setErrored();
} else if (executionResult.isCancelled()) {
processedNode.setCancelled();
} else {
processedNode.setSuccess();
}
@@ -328,6 +336,7 @@ private void forceStopIfRequired() {
this.state.forcedStop();
this.immediatelyRetryExecutor.shutdownNow();
this.scheduledRetryExecutor.shutdownNow();
this.timeoutExecutor.shutdownNow();
throw new IllegalStateException("Forced to Stop the instance of Dexecutor!");
}
}
Original file line number Diff line number Diff line change
@@ -20,13 +20,16 @@
import static com.github.dexecutor.core.support.Preconditions.checkNotNull;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.dexecutor.core.concurrent.ExecutorCompletionService;
import com.github.dexecutor.core.concurrent.IdentifiableRunnableFuture;
import com.github.dexecutor.core.task.ExecutionResult;
import com.github.dexecutor.core.task.Task;
import com.github.dexecutor.core.task.TaskExecutionException;
@@ -46,8 +49,10 @@ public final class DefaultExecutionEngine<T, R> implements ExecutionEngine<T, R>

private ExecutionListener<T, R> executionListener = new QuiteExecutionListener<>();
private final ExecutorService executorService;
private final CompletionService<ExecutionResult<T, R>> completionService;

private final ExecutorCompletionService<T, ExecutionResult<T, R>> completionService;

private ScheduledExecutorService timeoutExecutor;

public DefaultExecutionEngine(final DexecutorState<T, R> state, final ExecutorService executorService) {
this(state, executorService, null);
}
@@ -62,35 +67,68 @@ public DefaultExecutionEngine(final DexecutorState<T, R> state, final ExecutorSe
checkNotNull(executorService, "Executer Service should not be null");
this.state = state;
this.executorService = executorService;
this.completionService = new ExecutorCompletionService<ExecutionResult<T, R>>(executorService);
this.completionService = new ExecutorCompletionService<T, ExecutionResult<T, R>>(executorService);
if (listener != null) {
this.executionListener = listener;
}
}

@Override
public ExecutionResult<T, R> processResult() {
T identifier = null;
try {
return this.completionService.take().get();
@SuppressWarnings("unchecked")
IdentifiableRunnableFuture<T, ExecutionResult<T, R>> future = (IdentifiableRunnableFuture<T, ExecutionResult<T, R>>) this.completionService.take();
identifier = future.getIdentifier();
if (future.isCancelled()) {
ExecutionResult<T, R> result = ExecutionResult.cancelled(future.getIdentifier(), "Task cancelled");
state.removeErrored(result);
return result;
} else {
return future.get();
}
} catch (Exception e) {
throw new TaskExecutionException("Task execution ", e);
throw new TaskExecutionException(identifier + " Task execution ", e);
}
}

@Override
public void submit(final Task<T, R> task) {
logger.debug("Received Task {} ", task.getId());
this.completionService.submit(newCallable(task));
Future<ExecutionResult<T, R>> future = this.completionService.submit(newCallable(task));

if (this.timeoutExecutor != null && task.getTimeout() != null) {
logger.trace("Task # {}, Is Timeout based", task.getId());
addTimeOut(task, future);
}
}

private void addTimeOut(Task<T, R> task, Future<ExecutionResult<T, R>> future) {

timeoutExecutor.schedule(new Runnable() {
public void run() {
if (task.isCompleted()) {
logger.trace("Task already completed {}", task);
} else if (task.isTimedOut()) {
boolean result = future.cancel(true);
logger.trace("Task timed out {}, cancelled it? : {}", task, result);
} else {
logger.trace("Task Not timed out {}, adding it back", task);
addTimeOut(task, future);
}
}
}, task.getTimeout().toNanos(), TimeUnit.NANOSECONDS);
}

private Callable<ExecutionResult<T, R>> newCallable(final Task<T, R> task) {
return new Callable<ExecutionResult<T,R>>() {
return new IdentifiableCallable<T, ExecutionResult<T,R>>() {

@Override
public ExecutionResult<T, R> call() throws Exception {
R r = null;
ExecutionResult<T, R> result = null;
try {
task.markStart();
r = task.execute();
result = ExecutionResult.success(task.getId(), r);
state.removeErrored(result);
@@ -100,9 +138,17 @@ public ExecutionResult<T, R> call() throws Exception {
state.addErrored(result);
executionListener.onError(task, e);
logger.error("Error Execution Task # {}", task.getId(), e);
} finally {
task.markEnd();
result.setTimes(task.getStartTime(), task.getEndTime());
}
return result;
}

@Override
public T getIdentifier() {
return task.getId();
}
};
}

@@ -125,4 +171,9 @@ public String toString() {
public void setExecutionListener(ExecutionListener<T, R> listener) {
this.executionListener = listener;
}

@Override
public void setTimeoutScheduler(ScheduledExecutorService timeoutExecutor) {
this.timeoutExecutor = timeoutExecutor;
}
}
362 changes: 188 additions & 174 deletions src/main/java/com/github/dexecutor/core/DexecutorConfig.java
Original file line number Diff line number Diff line change
@@ -1,174 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.dexecutor.core;

import static com.github.dexecutor.core.support.Preconditions.checkNotNull;

import java.util.concurrent.ExecutorService;

import com.github.dexecutor.core.graph.CyclicValidator;
import com.github.dexecutor.core.graph.Validator;
import com.github.dexecutor.core.task.TaskProvider;

/**
* <p>Configuration Object for Dexecutor framework. At a minimum it needs {@code ExecutorService} and {@code TaskProvider}, rest are optional and takes default values</p>
* <p>This provides way to hook in your own {@code DexecutorState} and {@code Validator} </p>
*
* @author Nadeem Mohammad
*
* @param <T> Type of Node/Task ID
* @param <R> Type of Node/Task result
*/
public class DexecutorConfig<T, R> {

/**
* Number of threads that should handle the immediate retry.
*/
private int immediateRetryPoolThreadsCount = 1;

/**
* Number of threads that should handle the immediate retry.
*/
private int scheduledRetryPoolThreadsCount = 1;

/**
* executor is the main platform on which tasks are executed
*/
private ExecutionEngine<T, R> executionEngine;

/**
* When it comes to task execution, task provider would be consulted to provide task objects for execution
*/
private TaskProvider<T, R> taskProvider;
/**
* Validator for validating the constructed graph, defaults to detecting Cyclic checks
*/
private Validator<T, R> validator = new CyclicValidator<T, R>();

private DexecutorState<T, R> dexecutorState = new DefaultDexecutorState<T, R>();

/**
* Construct the object with mandatory params, rest are optional
* @param executorService provided executor service
* @param taskProvider provided task provider
*/
public DexecutorConfig(final ExecutorService executorService, final TaskProvider<T, R> taskProvider) {
this(executorService, taskProvider, null);
}

public DexecutorConfig(final ExecutorService executorService, final TaskProvider<T, R> taskProvider, final ExecutionListener<T, R> listener) {
checkNotNull(executorService, "Executer Service should not be null");
checkNotNull(taskProvider, "Task Provider should not be null");
this.executionEngine = new DefaultExecutionEngine<>(this.dexecutorState, executorService, listener);
this.taskProvider = taskProvider;
}

/**
* Construct the object with mandatory params, rest are optional
* @param executionEngine provided execution Engine
* @param taskProvider provided task provider
*/
public DexecutorConfig(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
this(new DefaultDexecutorState<>(), executionEngine, taskProvider);
}

public DexecutorConfig(final DexecutorState<T, R> dexecutorState, final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
checkNotNull(executionEngine, "Execution Engine should not be null");
checkNotNull(taskProvider, "Task Provider should not be null");
checkNotNull(dexecutorState, "Dexecutor State should not be null");
this.executionEngine = executionEngine;
this.taskProvider = taskProvider;
this.dexecutorState = dexecutorState;
}

void validate() {
checkNotNull(this.executionEngine, "Execution Engine should not be null");
checkNotNull(this.taskProvider, "Task Provider should not be null");
checkNotNull(this.validator, "Validator should not be null");
checkNotNull(this.dexecutorState, "Dexecutor State should not be null");
}

ExecutionEngine<T, R> getExecutorEngine() {
return this.executionEngine;
}

TaskProvider<T, R> getTaskProvider() {
return this.taskProvider;
}

Validator<T, R> getValidator() {
return this.validator;
}
/**
* change the validator to that of specified
* @param validator the validator
*/
public void setValidator(final Validator<T, R> validator) {
this.validator = validator;
}

/**
*
* @return the immediate retry thread pool count
*/
public int getImmediateRetryPoolThreadsCount() {
return immediateRetryPoolThreadsCount;
}
/**
* sets the immediate retry thread pool size to that of specified
* @param immediateRetryPoolThreadsCount Number of threads that should process retry immediately
*/
public void setImmediateRetryPoolThreadsCount(int immediateRetryPoolThreadsCount) {
this.immediateRetryPoolThreadsCount = immediateRetryPoolThreadsCount;
}
/**
*
* @return the scheduled retry thread pool size
*/
public int getScheduledRetryPoolThreadsCount() {
return scheduledRetryPoolThreadsCount;
}
/**
* sets the scheduled thread pool size to that of specified
* @param scheduledRetryPoolThreadsCount Number of threads that should process retry immediately
*/
public void setScheduledRetryPoolThreadsCount(int scheduledRetryPoolThreadsCount) {
this.scheduledRetryPoolThreadsCount = scheduledRetryPoolThreadsCount;
}
/**
*
* @return the dexecutor state
*/
public DexecutorState<T, R> getDexecutorState() {
return this.dexecutorState;
}

/**
*
* @param dexecutorState to set to
*/
public void setDexecutorState(final DexecutorState<T, R> dexecutorState) {
this.dexecutorState = dexecutorState;
}

public void setExecutionListener(ExecutionListener<T, R> listener) {
if (listener != null) {
this.executionEngine.setExecutionListener(listener);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.dexecutor.core;

import static com.github.dexecutor.core.support.Preconditions.checkNotNull;

import java.util.concurrent.ExecutorService;

import com.github.dexecutor.core.graph.CyclicValidator;
import com.github.dexecutor.core.graph.Validator;
import com.github.dexecutor.core.task.TaskProvider;

/**
* <p>Configuration Object for Dexecutor framework. At a minimum it needs {@code ExecutorService} and {@code TaskProvider}, rest are optional and takes default values</p>
* <p>This provides way to hook in your own {@code DexecutorState} and {@code Validator} </p>
*
* @author Nadeem Mohammad
*
* @param <T> Type of Node/Task ID
* @param <R> Type of Node/Task result
*/
public class DexecutorConfig<T, R> {

/**
* Number of threads that should handle the immediate retry.
*/
private int immediateRetryPoolThreadsCount = 1;

/**
* Number of threads that should handle the immediate retry.
*/
private int scheduledRetryPoolThreadsCount = 1;

/**
* Number of threads that should handle the timeout scheduler.
*/
private int timeoutSchedulerPoolThreadsCount = 2;

/**
* executor is the main platform on which tasks are executed
*/
private ExecutionEngine<T, R> executionEngine;

/**
* When it comes to task execution, task provider would be consulted to provide task objects for execution
*/
private TaskProvider<T, R> taskProvider;
/**
* Validator for validating the constructed graph, defaults to detecting Cyclic checks
*/
private Validator<T, R> validator = new CyclicValidator<T, R>();

private DexecutorState<T, R> dexecutorState = new DefaultDexecutorState<T, R>();

/**
* Construct the object with mandatory params, rest are optional
* @param executorService provided executor service
* @param taskProvider provided task provider
*/
public DexecutorConfig(final ExecutorService executorService, final TaskProvider<T, R> taskProvider) {
this(executorService, taskProvider, null);
}

public DexecutorConfig(final ExecutorService executorService, final TaskProvider<T, R> taskProvider, final ExecutionListener<T, R> listener) {
checkNotNull(executorService, "Executer Service should not be null");
checkNotNull(taskProvider, "Task Provider should not be null");
this.executionEngine = new DefaultExecutionEngine<>(this.dexecutorState, executorService, listener);
this.taskProvider = taskProvider;
}

/**
* Construct the object with mandatory params, rest are optional
* @param executionEngine provided execution Engine
* @param taskProvider provided task provider
*/
public DexecutorConfig(final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
this(new DefaultDexecutorState<>(), executionEngine, taskProvider);
}

public DexecutorConfig(final DexecutorState<T, R> dexecutorState, final ExecutionEngine<T, R> executionEngine, final TaskProvider<T, R> taskProvider) {
checkNotNull(executionEngine, "Execution Engine should not be null");
checkNotNull(taskProvider, "Task Provider should not be null");
checkNotNull(dexecutorState, "Dexecutor State should not be null");
this.executionEngine = executionEngine;
this.taskProvider = taskProvider;
this.dexecutorState = dexecutorState;
}

void validate() {
checkNotNull(this.executionEngine, "Execution Engine should not be null");
checkNotNull(this.taskProvider, "Task Provider should not be null");
checkNotNull(this.validator, "Validator should not be null");
checkNotNull(this.dexecutorState, "Dexecutor State should not be null");
}

ExecutionEngine<T, R> getExecutorEngine() {
return this.executionEngine;
}

TaskProvider<T, R> getTaskProvider() {
return this.taskProvider;
}

Validator<T, R> getValidator() {
return this.validator;
}
/**
* change the validator to that of specified
* @param validator the validator
*/
public void setValidator(final Validator<T, R> validator) {
this.validator = validator;
}

/**
*
* @return the immediate retry thread pool count
*/
public int getImmediateRetryPoolThreadsCount() {
return immediateRetryPoolThreadsCount;
}
/**
* sets the immediate retry thread pool size to that of specified
* @param immediateRetryPoolThreadsCount Number of threads that should process retry immediately
*/
public void setImmediateRetryPoolThreadsCount(int immediateRetryPoolThreadsCount) {
this.immediateRetryPoolThreadsCount = immediateRetryPoolThreadsCount;
}
/**
*
* @return the scheduled retry thread pool size
*/
public int getScheduledRetryPoolThreadsCount() {
return scheduledRetryPoolThreadsCount;
}
/**
* sets the scheduled thread pool size to that of specified
* @param scheduledRetryPoolThreadsCount Number of threads that should process retry immediately
*/
public void setScheduledRetryPoolThreadsCount(int scheduledRetryPoolThreadsCount) {
this.scheduledRetryPoolThreadsCount = scheduledRetryPoolThreadsCount;
}

public int getTimeoutSchedulerPoolThreadsCount() {
return timeoutSchedulerPoolThreadsCount;
}

public void setTimeoutSchedulerPoolThreadsCount(int timeoutSchedulerPoolThreadsCount) {
this.timeoutSchedulerPoolThreadsCount = timeoutSchedulerPoolThreadsCount;
}

/**
*
* @return the dexecutor state
*/
public DexecutorState<T, R> getDexecutorState() {
return this.dexecutorState;
}

/**
*
* @param dexecutorState to set to
*/
public void setDexecutorState(final DexecutorState<T, R> dexecutorState) {
this.dexecutorState = dexecutorState;
}

public void setExecutionListener(ExecutionListener<T, R> listener) {
if (listener != null) {
this.executionEngine.setExecutionListener(listener);
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/github/dexecutor/core/ExecutionEngine.java
Original file line number Diff line number Diff line change
@@ -17,6 +17,8 @@

package com.github.dexecutor.core;

import java.util.concurrent.ScheduledExecutorService;

import com.github.dexecutor.core.task.ExecutionResult;
import com.github.dexecutor.core.task.Task;
import com.github.dexecutor.core.task.TaskExecutionException;
@@ -61,4 +63,6 @@ public interface ExecutionEngine<T, R> {
* @param listener to notify
*/
void setExecutionListener(ExecutionListener<T, R> listener);

void setTimeoutScheduler(ScheduledExecutorService timeoutExecutor);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.github.dexecutor.core;

import java.util.concurrent.Callable;

public interface IdentifiableCallable<T, V> extends Callable<V>{

T getIdentifier();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.github.dexecutor.core.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;

import com.github.dexecutor.core.IdentifiableCallable;

public class ExecutorCompletionService<T, V> implements CompletionService<V> {
private final Executor executor;
private final BlockingQueue<Future<V>> completionQueue;

/**
* FutureTask extension to enqueue upon completion
*/
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}

/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and a
* {@link LinkedBlockingQueue} as a completion queue.
*
* @param executor the executor to use
* @throws NullPointerException if executor is {@code null}
*/
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}

/**
* Creates an ExecutorCompletionService using the supplied
* executor for base task execution and the supplied queue as its
* completion queue.
*
* @param executor the executor to use
* @param completionQueue the queue to use as the completion queue
* normally one dedicated for use by this service. This
* queue is treated as unbounded -- failed attempted
* {@code Queue.add} operations for completed tasks cause
* them not to be retrievable.
* @throws NullPointerException if executor or completionQueue are {@code null}
*/
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.completionQueue = completionQueue;
}

public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}

public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}

private RunnableFuture<V> newTaskFor(Callable<V> task) {

@SuppressWarnings("unchecked")
IdentifiableCallable<T, V> callable = (IdentifiableCallable<T, V>) task;

return new IdentifiableRunnableFuture<T, V>(callable.getIdentifier(), callable);
}

private RunnableFuture<V> newTaskFor(Runnable task, V result) {
@SuppressWarnings("unchecked")
IdentifiableCallable<T, V> callable = (IdentifiableCallable<T, V>) task;
return new IdentifiableRunnableFuture<T, V>(callable.getIdentifier(), task, result);
}

public Future<V> take() throws InterruptedException {
return completionQueue.take();
}

public Future<V> poll() {
return completionQueue.poll();
}

public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.github.dexecutor.core.concurrent;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class IdentifiableRunnableFuture<T, V> extends FutureTask<V> {

private T identifier;

public IdentifiableRunnableFuture(T identifier, Callable<V> callable) {
super(callable);
this.identifier = identifier;
}

public IdentifiableRunnableFuture(T identifier, Runnable runnable, V result) {
super(runnable, result);
this.identifier = identifier;
}

public T getIdentifier() {
return identifier;
}
}
6 changes: 5 additions & 1 deletion src/main/java/com/github/dexecutor/core/graph/Node.java
Original file line number Diff line number Diff line change
@@ -161,6 +161,10 @@ public void setSuccess() {
public void setErrored() {
this.status = NodeStatus.ERRORED;
}

public void setCancelled() {
this.status = NodeStatus.CANCELLED;
}

/**
* Sets the node's execution result to SKIPPED
@@ -223,6 +227,6 @@ public String toString() {
*
*/
enum NodeStatus {
ERRORED,SKIPPED,SUCCESS;
ERRORED,SKIPPED,SUCCESS,CANCELLED;
}
}
Original file line number Diff line number Diff line change
@@ -17,6 +17,9 @@

package com.github.dexecutor.core.task;

import java.time.Duration;
import java.time.LocalDateTime;

import com.github.dexecutor.core.graph.NodeProvider;

/**
@@ -75,4 +78,39 @@ public ExecutionResults<T, R> getParentResults() {
public void setNodeProvider(NodeProvider<T, R> nodeProvider) {
this.task.setNodeProvider(nodeProvider);
}

@Override
public void markEnd() {
this.task.markEnd();
}

@Override
public void markStart() {
this.task.markStart();
}

@Override
public LocalDateTime getStartTime() {
return this.task.getStartTime();
}

@Override
public LocalDateTime getEndTime() {
return this.task.getEndTime();
}

@Override
public Duration getTimeout() {
return this.task.getTimeout();
}

@Override
public boolean isTimedOut() {
return this.task.isTimedOut();
}

@Override
public String toString() {
return this.task.toString();
}
}
38 changes: 35 additions & 3 deletions src/main/java/com/github/dexecutor/core/task/ExecutionResult.java
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
package com.github.dexecutor.core.task;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
* Holds execution result of a node identified by id
@@ -37,6 +38,15 @@ public final class ExecutionResult<T, R> implements Serializable {
private R result;
private ExecutionStatus status = ExecutionStatus.SUCCESS;
private String message;

/**
* start time for the task
*/
private LocalDateTime startTime;
/**
* End time for the task
*/
private LocalDateTime endTime;

public ExecutionResult(final T id, final R result, final ExecutionStatus status) {
this(id, result, status, EMPTY);
@@ -56,6 +66,10 @@ public static <T, R> ExecutionResult<T, R> success(final T id, final R result) {
public static <T, R> ExecutionResult<T, R> errored(final T id, final R result, final String msg) {
return new ExecutionResult<T, R>(id, result, ExecutionStatus.ERRORED, msg);
}

public static <T, R> ExecutionResult<T, R> cancelled(final T id, final String msg) {
return new ExecutionResult<T, R>(id, null, ExecutionStatus.CANCELLED, msg);
}

/**
*
@@ -111,6 +125,10 @@ public boolean isSuccess() {
public boolean isErrored() {
return ExecutionStatus.ERRORED.equals(this.status);
}

public boolean isCancelled() {
return ExecutionStatus.CANCELLED.equals(this.status);
}

/**
*
@@ -128,7 +146,20 @@ public boolean isSkipped() {
public String getMessage() {
return message;
}


public void setTimes(LocalDateTime startTime, LocalDateTime endTime) {
this.startTime = startTime;
this.endTime = endTime;
}

public LocalDateTime getStartTime() {
return startTime;
}

public LocalDateTime getEndTime() {
return endTime;
}

@Override
public int hashCode() {
final int prime = 31;
@@ -157,6 +188,7 @@ public boolean equals(Object obj) {

@Override
public String toString() {
return "ExecutionResult [id=" + id + ", result=" + result + ", status=" + status + ", message=" + message + "]";
}
return "ExecutionResult [id=" + id + ", result=" + result + ", status=" + status + ", message=" + message
+ ", startTime=" + startTime + ", endTime=" + endTime + "]";
}
}
Original file line number Diff line number Diff line change
@@ -28,12 +28,15 @@
* <li>
* <code>SUCCESS</code> : Task Execution was successful
* </li>
* <li>
* <code>CANCELLED</code> : Task Execution was cancelled
* </li>
*
* </ul>
* @author Nadeem Mohammad
*
*/

public enum ExecutionStatus {
ERRORED, SKIPPED, SUCCESS;
ERRORED, SKIPPED, SUCCESS, CANCELLED;
}
47 changes: 47 additions & 0 deletions src/main/java/com/github/dexecutor/core/task/Task.java
Original file line number Diff line number Diff line change
@@ -18,6 +18,8 @@
package com.github.dexecutor.core.task;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;

import com.github.dexecutor.core.graph.Node;
import com.github.dexecutor.core.graph.NodeProvider;
@@ -45,6 +47,14 @@ public abstract class Task<T, R> implements Serializable {
* id of the task, this would be same as that of {@code Node} id
*/
private T id;
/**
* start time for the task
*/
private LocalDateTime start;
/**
* End time for the task
*/
private LocalDateTime end;

/**
*
@@ -139,4 +149,41 @@ public void setConsiderExecutionError(boolean considerExecutionError) {
public boolean shouldExecute(final ExecutionResults<T, R> parentResults) {
return true;
}

public void markStart() {
if (this.start == null) {
this.start = LocalDateTime.now();
}
}

public LocalDateTime getStartTime() {
return this.start;
}

public void markEnd() {
if (this.end == null) {
this.end = LocalDateTime.now();
}
}

public LocalDateTime getEndTime() {
return this.end;
}

public boolean isCompleted() {
return getEndTime() != null;
}

public Duration getTimeout() {
return null;
}

public boolean isTimedOut() {
return getTimeout() != null && getStartTime() != null && getStartTime().plus(getTimeout()).isBefore(LocalDateTime.now());
}

@Override
public String toString() {
return "Task [id=" + id + ", start=" + start + ", timeout=" + getTimeout() + ", end=" + end + "]";
}
}
Original file line number Diff line number Diff line change
@@ -22,14 +22,14 @@
import static org.junit.Assert.assertThat;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import com.github.dexecutor.core.concurrent.ExecutorCompletionService;
import com.github.dexecutor.core.task.ExecutionResult;
import com.github.dexecutor.core.task.TaskExecutionException;

@@ -65,7 +65,7 @@ public void processResult() throws InterruptedException {
}

private static class MockedCompletionService
extends MockUp<ExecutorCompletionService<ExecutionResult<Integer, Integer>>> {
extends MockUp<ExecutorCompletionService<Integer, ExecutionResult<Integer, Integer>>> {

@Mock
public void $init(Executor executor) {
136 changes: 136 additions & 0 deletions src/test/java/com/github/dexecutor/core/DexecutorTimeoutTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.github.dexecutor.core;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.dexecutor.core.graph.Node;
import com.github.dexecutor.core.support.TestUtil;
import com.github.dexecutor.core.task.ExecutionResults;
import com.github.dexecutor.core.task.Task;
import com.github.dexecutor.core.task.TaskProvider;

/**
*
* @author Nadeem Mohammad
*
*/
public class DexecutorTimeoutTest {

private static final Logger logger = LoggerFactory.getLogger(DexecutorTimeoutTest.class);

@Test
public void testDependentTaskExecution() {
ExecutorService executorService = newExecutor();

try {
DexecutorConfig<Integer, Integer> config = new DexecutorConfig<>(executorService, new SleepyTaskProvider());
DefaultDexecutor<Integer, Integer> executor = new DefaultDexecutor<Integer, Integer>(config);
executor.addDependency(1, 2);
executor.addDependency(1, 2);
executor.addDependency(1, 3);
executor.addDependency(3, 4);
executor.addDependency(3, 5);
executor.addDependency(3, 6);
executor.addDependency(2, 7);
executor.addDependency(2, 9);
executor.addDependency(2, 8);
executor.addDependency(9, 10);
executor.addDependency(12, 13);
executor.addDependency(13, 4);
executor.addDependency(13, 14);
executor.addIndependent(11);

ExecutionResults<Integer, Integer> result = executor.execute(ExecutionConfig.TERMINATING);
System.out.println(result);

Collection<Node<Integer, Integer>> processedNodesOrder = TestUtil.processedNodesOrder(executor);
assertThat(processedNodesOrder).containsAll(executionOrderExpectedResult());
assertThat(processedNodesOrder).size().isGreaterThanOrEqualTo(4);

} finally {
try {
executorService.shutdownNow();
executorService.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {

}
}
}

private Collection<Node<Integer, Integer>> executionOrderExpectedResult() {
List<Node<Integer, Integer>> result = new ArrayList<Node<Integer, Integer>>();
result.add(new Node<Integer, Integer>(1));
//result.add(new Node<Integer, Integer>(2));
result.add(new Node<Integer, Integer>(11));
result.add(new Node<Integer, Integer>(12));
return result;
}

private ExecutorService newExecutor() {
return Executors.newFixedThreadPool(7);
}

private static class SleepyTaskProvider implements TaskProvider<Integer, Integer> {

public Task<Integer, Integer> provideTask(final Integer id) {

return new Task<Integer, Integer>() {

private static final long serialVersionUID = 1L;

public Integer execute() {

if (id == 14) {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
logger.error(this.toString(), e);
}
} else {
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
logger.error(this.toString(), e);
}
}

return id;
}

@Override
public Duration getTimeout() {
return Duration.ofMillis(1);
}
};
}
}

}
2 changes: 1 addition & 1 deletion src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
<appender-ref ref="CONSOLE" />
</logger>

<logger name="com.github.dexecutor" level="debug"
<logger name="com.github.dexecutor" level="trace"
additivity="false">
<appender-ref ref="CONSOLE" />
</logger>

0 comments on commit e197f36

Please sign in to comment.