Skip to content

Commit

Permalink
fixing and refactoring function status (apache#3102)
Browse files Browse the repository at this point in the history
* fixing and refactoring function status

* further refactoring

* cleaning up
  • Loading branch information
jerrypeng authored Dec 3, 2018
1 parent c7440f9 commit d4794bd
Show file tree
Hide file tree
Showing 10 changed files with 796 additions and 431 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -669,9 +669,8 @@ public void testPulsarFunctionStatus() throws Exception {
}
}, 5, 200);

FunctionRuntimeManager functionRuntimeManager = functionsWorkerService.getFunctionRuntimeManager();
FunctionStatus functionStatus = functionRuntimeManager.getFunctionStatus(tenant, namespacePortion,
functionName, null);
FunctionStatus functionStatus = admin.functions().getFunctionStatus(tenant, namespacePortion,
functionName);

int numInstances = functionStatus.getNumInstances();
assertEquals(numInstances, 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,10 @@
*/
package org.apache.pulsar.functions.worker;

import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;

import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand All @@ -46,24 +30,35 @@
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.FunctionStatus;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.*;

import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Reflections;

import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;

/**
* This class managers all aspects of functions assignments and running of function assignments for this worker
*/
Expand Down Expand Up @@ -590,207 +585,6 @@ public FunctionStats getFunctionStats(String tenant, String namespace, String fu
return functionStats.calculateOverall();
}

/**
* Get status of a function instance. If this worker is not running the function instance,
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @param instanceId the function instance id
* @return the function status
*/
public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
String tenant, String namespace,
String functionName, int instanceId, URI uri) {
Assignment assignment;
if (runtimeFactory.externallyManaged()) {
assignment = this.findAssignment(tenant, namespace, functionName, -1);
} else {
assignment = this.findAssignment(tenant, namespace, functionName, instanceId);
}

if (assignment == null) {
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
= new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
functionInstanceStatusData.setRunning(false);
functionInstanceStatusData.setError("Function has not been scheduled");
return functionInstanceStatusData;
}

final String assignedWorkerId = assignment.getWorkerId();
final String workerId = this.workerConfig.getWorkerId();

// If I am running worker
if (assignedWorkerId.equals(workerId)) {
FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(
org.apache.pulsar.functions.utils.Utils.getFullyQualifiedInstanceId(assignment.getInstance()));
RuntimeSpawner runtimeSpawner = functionRuntimeInfo.getRuntimeSpawner();
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
= new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
if (runtimeSpawner != null) {
try {
InstanceCommunication.FunctionStatus status = functionRuntimeInfo.getRuntimeSpawner().getFunctionStatus(instanceId).get();
functionInstanceStatusData.setRunning(status.getRunning());
functionInstanceStatusData.setError(status.getFailureException());
functionInstanceStatusData.setNumRestarts(status.getNumRestarts());
functionInstanceStatusData.setNumReceived(status.getNumReceived());
functionInstanceStatusData.setNumSuccessfullyProcessed(status.getNumSuccessfullyProcessed());
functionInstanceStatusData.setNumUserExceptions(status.getNumUserExceptions());

List<ExceptionInformation> userExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestUserExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
userExceptionInformationList.add(exceptionInformation);
}
functionInstanceStatusData.setLatestUserExceptions(userExceptionInformationList);

functionInstanceStatusData.setNumSystemExceptions(status.getNumSystemExceptions());
List<ExceptionInformation> systemExceptionInformationList = new LinkedList<>();
for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionEntry : status.getLatestSystemExceptionsList()) {
ExceptionInformation exceptionInformation
= new ExceptionInformation();
exceptionInformation.setTimestampMs(exceptionEntry.getMsSinceEpoch());
exceptionInformation.setExceptionString(exceptionEntry.getExceptionString());
systemExceptionInformationList.add(exceptionInformation);
}
functionInstanceStatusData.setLatestSystemExceptions(systemExceptionInformationList);

functionInstanceStatusData.setAverageLatency(status.getAverageLatency());
functionInstanceStatusData.setLastInvocationTime(status.getLastInvocationTime());
functionInstanceStatusData.setWorkerId(assignedWorkerId);


} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
functionInstanceStatusData.setRunning(false);
if (functionRuntimeInfo.getStartupException() != null) {
functionInstanceStatusData.setError(functionRuntimeInfo.getStartupException().getMessage());
}
functionInstanceStatusData.setWorkerId(assignedWorkerId);
}
return functionInstanceStatusData;
} else {
// query other worker

List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
= new FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData();
functionInstanceStatusData.setRunning(false);
functionInstanceStatusData.setError("Function has not been scheduled");
return functionInstanceStatusData;
}

