Skip to content

Commit

Permalink
optimize lockstrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
vavi committed Nov 2, 2022
1 parent 829aaf1 commit b4be7db
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 126 deletions.
1 change: 0 additions & 1 deletion CHANELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# 3.0.0
1. 完善了对嵌套并行网关,并行网关嵌套子流程在多线程情况下的测试.
2. [微型非兼容升级] 去掉了针对signal的锁机制保护,防止和join的锁机制增加锁重入复杂度.
3. 其他待补充

# 2.6.4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private boolean processDefaultLogic(ExecutionContext context, PvmActivity pvmAct
for (Entry<String, PvmTransition> pvmTransitionEntry : outcomeTransitions.entrySet()) {
PvmActivity target = pvmTransitionEntry.getValue().getTarget();

//注意,ExecutionContext 在多线程情况下,必须要新建对象,防止一些变量被并发修改.
ExecutionContext subThreadContext = contextFactory.createChildThreadContext(context);
PvmActivityTask task = new PvmActivityTask(target,subThreadContext);

Expand All @@ -132,11 +133,18 @@ private boolean processDefaultLogic(ExecutionContext context, PvmActivity pvmAct

} else if (outComeTransitionSize == 1 && inComeTransitionSize >= 2) {
//join 时必须使用分布式锁。
// update at 2022.10.31 这里的缩粒度不够大,在极端环境下,还是存在数据可见性的问题.
// 比如说,当这个锁结束后, 外面还需要进行持久化数据. 理论上,另外一个线程进来执行时,可能这个持久化数据还未完成.
// 所以这里取消掉锁,改为外部锁

LockStrategy lockStrategy = context.getProcessEngineConfiguration().getLockStrategy();
String processInstanceId = context.getProcessInstance().getInstanceId();
try{
lockStrategy.tryLock(processInstanceId,context);
if(null == lockStrategy){
throw new EngineException("LockStrategy must be implemented for ParallelGateway");
}

// String processInstanceId = context.getProcessInstance().getInstanceId();
// try{
// lockStrategy.tryLock(processInstanceId,context);

super.enter(context, pvmActivity);

Expand Down Expand Up @@ -207,10 +215,10 @@ private boolean processDefaultLogic(ExecutionContext context, PvmActivity pvmAct
return true;
}

}finally {

lockStrategy.unLock(processInstanceId,context);
}
// }finally {
//
// lockStrategy.unLock(processInstanceId,context);
// }

}else{
throw new EngineException("should not touch here:"+pvmActivity);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ public PvmActivityTask(PvmActivity pvmActivity,ExecutionContext context) {
public PvmActivity call() {

PvmActivity pvmActivity ;
try {
GatewaySticker.create();
// try {
// GatewaySticker.create();

//忽略了子线程的返回值
this.pvmActivity.enter(context);

// pvmActivity = GatewaySticker.currentSession().getPvmActivity();

}finally {

GatewaySticker.destroySession();
}
// }finally {
//
// GatewaySticker.destroySession();
// }

return null;
// return pvmActivity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,34 @@
public abstract class CommonServiceHelper {


// public static void tryLock(ProcessEngineConfiguration processEngineConfiguration,
// ProcessInstance processInstance) {
// LockStrategy lockStrategy = processEngineConfiguration.getLockStrategy();
// if(null != lockStrategy){
// String bizUniqueId = processInstance.getBizUniqueId();
//
// if(null != bizUniqueId){
// lockStrategy.tryLock(bizUniqueId,null);
// }else {
// lockStrategy.tryLock(processInstance.getInstanceId(),null);
// }
// }
// }
//
//
// public static void tryUnlock(ProcessEngineConfiguration processEngineConfiguration,
// ProcessInstance processInstance) {
// LockStrategy lockStrategy = processEngineConfiguration.getLockStrategy();
// if(null != lockStrategy){
// String bizUniqueId = processInstance.getBizUniqueId();
//
// if(null != bizUniqueId){
// lockStrategy.unLock(bizUniqueId,null);
// }else {
// lockStrategy.unLock(processInstance.getInstanceId(),null);
// }
// }
// }
public static void tryLock(ProcessEngineConfiguration processEngineConfiguration,
ProcessInstance processInstance) {
LockStrategy lockStrategy = processEngineConfiguration.getLockStrategy();
if(null != lockStrategy){
String bizUniqueId = processInstance.getBizUniqueId();

if(null != bizUniqueId){
lockStrategy.tryLock(bizUniqueId,null);
}else {
lockStrategy.tryLock(processInstance.getInstanceId(),null);
}
}
}


public static void tryUnlock(ProcessEngineConfiguration processEngineConfiguration,
ProcessInstance processInstance) {
LockStrategy lockStrategy = processEngineConfiguration.getLockStrategy();
if(null != lockStrategy){
String bizUniqueId = processInstance.getBizUniqueId();

if(null != bizUniqueId){
lockStrategy.unLock(bizUniqueId,null);
}else {
lockStrategy.unLock(processInstance.getInstanceId(),null);
}
}
}

public static ProcessInstance insertAndPersist(ProcessInstance processInstance, Map<String, Object> request,
ProcessEngineConfiguration processEngineConfiguration) {
Expand All @@ -71,17 +71,17 @@ public static ProcessInstance insertAndPersist(ProcessInstance processInstance,

ProcessInstance newProcessInstance = processInstanceStorage.insert(processInstance, processEngineConfiguration);

persisteVariableInstanceIfPossible(request, processEngineConfiguration,
persistVariableInstanceIfPossible(request, processEngineConfiguration,
newProcessInstance, AdHocConstant.DEFAULT_ZERO_VALUE);

persist(newProcessInstance,processEngineConfiguration);

return newProcessInstance;
}

private static void persisteVariableInstanceIfPossible(Map<String, Object> request,
ProcessEngineConfiguration processEngineConfiguration,
ProcessInstance newProcessInstance,String executionInstanceId) {
private static void persistVariableInstanceIfPossible(Map<String, Object> request,
ProcessEngineConfiguration processEngineConfiguration,
ProcessInstance newProcessInstance, String executionInstanceId) {
VariablePersister variablePersister = processEngineConfiguration.getVariablePersister();
if( variablePersister.isPersisteVariableInstanceEnabled() && null!= request ){
AnnotationScanner annotationScanner = processEngineConfiguration.getAnnotationScanner();
Expand Down Expand Up @@ -121,7 +121,7 @@ public static ProcessInstance createExecution(String executionInstanceId, Proce

ProcessInstance newProcessInstance = processInstanceStorage.update(processInstance,processEngineConfiguration );

persisteVariableInstanceIfPossible(request, processEngineConfiguration,
persistVariableInstanceIfPossible(request, processEngineConfiguration,
newProcessInstance,executionInstanceId);

persist(processInstance , processEngineConfiguration );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,11 @@ public ProcessInstance signal(String executionInstanceId, Map<String, Object> re
public ProcessInstance signal(String executionInstanceId, Map<String, Object> request,
Map<String, Object> response) {


ExecutionInstance executionInstance = queryExecutionInstance(executionInstanceId);

ProcessInstance processInstance = processInstanceStorage.findOne(executionInstance.getProcessInstanceId()
, processEngineConfiguration);


, processEngineConfiguration);
try {

PreparePhase preparePhase = new PreparePhase(request, executionInstance, processInstance,instanceContextFactory).init();
Expand All @@ -114,7 +113,8 @@ public ProcessInstance signal(String executionInstanceId, Map<String, Object> re

return newProcessInstance;
} finally {
// CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);

CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);
}
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public ProcessInstance signal(String processInstanceId, String executionInstance

return newProcessInstance;
} finally {
// CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);
CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ public ExecutionContext getExecutionContext() {
public PreparePhase init() {


// CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);
CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);

//TUNE 校验是否有子流程的执行实例依赖这个父执行实例。

Expand All @@ -337,7 +337,7 @@ public PreparePhase init() {
}

public PreparePhase initWithShading() {
// CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);
CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);

//TUNE 校验是否有子流程的执行实例依赖这个父执行实例。
//BE AWARE: 注意:针对 CUSTOM 场景,由于性能考虑,这里的activityInstance可能为空。调用的地方需要判空。
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public ProcessInstance start(String processDefinitionId, String processDefinitio
PvmProcessInstance pvmProcessInstance = new DefaultPvmProcessInstance();

try {
// CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);
CommonServiceHelper.tryLock(processEngineConfiguration, processInstance);

processInstance = pvmProcessInstance.start(executionContext);

Expand All @@ -97,7 +97,7 @@ public ProcessInstance start(String processDefinitionId, String processDefinitio
return processInstance;

} finally {
// CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);
CommonServiceHelper.tryUnlock(processEngineConfiguration, processInstance);
}
}

Expand Down

This file was deleted.

Loading

0 comments on commit b4be7db

Please sign in to comment.