Skip to content

Commit

Permalink
fix: Functions unnecessarily restart during FunctionRuntimeManager in…
Browse files Browse the repository at this point in the history
…it phase (apache#5527)

* fix: Functions unnecessarily restart during FunctionRuntimeManager init phase
  • Loading branch information
jerrypeng authored and wolfstudy committed Nov 1, 2019
1 parent 79f4710 commit dd58baf
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,37 @@ public class FunctionRuntimeManager implements AutoCloseable{
// All the runtime info related to functions executed by this worker
// Fully Qualified InstanceId - > FunctionRuntimeInfo
@VisibleForTesting
Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();
class FunctionRuntimeInfos {

private Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();

public FunctionRuntimeInfo get(String fullyQualifiedInstanceId) {
return functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
}

public void put(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
if (!isInitializePhase) {
functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
}
}

public void remove (String fullyQualifiedInstanceId) {
if (!isInitializePhase) {
functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}

public Map<String, FunctionRuntimeInfo> getAll() {
return functionRuntimeInfoMap;
}

public int size() {
return functionRuntimeInfoMap.size();
}
}

@VisibleForTesting
final FunctionRuntimeInfos functionRuntimeInfos = new FunctionRuntimeInfos();

@VisibleForTesting
@Getter
Expand All @@ -100,8 +130,6 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Getter
private WorkerService workerService;

@Setter
@Getter
boolean isInitializePhase = false;

private final FunctionMetaDataManager functionMetaDataManager;
Expand Down Expand Up @@ -212,12 +240,14 @@ public void initialize() {
.startMessageId(MessageId.earliest).create();

this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
// start init phase
this.isInitializePhase = true;
// read all existing messages
this.setInitializePhase(true);
while (reader.hasMessageAvailable()) {
this.functionAssignmentTailer.processAssignment(reader.readNext());
}
this.setInitializePhase(false);
// init phase is done
this.isInitializePhase = false;
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
Expand Down Expand Up @@ -651,10 +681,10 @@ private void updateAssignment(Assignment assignment) {
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());

this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
} else {
// if assignment got transferred to me just set function runtime
Expand All @@ -668,9 +698,9 @@ private void updateAssignment(Assignment assignment) {
runtimeSpawner.getRuntime().reinitialize();
newFunctionRuntimeInfo.setRuntimeSpawner(runtimeSpawner);

this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
} else {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
}
} else {
Expand All @@ -685,10 +715,10 @@ private void updateAssignment(Assignment assignment) {
FunctionRuntimeInfo newFunctionRuntimeInfo = new FunctionRuntimeInfo();
newFunctionRuntimeInfo.setFunctionInstance(assignment.getInstance());
this.conditionallyStartFunction(newFunctionRuntimeInfo);
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, newFunctionRuntimeInfo);
}
} else {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}
}

Expand Down Expand Up @@ -732,7 +762,7 @@ public synchronized void deleteAssignment(String fullyQualifiedInstanceId) {
this.conditionallyTerminateFunction(functionRuntimeInfo);
}
}
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
this.functionRuntimeInfos.remove(fullyQualifiedInstanceId);
}

String workerId = null;
Expand Down Expand Up @@ -773,13 +803,14 @@ private void addAssignment(Assignment assignment) {
}

private void startFunctionInstance(Assignment assignment) {
log.info("infos: {}", functionRuntimeInfos.getAll());
String fullyQualifiedInstanceId = FunctionCommon.getFullyQualifiedInstanceId(assignment.getInstance());
FunctionRuntimeInfo functionRuntimeInfo = _getFunctionRuntimeInfo(fullyQualifiedInstanceId);

if (functionRuntimeInfo == null) {
functionRuntimeInfo = new FunctionRuntimeInfo()
.setFunctionInstance(assignment.getInstance());
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
this.functionRuntimeInfos.put(fullyQualifiedInstanceId, functionRuntimeInfo);
} else {
//Somehow this function is already started
log.warn("Function {} already running. Going to restart function.",
Expand All @@ -790,7 +821,7 @@ private void startFunctionInstance(Assignment assignment) {
}

public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
return this.functionRuntimeInfoMap;
return this.functionRuntimeInfos.getAll();
}

/**
Expand Down Expand Up @@ -844,7 +875,7 @@ public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQuali
}

private FunctionRuntimeInfo _getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfos.get(fullyQualifiedInstanceId);

// sanity check to make sure assignments and runtimeinfo is in sync
if (functionRuntimeInfo == null) {
Expand Down
Loading

0 comments on commit dd58baf

Please sign in to comment.