Skip to content

Commit

Permalink
add initialize routine to FunctionRuntimeManager (apache#2784)
Browse files Browse the repository at this point in the history
  • Loading branch information
jerrypeng authored and srkukarni committed Oct 12, 2018
1 parent 3a66808 commit a1a1abe
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,11 @@ public class FunctionAssignmentTailer
private final FunctionRuntimeManager functionRuntimeManager;
private final Reader<byte[]> reader;

public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager)
public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader)
throws PulsarClientException {
this.functionRuntimeManager = functionRuntimeManager;

this.reader = functionRuntimeManager.getWorkerService().getClient().newReader()
.topic(functionRuntimeManager.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
.startMessageId(MessageId.earliest).create();
this.reader = reader;
}

public void start() {
Expand All @@ -66,8 +64,7 @@ public void close() {
log.info("Stopped function state consumer");
}

@Override
public void accept(Message<byte[]> msg) {
public void processAssignment(Message<byte[]> msg) {
if(msg.getData()==null || (msg.getData().length==0)) {
log.info("Received assignment delete: {}", msg.getKey());
this.functionRuntimeManager.deleteAssignment(msg.getKey());
Expand All @@ -82,8 +79,13 @@ public void accept(Message<byte[]> msg) {
throw new RuntimeException(e);
}
log.info("Received assignment update: {}", assignment);
this.functionRuntimeManager.processAssignment(assignment);
this.functionRuntimeManager.processAssignment(assignment);
}
}

@Override
public void accept(Message<byte[]> msg) {
processAssignment(msg);
// receive next request
receiveOne();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ public FunctionMetaDataManager(WorkerConfig workerConfig,

/**
* Initializes the FunctionMetaDataManager. Does the following:
* 1. Restores from snapshot if one exists
* 2. Sends out initialize marker to FMT and consume messages until the initialize marker is consumed
* 1. Consume all existing function meta data upon start to establish existing state
*/
public void initialize() {
log.info("/** Initializing Function Metadata Manager **/");
try {

Reader<byte[]> reader = pulsarClient.newReader()
.topic(this.workerConfig.getFunctionMetadataTopic())
.startMessageId(MessageId.earliest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,13 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;

import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function.Assignment;
Expand All @@ -65,6 +68,8 @@ public class FunctionRuntimeManager implements AutoCloseable{

// All the runtime info related to functions executed by this worker
// Fully Qualified InstanceId - > FunctionRuntimeInfo
// NOTE: please use setFunctionRuntimeInfo and deleteFunctionRuntimeInfo methods to modify this data structure
// Since during initialization phase nothing should be modified
@VisibleForTesting
Map<String, FunctionRuntimeInfo> functionRuntimeInfoMap = new ConcurrentHashMap<>();

Expand All @@ -75,7 +80,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
@VisibleForTesting
LinkedBlockingQueue<FunctionAction> actionQueue;

private final FunctionAssignmentTailer functionAssignmentTailer;
private FunctionAssignmentTailer functionAssignmentTailer;

private FunctionActioner functionActioner;

Expand All @@ -89,14 +94,16 @@ public class FunctionRuntimeManager implements AutoCloseable{
@Getter
private WorkerService workerService;

@Setter
@Getter
boolean isInitializePhase = false;

public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace,
MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception {
this.workerConfig = workerConfig;
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();

this.functionAssignmentTailer = new FunctionAssignmentTailer(this);

AuthenticationConfig authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
Expand Down Expand Up @@ -144,6 +151,41 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
this.membershipManager = membershipManager;
}

/**
* Initializes the FunctionRuntimeManager. Does the following:
* 1. Consume all existing assignments to establish existing/latest set of assignments
* 2. After current assignments are read, assignments belonging to this worker will be processed
*/
public void initialize() {
log.info("/** Initializing Runtime Manager **/");
try {
Reader<byte[]> reader = this.getWorkerService().getClient().newReader()
.topic(this.getWorkerConfig().getFunctionAssignmentTopic()).readCompacted(true)
.startMessageId(MessageId.earliest).create();

this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader);
// read all existing messages
this.setInitializePhase(true);
while (reader.hasMessageAvailable()) {
this.functionAssignmentTailer.processAssignment(reader.readNext());
}
this.setInitializePhase(false);
// realize existing assignments
Map<String, Assignment> assignmentMap = workerIdToAssignments.get(this.workerConfig.getWorkerId());
if (assignmentMap != null) {
for (Assignment assignment : assignmentMap.values()) {
startFunctionInstance(assignment);
}
}
// start assignment tailer
this.functionAssignmentTailer.start();

} catch (Exception e) {
log.error("Failed to initialize function runtime manager: ", e.getMessage(), e);
throw new RuntimeException(e);
}
}

/**
* Starts the function runtime manager
*/
Expand Down Expand Up @@ -623,27 +665,29 @@ void deleteAssignment(Assignment assignment) {
}

private void addAssignment(Assignment assignment) {
String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());

//add new function
this.setAssignment(assignment);

//Assigned to me
if (assignment.getWorkerId().equals(workerConfig.getWorkerId())) {
if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo()
.setFunctionInstance(assignment.getInstance()));
startFunctionInstance(assignment);
}
}

} else {
//Somehow this function is already started
log.warn("Function {} already running. Going to restart function.",
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
}
FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
this.insertStartAction(functionRuntimeInfo);
private void startFunctionInstance(Assignment assignment) {
String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance());
if (!this.functionRuntimeInfoMap.containsKey(fullyQualifiedInstanceId)) {
this.setFunctionRuntimeInfo(fullyQualifiedInstanceId, new FunctionRuntimeInfo()
.setFunctionInstance(assignment.getInstance()));

} else {
//Somehow this function is already started
log.warn("Function {} already running. Going to restart function.",
this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
this.insertStopAction(this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId));
}

FunctionRuntimeInfo functionRuntimeInfo = this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
this.insertStartAction(functionRuntimeInfo);
}

public Map<String, FunctionRuntimeInfo> getFunctionRuntimeInfos() {
Expand Down Expand Up @@ -675,26 +719,29 @@ public void updateRates() {

@VisibleForTesting
void insertStopAction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.STOP);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
if (!this.isInitializePhase) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.STOP);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}

}

@VisibleForTesting
void insertStartAction(FunctionRuntimeInfo functionRuntimeInfo) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.START);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
if (!this.isInitializePhase) {
FunctionAction functionAction = new FunctionAction();
functionAction.setAction(FunctionAction.Action.START);
functionAction.setFunctionRuntimeInfo(functionRuntimeInfo);
try {
actionQueue.put(functionAction);
} catch (InterruptedException ex) {
throw new RuntimeException("Interrupted while putting action");
}
}
}

Expand Down Expand Up @@ -731,11 +778,16 @@ void setAssignment(Assignment assignment) {
}

private void deleteFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
if (!this.isInitializePhase) {
this.functionRuntimeInfoMap.remove(fullyQualifiedInstanceId);
}
}

private void setFunctionRuntimeInfo(String fullyQualifiedInstanceId, FunctionRuntimeInfo functionRuntimeInfo) {
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
// Don't modify Function Runtime Infos when initializing
if (!this.isInitializePhase) {
this.functionRuntimeInfoMap.put(fullyQualifiedInstanceId, functionRuntimeInfo);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ && isNotBlank(workerConfig.getClientAuthenticationParameters())) {
this.functionRuntimeManager = new FunctionRuntimeManager(
this.workerConfig, this, this.dlogNamespace, this.membershipManager, connectorsManager);

// initialize function runtime manager
this.functionRuntimeManager.initialize();

// Setting references to managers in scheduler
this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager);
this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager);
Expand Down
Loading

0 comments on commit a1a1abe

Please sign in to comment.