Skip to content

Commit

Permalink
database update affected row
Browse files Browse the repository at this point in the history
  • Loading branch information
ponfee committed Nov 4, 2023
1 parent 06be4d3 commit 39f0bd6
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand All @@ -54,9 +55,9 @@ public abstract class AbstractJobManager {

private static final int MAX_SPLIT_TASK_SIZE = 1000;
private static final List<TriggerType> FIXED_TYPES = ImmutableList.of(TriggerType.FIXED_RATE, TriggerType.FIXED_DELAY);
private static final int AFFECTED_ONE_ROW = 1;

protected static final String TX_MANAGER_NAME = DB_NAME + TX_MANAGER_NAME_SUFFIX;
protected static final int AFFECTED_ONE_ROW = 1;

protected final SchedJobMapper jobMapper;
protected final SchedDependMapper dependMapper;
Expand All @@ -69,11 +70,11 @@ public abstract class AbstractJobManager {
// ------------------------------------------------------------------database single operation without spring transactional

public boolean disableJob(SchedJob job) {
return jobMapper.disable(job) == AFFECTED_ONE_ROW;
return isOneAffectedRow(jobMapper.disable(job));
}

public boolean changeJobState(long jobId, JobState to) {
boolean flag = jobMapper.updateState(jobId, to.value(), 1 ^ to.value()) == AFFECTED_ONE_ROW;
boolean flag = isOneAffectedRow(jobMapper.updateState(jobId, to.value(), 1 ^ to.value()));
SchedJob job;
if (flag && to == JobState.ENABLE && TriggerType.FIXED_DELAY.equals((job = jobMapper.get(jobId)).getTriggerType())) {
Date date = null;
Expand All @@ -87,11 +88,11 @@ public boolean changeJobState(long jobId, JobState to) {
}

public boolean updateJobNextTriggerTime(SchedJob job) {
return jobMapper.updateNextTriggerTime(job) == AFFECTED_ONE_ROW;
return isOneAffectedRow(jobMapper.updateNextTriggerTime(job));
}

public boolean updateJobNextScanTime(SchedJob schedJob) {
return jobMapper.updateNextScanTime(schedJob) == AFFECTED_ONE_ROW;
return isOneAffectedRow(jobMapper.updateNextScanTime(schedJob));
}

// ------------------------------------------------------------------database operation within spring transactional
Expand Down Expand Up @@ -132,7 +133,7 @@ public void updateJob(SchedJob job) throws JobCheckedException {
}

job.setUpdatedAt(new Date());
Assert.state(jobMapper.update(job) == AFFECTED_ONE_ROW, "Update sched job fail or conflict.");
assertOneAffectedRow(jobMapper.update(job), "Update sched job fail or conflict.");
}

@Transactional(transactionManager = TX_MANAGER_NAME, rollbackFor = Exception.class)
Expand All @@ -142,13 +143,45 @@ public void deleteJob(long jobId) {
if (JobState.ENABLE.equals(job.getJobState())) {
throw new IllegalStateException("Please disable job before delete this job.");
}
Assert.isTrue(jobMapper.softDelete(jobId) == AFFECTED_ONE_ROW, "Delete sched job fail or conflict.");
assertOneAffectedRow(jobMapper.softDelete(jobId), "Delete sched job fail or conflict.");
dependMapper.deleteByParentJobId(jobId);
dependMapper.deleteByChildJobId(jobId);
}

// ------------------------------------------------------------------others operation

/**
 * Tells whether a database write operation touched exactly one row.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @return {@code true} if exactly one row was affected, {@code false} otherwise
 */
protected boolean isOneAffectedRow(int totalAffectedRow) {
    return AFFECTED_ONE_ROW == totalAffectedRow;
}

/**
 * Tells whether a database write operation touched at least one row.
 *
 * <p>NOTE: despite the name, a count of exactly one also qualifies — the
 * check is {@code >= 1}, not {@code > 1}.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @return {@code true} if one or more rows were affected, {@code false} otherwise
 */
protected boolean isManyAffectedRow(int totalAffectedRow) {
    return AFFECTED_ONE_ROW <= totalAffectedRow;
}

/**
 * Asserts that a database write operation affected exactly one row.
 *
 * <p>Delegates the comparison to {@link #isOneAffectedRow(int)} so the
 * "exactly one row" rule lives in a single place.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @param errorMsgSupplier lazily builds the failure message; invoked only on failure
 * @throws IllegalStateException if the count is not exactly one row
 */
protected void assertOneAffectedRow(int totalAffectedRow, Supplier<String> errorMsgSupplier) {
    if (!isOneAffectedRow(totalAffectedRow)) {
        throw new IllegalStateException(errorMsgSupplier.get());
    }
}

/**
 * Asserts that a database write operation affected exactly one row.
 *
 * <p>Delegates the comparison to {@link #isOneAffectedRow(int)} so the
 * "exactly one row" rule lives in a single place.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @param errorMsg the failure message (already materialized; prefer the
 *                 {@code Supplier} overload when building it is costly)
 * @throws IllegalStateException if the count is not exactly one row
 */
protected void assertOneAffectedRow(int totalAffectedRow, String errorMsg) {
    if (!isOneAffectedRow(totalAffectedRow)) {
        throw new IllegalStateException(errorMsg);
    }
}

/**
 * Asserts that a database write operation affected at least one row.
 *
 * <p>Delegates the comparison to {@link #isManyAffectedRow(int)} so the
 * "at least one row" rule lives in a single place.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @param errorMsgSupplier lazily builds the failure message; invoked only on failure
 * @throws IllegalStateException if no rows were affected
 */
protected void assertManyAffectedRow(int totalAffectedRow, Supplier<String> errorMsgSupplier) {
    if (!isManyAffectedRow(totalAffectedRow)) {
        throw new IllegalStateException(errorMsgSupplier.get());
    }
}

/**
 * Asserts that a database write operation affected at least one row.
 *
 * <p>Delegates the comparison to {@link #isManyAffectedRow(int)} so the
 * "at least one row" rule lives in a single place.
 *
 * @param totalAffectedRow the affected-row count reported by the mapper
 * @param errorMsg the failure message (already materialized; prefer the
 *                 {@code Supplier} overload when building it is costly)
 * @throws IllegalStateException if no rows were affected
 */
protected void assertManyAffectedRow(int totalAffectedRow, String errorMsg) {
    if (!isManyAffectedRow(totalAffectedRow)) {
        throw new IllegalStateException(errorMsg);
    }
}

/**
 * Generates a new id by delegating to the configured id generator.
 *
 * @return the newly generated id
 */
public long generateId() {
    return this.idGenerator.generateId();
}
Expand Down Expand Up @@ -295,8 +328,7 @@ private void parseTriggerConfig(SchedJob job, boolean isUpdate) {
} else {
Date nextTriggerTime;
if (FIXED_TYPES.contains(triggerType)) {
// initial delay 30 seconds
nextTriggerTime = Dates.max(Dates.plusSeconds(new Date(), 30), job.getStartTime());
nextTriggerTime = Dates.max(new Date(), job.getStartTime());
} else {
Date baseTime = Dates.max(new Date(), job.getStartTime());
nextTriggerTime = triggerType.computeNextFireTime(job.getTriggerValue(), baseTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,16 @@ public DistributedJobManager(SchedJobMapper jobMapper,
// ------------------------------------------------------------------database single operation without spring transactional

public boolean renewInstanceUpdateTime(SchedInstance instance, Date updateTime) {
return instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()) == AFFECTED_ONE_ROW;
return isOneAffectedRow(instanceMapper.renewUpdateTime(instance.getInstanceId(), updateTime, instance.getVersion()));
}

@Override
protected boolean cancelWaitingTask(long taskId) {
return taskMapper.terminate(taskId, null, ExecuteState.BROADCAST_ABORTED.value(), ExecuteState.WAITING.value(), null, null) == AFFECTED_ONE_ROW;
return isOneAffectedRow(taskMapper.terminate(taskId, null, ExecuteState.BROADCAST_ABORTED.value(), ExecuteState.WAITING.value(), null, null));
}

public void savepoint(long taskId, String executeSnapshot) {
Assert.state(taskMapper.savepoint(taskId, executeSnapshot) == AFFECTED_ONE_ROW, () -> "Save point failed: " + taskId + " | " + executeSnapshot);
assertOneAffectedRow(taskMapper.savepoint(taskId, executeSnapshot), () -> "Save point failed: " + taskId + " | " + executeSnapshot);
}

// ------------------------------------------------------------------database operation within spring transactional
Expand Down Expand Up @@ -228,27 +228,27 @@ public void deleteInstance(long instanceId) {

// delete workflow lead instance
int row = instanceMapper.deleteByInstanceId(instanceId);
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Delete workflow lead instance conflict: " + instanceId);
assertOneAffectedRow(row, () -> "Delete workflow lead instance conflict: " + instanceId);

// delete task
for (SchedInstance e : instanceMapper.findWorkflowNode(instance.getWnstanceId())) {
row = taskMapper.deleteByInstanceId(e.getInstanceId());
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched task conflict: " + instanceId);
assertManyAffectedRow(row, () -> "Delete sched task conflict: " + instanceId);
}

// delete workflow node instance
row = instanceMapper.deleteByWnstanceId(instanceId);
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete workflow node instance conflict: " + instanceId);
assertManyAffectedRow(row, () -> "Delete workflow node instance conflict: " + instanceId);

// delete workflow config
row = workflowMapper.deleteByWnstanceId(instanceId);
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched workflow conflict: " + instanceId);
assertManyAffectedRow(row, () -> "Delete sched workflow conflict: " + instanceId);
} else {
int row = instanceMapper.deleteByInstanceId(instanceId);
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Delete sched instance conflict: " + instanceId);
assertOneAffectedRow(row, () -> "Delete sched instance conflict: " + instanceId);

row = taskMapper.deleteByInstanceId(instanceId);
Assert.isTrue(row >= AFFECTED_ONE_ROW, () -> "Delete sched task conflict: " + instanceId);
assertManyAffectedRow(row, () -> "Delete sched task conflict: " + instanceId);
}
LOG.info("Delete sched instance success {}", instanceId);
});
Expand Down Expand Up @@ -277,7 +277,7 @@ public boolean terminateTask(TerminateTaskParam param) {

Date executeEndTime = toState.isTerminal() ? new Date() : null;
int row = taskMapper.terminate(param.getTaskId(), param.getWorker(), toState.value(), ExecuteState.EXECUTING.value(), executeEndTime, param.getErrorMsg());
if (row != AFFECTED_ONE_ROW) {
if (!isOneAffectedRow(row)) {
// usual is worker invoke http timeout, then retry
LOG.warn("Conflict terminate executing task: {} | {}", param.getTaskId(), toState);
return false;
Expand Down Expand Up @@ -335,7 +335,7 @@ public boolean purgeInstance(SchedInstance inst) {
// cannot be paused
Assert.isTrue(tuple.a.isTerminal(), () -> "Purge instance state must be terminal state: " + instance);
}
if (instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b) != AFFECTED_ONE_ROW) {
if (!isOneAffectedRow(instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_TERMINABLE, tuple.b))) {
return false;
}

Expand Down Expand Up @@ -443,7 +443,7 @@ public boolean resumeInstance(long instanceId) {
Assert.isTrue(instance.isWorkflowLead(), () -> "Cannot resume workflow node instance: " + instanceId);
// update sched_instance paused lead to running state
int row = instanceMapper.updateState(instanceId, RunState.RUNNING.value(), RunState.PAUSED.value());
Assert.state(row == AFFECTED_ONE_ROW, () -> "Resume workflow lead instance failed: " + instanceId);
assertOneAffectedRow(row, () -> "Resume workflow lead instance failed: " + instanceId);
workflowMapper.resumeWaiting(instanceId);
for (SchedInstance nodeInstance : instanceMapper.findWorkflowNode(instanceId)) {
if (RunState.PAUSED.equals(nodeInstance.getRunState())) {
Expand Down Expand Up @@ -540,7 +540,7 @@ private void pauseInstance(SchedInstance instance) {
// must be paused or terminate
Assert.notNull(tuple, () -> "Pause instance failed: " + instanceId);
int row = instanceMapper.terminate(instanceId, tuple.a.value(), RUN_STATE_PAUSABLE, tuple.b);
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Pause instance failed: " + instance + " | " + tuple.a);
assertOneAffectedRow(row, () -> "Pause instance failed: " + instance + " | " + tuple.a);
if (instance.isWorkflowNode()) {
updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_PAUSABLE);
}
Expand Down Expand Up @@ -568,7 +568,7 @@ private void cancelInstance(SchedInstance instance, Operations ops) {

RunState toState = tuple.a;
int row = instanceMapper.terminate(instanceId, toState.value(), RUN_STATE_TERMINABLE, tuple.b);
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Cancel instance failed: " + instance + " | " + toState);
assertOneAffectedRow(row, () -> "Cancel instance failed: " + instance + " | " + toState);
if (instance.isWorkflowNode()) {
updateWorkflowEdgeState(instance, tuple.a.value(), RUN_STATE_TERMINABLE);
}
Expand All @@ -581,10 +581,10 @@ private void cancelInstance(SchedInstance instance, Operations ops) {
private void resumeInstance(SchedInstance instance) {
long instanceId = instance.getInstanceId();
int row = instanceMapper.updateState(instanceId, RunState.WAITING.value(), RunState.PAUSED.value());
Assert.state(row == AFFECTED_ONE_ROW, "Resume sched instance failed.");
assertOneAffectedRow(row, "Resume sched instance failed.");

row = taskMapper.updateStateByInstanceId(instanceId, ExecuteState.WAITING.value(), EXECUTE_STATE_PAUSED, null);
Assert.state(row >= AFFECTED_ONE_ROW, "Resume sched task failed.");
assertManyAffectedRow(row, "Resume sched task failed.");

// dispatch task
Tuple3<SchedJob, SchedInstance, List<SchedTask>> params = buildDispatchParams(instanceId, row);
Expand All @@ -602,11 +602,11 @@ private void updateWorkflowLeadState(SchedInstance instance) {
if (graph.allMatch(e -> e.getValue().isTerminal())) {
RunState state = graph.anyMatch(e -> e.getValue().isFailure()) ? RunState.CANCELED : RunState.FINISHED;
int row = instanceMapper.terminate(instance.getWnstanceId(), state.value(), RUN_STATE_TERMINABLE, new Date());
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
} else if (workflows.stream().noneMatch(e -> RunState.RUNNING.equals(e.getRunState()))) {
RunState state = RunState.PAUSED;
int row = instanceMapper.updateState(instance.getWnstanceId(), state.value(), instance.getRunState());
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
assertOneAffectedRow(row, () -> "Update workflow lead instance state failed: " + instance + " | " + state);
}
}

Expand Down Expand Up @@ -739,7 +739,7 @@ private void processWorkflow(SchedInstance nodeInstance) {
if (graph.allMatch(e -> e.getValue().isTerminal())) {
RunState state = graph.anyMatch(e -> e.getValue().isFailure()) ? RunState.CANCELED : RunState.FINISHED;
int row = instanceMapper.terminate(wnstanceId, state.value(), RUN_STATE_TERMINABLE, new Date());
Assert.isTrue(row == AFFECTED_ONE_ROW, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
assertOneAffectedRow(row, () -> "Terminate workflow lead instance failed: " + nodeInstance + " | " + state);
afterTerminateTask(instanceMapper.get(wnstanceId));
return;
}
Expand Down Expand Up @@ -887,7 +887,7 @@ private List<ExecuteTaskParam> loadExecutingTasks(SchedInstance instance, Operat
// update dead task
Date executeEndTime = ops.toState().isTerminal() ? new Date() : null;
int row = taskMapper.terminate(task.getTaskId(), task.getWorker(), ops.toState().value(), ExecuteState.EXECUTING.value(), executeEndTime, null);
if (row != AFFECTED_ONE_ROW) {
if (!isOneAffectedRow(row)) {
LOG.error("Cancel the dead task failed: {}", task);
executingTasks.add(builder.build(ops, task.getTaskId(), triggerTime, worker));
} else {
Expand Down

0 comments on commit 39f0bd6

Please sign in to comment.