Skip to content

Commit

Permalink
Set unack/postpone duration based on task state
Browse files Browse the repository at this point in the history
  • Loading branch information
skorse committed Oct 11, 2022
1 parent 9361ba6 commit 769974c
Showing 1 changed file with 57 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import com.netflix.conductor.common.metadata.tasks.TaskDef;
import com.netflix.conductor.common.metadata.workflow.WorkflowTask;
import com.netflix.conductor.core.WorkflowContext;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.NotFoundException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.TaskModel.Status;
import com.netflix.conductor.model.WorkflowModel;

import static com.netflix.conductor.core.config.SchedulerConfiguration.SWEEPER_EXECUTOR_NAME;
Expand Down Expand Up @@ -64,6 +68,7 @@ public CompletableFuture<Void> sweepAsync(String workflowId) {
}

public void sweep(String workflowId) {
WorkflowModel workflow = null;
try {
WorkflowContext workflowContext = new WorkflowContext(properties.getAppId());
WorkflowContext.set(workflowContext);
Expand All @@ -74,11 +79,12 @@ public void sweep(String workflowId) {
workflowRepairService.verifyAndRepairWorkflowTasks(workflowId);
}

WorkflowModel workflow = workflowExecutor.decide(workflowId);
workflow = workflowExecutor.decide(workflowId);
if (workflow != null && workflow.getStatus().isTerminal()) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
return;
}

} catch (NotFoundException nfe) {
queueDAO.remove(DECIDER_QUEUE, workflowId);
LOGGER.info(
Expand All @@ -88,7 +94,56 @@ public void sweep(String workflowId) {
Monitors.error(CLASS_NAME, "sweep");
LOGGER.error("Error running sweep for " + workflowId, e);
}
if (workflow != null) {
unack(workflow);
} else {
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis());
}
}

void unack(WorkflowModel workflowModel) {
long postponeDurationSeconds = 0;
for (TaskModel taskModel : workflowModel.getTasks()) {
if (taskModel.getStatus() == Status.IN_PROGRESS) {
postponeDurationSeconds =
(taskModel.getResponseTimeoutSeconds() != 0)
? taskModel.getResponseTimeoutSeconds() + 1
: properties.getWorkflowOffsetTimeout().getSeconds();
break;
}
if (taskModel.getStatus() == Status.SCHEDULED) {
Optional<TaskDef> taskDefinition = taskModel.getTaskDefinition();
if (taskDefinition.isEmpty()) {
taskDefinition =
Optional.ofNullable(
workflowModel
.getWorkflowDefinition()
.getTaskByRefName(
taskModel.getReferenceTaskName()))
.map(WorkflowTask::getTaskDefinition);
}
if (taskDefinition.isPresent()) {
TaskDef taskDef = taskDefinition.get();
if (taskDef.getPollTimeoutSeconds() != null
&& taskDef.getPollTimeoutSeconds() != 0) {
postponeDurationSeconds = taskDef.getPollTimeoutSeconds() + 1;
} else {
postponeDurationSeconds =
(workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0)
? workflowModel.getWorkflowDefinition().getTimeoutSeconds()
: properties.getWorkflowOffsetTimeout().getSeconds();
}
} else {
postponeDurationSeconds =
(workflowModel.getWorkflowDefinition().getTimeoutSeconds() != 0)
? workflowModel.getWorkflowDefinition().getTimeoutSeconds()
: properties.getWorkflowOffsetTimeout().getSeconds();
}
break;
}
}
queueDAO.setUnackTimeout(
DECIDER_QUEUE, workflowId, properties.getWorkflowOffsetTimeout().toMillis());
DECIDER_QUEUE, workflowModel.getWorkflowId(), postponeDurationSeconds * 1000);
}
}

0 comments on commit 769974c

Please sign in to comment.