if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
}

/**
* Get statuses of all function instances.
* @param tenant the tenant the function belongs to
* @param namespace the namespace the function belongs to
* @param functionName the function name
* @return a list of function statuses
* @throws PulsarAdminException
*/
public FunctionStatus getFunctionStatus(String tenant, String namespace,
String functionName, URI uri)
throws PulsarAdminException {

Collection<Assignment> assignments = this.findFunctionAssignments(tenant, namespace, functionName);

FunctionStatus functionStatus = new FunctionStatus();
if (assignments.isEmpty()) {
Function.FunctionMetaData functionMetaData = workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, functionName);
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
functionStatus.setNumRunning(0);

return functionStatus;
}

// TODO refactor the code for externally managed.
if (runtimeFactory.externallyManaged()) {
Assignment assignment = assignments.iterator().next();
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
if (isOwner) {
int parallelism = assignment.getInstance().getFunctionMetaData().getFunctionDetails().getParallelism();
for (int i = 0; i < parallelism; ++i) {
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData
= getFunctionInstanceStatus(tenant, namespace, functionName, i, null);
FunctionStatus.FunctionInstanceStatus functionInstanceStatus
= new FunctionStatus.FunctionInstanceStatus();
functionInstanceStatus.setInstanceId(i);
functionInstanceStatus.setStatus(functionInstanceStatusData);
functionStatus.addInstance(functionInstanceStatus);
}
} else {
// find the hostname/port of the worker who is the owner

List<WorkerInfo> workerInfoList = this.membershipManager.getCurrentMembership();
WorkerInfo workerInfo = null;
for (WorkerInfo entry: workerInfoList) {
if (assignment.getWorkerId().equals(entry.getWorkerId())) {
workerInfo = entry;
}
}
if (workerInfo == null) {
Function.FunctionMetaData functionMetaData = workerService.getFunctionMetaDataManager().getFunctionMetaData(tenant, namespace, functionName);
functionStatus.setNumInstances(functionMetaData.getFunctionDetails().getParallelism());
functionStatus.setNumRunning(0);
return functionStatus;
}

if (uri == null) {
throw new WebApplicationException(Response.serverError().status(Status.INTERNAL_SERVER_ERROR).build());
} else {
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}
}
} else {
for (Assignment assignment : assignments) {
boolean isOwner = this.workerConfig.getWorkerId().equals(assignment.getWorkerId());
FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData functionInstanceStatusData;
if (isOwner) {
functionInstanceStatusData = getFunctionInstanceStatus(tenant, namespace, functionName, assignment.getInstance().getInstanceId(), null);
} else {
functionInstanceStatusData = this.functionAdmin.functions().getFunctionStatus(
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(),
assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(),
assignment.getInstance().getInstanceId());
}

FunctionStatus.FunctionInstanceStatus instanceStatus = new FunctionStatus.FunctionInstanceStatus();
instanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
instanceStatus.setStatus(functionInstanceStatusData);
functionStatus.addInstance(instanceStatus);
}
}
functionStatus.setNumInstances(functionStatus.instances.size());
functionStatus.getInstances().forEach(functionInstanceStatus -> {
if (functionInstanceStatus.getStatus().isRunning()) {
functionStatus.numRunning++;
}
});
return functionStatus;
}

/**
* Process an assignment update from the assignment topic
* @param newAssignment the assignment
Expand Down Expand Up @@ -990,8 +784,11 @@ public void close() throws Exception {
}
}

private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
public synchronized FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) {
return getFunctionRuntimeInfoInternal(fullyQualifiedInstanceId);
}

private FunctionRuntimeInfo getFunctionRuntimeInfoInternal(String fullyQualifiedInstanceId) {
return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId);
}
}
Loading

0 comments on commit d4794bd

Please sign in to comment.