Skip to content

Commit

Permalink
状态state 管理
Browse files Browse the repository at this point in the history
  • Loading branch information
sizegang1 committed Feb 26, 2022
1 parent 71ea32b commit 73c1b50
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ public Map task(String object, Map<String, TaskWrapper> dataSources, Long busine
// System.out.println("开始执行A");
// System.out.println(1/0);
try {
System.out.println(1 / 0);
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
stopTaskFlow(dataSources, businessId, 100);
}
HashMap<Object, Object> objectObjectHashMap = new HashMap<>();
objectObjectHashMap.put("result", "我是B的结果");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public void testGobrs() {
} catch (Exception e) {
System.out.println("异常 " + e);
}
System.out.println(asyncResult.getCapCode());
Map<String, Object> data = asyncResult.getData();
System.out.println(JSONObject.toJSONString(data));
}
Expand Down
2 changes: 1 addition & 1 deletion gobrs-async-example/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ server:
spring:
gobrs:
async:
rules: '[{name: "test", content: "AService->BService,CService,GService,DService;EService->FService"}]'
rules: '[{name: "test", content: "AService->BService->CService,GService,DService;EService->FService"}]'
# rules: '[{name: "test", content: "BService->CService->EService->DService:not;AService->DService:not"}]'
task-interrupt: false #局部异常是否打断主流程
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@
**/
public class GobrsAsyncConstant {
public static final String DEFAULT_PARAMS = "default_params";
public static final Integer DEFAULT_CATCODE = -1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.jd.gobrs.async.constant;

/**
* @program: gobrs-async
* @ClassName StateConstan
* @description:
* @author: sizegang
* @create: 2022-02-26 18:05
* @Version 1.0
**/
public class StateConstant {
public static final int WORKING = 3;
public static final int STOP = 4;
public static final int FINISH = 1;
public static final int ERROR = 2;
public static final int INIT = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public static AsyncResult startTaskFlow(long timeout, ThreadPoolTaskExecutor exe
CompletableFuture[] futures = new CompletableFuture[taskWrappers.size()];
for (int i = 0; i < taskWrappers.size(); i++) {
TaskWrapper wrapper = taskWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.task(executorService, timeout, dataSource, params, businessId), executorService);
futures[i] = CompletableFuture.runAsync(() -> wrapper.task(executorService, timeout, dataSource, params, businessId), executorService);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
Expand All @@ -79,6 +79,10 @@ private static AsyncResult buildResult(Map<String, TaskWrapper> dataSource, Long
AsyncResult asyncResult = new AsyncResult();
asyncResult.setDatasources(dataSource);
asyncResult.setBusinessId(businessId);
GobrsFlowState.GobrsState gobrsState = GobrsFlowState.gobrsFlowState.get(businessId);
if (gobrsState != null) {
asyncResult.setCapCode(gobrsState.getCapCode());
}
return asyncResult;
}

Expand All @@ -94,8 +98,9 @@ private static void doRelease(List<TaskWrapper> taskWrappers, Long businessId) {
}
taskWrappers.parallelStream().forEach(x -> {
x.workResult.remove(businessId);
doRelease(x.getNextWrappers(), businessId);
x.state.remove(businessId);
GobrsFlowState.gobrsFlowState.remove(businessId);
doRelease(x.getNextWrappers(), businessId);
});
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.jd.gobrs.async.gobrs;

import com.jd.gobrs.async.constant.StateConstant;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -13,28 +15,55 @@
* @Version 1.0
**/
public class GobrsFlowState {
public static Map<Long, Integer> gobrsFlowState = new ConcurrentHashMap<>();
public static final int FINISH = 2;
public static final int ERROR = 1;
public static final int WORKING = 0;
public static Map<Long, GobrsState> gobrsFlowState = new ConcurrentHashMap<>();
public static final int FINISH = StateConstant.FINISH;
public static final int ERROR = StateConstant.ERROR;
public static final int WORKING = StateConstant.WORKING;
public static final int STOP = StateConstant.STOP;

public static boolean compareAndSetState(int expect, int update, long businessId) {
Integer st = gobrsFlowState.get(businessId);
if (st == null) {
st = WORKING;
if (expect == st) {
st = update;
gobrsFlowState.put(businessId, st);
GobrsState gobrsState = gobrsFlowState.get(businessId);
if (gobrsState == null) {
GobrsState gs = new GobrsState(WORKING);
if (gs.getState() == expect) {
gs.setState(update);
gobrsFlowState.put(businessId, gs);
return true;
}
return false;
}
if (expect == st) {
st = update;
gobrsFlowState.put(businessId, st);
if (expect == gobrsState.getState()) {
gobrsState.setState(update);
gobrsFlowState.put(businessId, gobrsState);
return true;
}
return false;
}

public static class GobrsState {
public GobrsState(Integer state) {
this.state = state;
}

private int state;
private int capCode;

public Integer getState() {
return state;
}

public void setState(Integer state) {
this.state = state;
}

public Integer getCapCode() {
return capCode;
}

public void setCapCode(Integer capCode) {
this.capCode = capCode;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ public class AsyncResult<P> implements Serializable {

private Long businessId;
Map<String, TaskWrapper> datasources;
private Integer capCode;



public P getData(Class clazz) {
TaskWrapper taskWrapper;
Expand Down Expand Up @@ -65,5 +68,11 @@ public void setDatasources(Map<String, TaskWrapper> datasources) {
this.datasources = datasources;
}

public Integer getCapCode() {
return capCode;
}

public void setCapCode(Integer capCode) {
this.capCode = capCode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.jd.gobrs.async.callback.ICallback;
import com.jd.gobrs.async.callback.ITask;
import com.jd.gobrs.async.constant.GobrsAsyncConstant;
import com.jd.gobrs.async.gobrs.GobrsFlowState;
import com.jd.gobrs.async.wrapper.TaskWrapper;

import java.util.Map;
Expand All @@ -14,7 +16,7 @@
* @create: 2022-01-28 00:16
* @Version 1.0
**/
public interface AsyncTask<T, V,P> extends ITask<T, V>, ICallback<T, V> {
public interface AsyncTask<T, V, P> extends ITask<T, V>, ICallback<T, V> {
/**
* 根据业务实现 判断是否需要执行当前task
*
Expand All @@ -40,15 +42,28 @@ default String depKey(Class clazz) {
* @return
*/
default P getData(Map<String, TaskWrapper> datasources, Long businessId, Class clazz) {
TaskWrapper taskWrapper;
if (datasources.get(clazz.getSimpleName()) != null) {
taskWrapper = datasources.get(clazz.getSimpleName());
} else {
taskWrapper = datasources.get(depKey(clazz));
}
TaskWrapper taskWrapper = datasources.get(clazz.getSimpleName()) != null
? datasources.get(clazz.getSimpleName()) : datasources.get(depKey(clazz));
if (taskWrapper == null) {
return null;
}
return (P)taskWrapper.getWorkResult(businessId);
return (P) taskWrapper.getWorkResult(businessId);
}


default boolean stopTaskFlow(Map<String, TaskWrapper> datasources, Long businessId, Integer capCode) {
Class<? extends AsyncTask> aClass = this.getClass();
TaskWrapper taskWrapper = datasources.get(aClass.getSimpleName()) != null
? datasources.get(aClass.getSimpleName()) : datasources.get(depKey(aClass));
if (taskWrapper == null) {
return false;
}
return taskWrapper.compareAndSetState(GobrsFlowState.WORKING, GobrsFlowState.ERROR, businessId);
}


default void stopTaskFlow(Map<String, TaskWrapper> datasources, Long businessId) {
stopTaskFlow(datasources, businessId, GobrsAsyncConstant.DEFAULT_CATCODE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.jd.gobrs.async.callback.DefaultCallback;
import com.jd.gobrs.async.callback.ICallback;
import com.jd.gobrs.async.constant.GobrsAsyncConstant;
import com.jd.gobrs.async.constant.StateConstant;
import com.jd.gobrs.async.exception.AsyncExceptionInterceptor;
import com.jd.gobrs.async.exception.SkippedException;
import com.jd.gobrs.async.gobrs.GobrsFlowState;
Expand Down Expand Up @@ -73,15 +74,15 @@ public void setDependWrappers(List<DependWrapper> dependWrappers) {
* <p>
* 1-finish, 2-error, 3-working
*/
public volatile ConcurrentHashMap<Long, Integer> state = new ConcurrentHashMap<Long, Integer>();
public volatile ConcurrentHashMap<Long, Integer> state = new ConcurrentHashMap<>();
/**
* 该map存放所有wrapper的id和wrapper映射
*/
private Map<String, TaskWrapper> forParamUseWrappers;
/**
* 也是个钩子变量,用来存临时的结果
*/
public volatile ConcurrentHashMap<Long, TaskResult<V>> workResult = new ConcurrentHashMap<Long, TaskResult<V>>();
public volatile ConcurrentHashMap<Long, TaskResult<V>> workResult = new ConcurrentHashMap<>();
// private volatile TaskResult<V> workResult = TaskResult.defaultResult();
/**
* 是否在执行自己前,去校验nextWrapper的执行结果<p>
Expand All @@ -93,10 +94,10 @@ public void setDependWrappers(List<DependWrapper> dependWrappers) {
*/
private volatile boolean needCheckNextWrapperResult = true;

private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
private static final int FINISH = StateConstant.FINISH;
private static final int ERROR = StateConstant.ERROR;
private static final int WORKING = StateConstant.WORKING;
private static final int INIT = StateConstant.INIT;

private TaskWrapper(String id, ITask<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) {
Expand Down Expand Up @@ -314,7 +315,8 @@ private void doDependsJobs(ThreadPoolTaskExecutor executorService, List<DependWr
*/
private void doTask(Map<String, Object> params, long businessId) {
if (gobrsAsyncProperties.isTaskInterrupt() &&
GobrsFlowState.compareAndSetState(GobrsFlowState.ERROR, GobrsFlowState.FINISH, businessId)) {
(GobrsFlowState.compareAndSetState(GobrsFlowState.ERROR, GobrsFlowState.FINISH, businessId)
|| GobrsFlowState.compareAndSetState(GobrsFlowState.STOP,GobrsFlowState.FINISH, businessId ))) {
return;
}
//执行结果
Expand Down Expand Up @@ -393,7 +395,7 @@ private TaskResult<V> doTaskDep(Map<String, Object> params, Long businessId) {
return objectTaskResult;
}
taskFail(WORKING, e, businessId);
if(gobrsAsyncProperties.isTaskInterrupt()){
if (gobrsAsyncProperties.isTaskInterrupt()) {
throw e;
}
return objectTaskResult;
Expand Down Expand Up @@ -490,7 +492,7 @@ public String getId() {
return id;
}

private boolean compareAndSetState(int expect, int update, long businessId) {
public boolean compareAndSetState(int expect, int update, long businessId) {
Integer st = this.state.get(businessId);
if (st == null) {
if (expect == INIT) {
Expand Down

0 comments on commit 73c1b50

Please sign in to comment.