From 7bcd8934a0a53ab7a62b3c9d77fbdec94ab497d2 Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 13 Aug 2018 22:49:11 -0700 Subject: [PATCH] Add support to restart function (#2365) * Add support to restart function fix: pulsar function restart * add support to restart all function instances --- .../broker/admin/impl/FunctionsBase.java | 25 ++++ .../pulsar/io/PulsarFunctionTlsTest.java | 2 +- .../apache/pulsar/io/PulsarSinkE2ETest.java | 63 ++++++++++- .../apache/pulsar/client/admin/Functions.java | 33 ++++++ .../client/admin/internal/FunctionsImpl.java | 22 ++++ .../pulsar/admin/cli/CmdFunctionsTest.java | 29 +++++ .../apache/pulsar/admin/cli/CmdFunctions.java | 37 +++++- .../functions/worker/FunctionActioner.java | 4 +- .../worker/FunctionRuntimeManager.java | 107 ++++++++++++++++++ .../worker/rest/api/FunctionsImpl.java | 66 +++++++++++ .../rest/api/v2/FunctionApiV2Resource.java | 28 +++++ 11 files changed, 404 insertions(+), 12 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java index 564eb180675bc..b8891e5a92239 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java @@ -267,6 +267,31 @@ public Response triggerFunction(final @PathParam("tenant") String tenant, } + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Restart all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); + } + @POST @ApiOperation( value = "Uploads Pulsar Function file data", diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java index c57a8a0e590e5..2254626804389 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java @@ -206,7 +206,7 @@ public void testAuthorization() throws Exception { String jarFilePathUrl = String.format("%s:%s", Utils.FILE, PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath()); FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion, - functionName, sinkTopic, subscriptionName); + functionName, "my.*", sinkTopic, subscriptionName); try { functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 5db9a0ac0413f..1306f13335a5e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -271,7 +271,7 @@ public void testE2EPulsarSink() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -333,7 +333,7 @@ public void testPulsarSinkStats() throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); // try to update function to test: update-function functionality @@ -382,7 +382,7 @@ public void testPulsarSinkStats() throws Exception { assertEquals(ownerWorkerId, workerId); } - protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) { + protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { File file = new File(jarFile); try { @@ -390,7 +390,7 @@ protected static FunctionDetails createSinkConfig(String jarFile, String tenant, } catch (MalformedURLException e) { throw new RuntimeException("Failed to load user jar " + file, e); } - String sourceTopicPattern = String.format("persistent://%s/%s/my.*", tenant, namespace); + String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic); Class typeArg = byte[].class; FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); @@ -446,7 +446,7 @@ public void testAuthorization(boolean validRoleName) throws Exception { String jarFilePathUrl = Utils.FILE + ":" + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, - sinkTopic, subscriptionName); + "my.*", sinkTopic, subscriptionName); try { admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); assertTrue(validRoleName); @@ -507,4 +507,57 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception { assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName()); } + + @Test(timeOut = 20000) + public void testFunctionRestartApi() throws Exception { + + final String namespacePortion = "io"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopicName = "restartFunction"; + final String sourceTopic = "persistent://" + replNamespace + "/" + sourceTopicName; + final String sinkTopic = "persistent://" + replNamespace + "/output"; + final String functionName = "PulsarSink-test"; + final String subscriptionName = "test-sub"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + + // create source topic + Producer producer = pulsarClient.newProducer().topic(sourceTopic).create(); + + String jarFilePathUrl = Utils.FILE + ":" + + PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath(); + FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName, + sourceTopicName, sinkTopic, subscriptionName); + admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl); + + retryStrategically((test) -> { + try { + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStats != null && subStats.consumers.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + assertEquals(subStats.consumers.size(), 1); + + // it should restart consumer : so, check if consumer came up again after restarting function + admin.functions().restartFunction(tenant, namespacePortion, functionName); + + retryStrategically((test) -> { + try { + SubscriptionStats subStat = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + return subStat != null && subStat.consumers.size() == 1; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 150); + + subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName); + assertEquals(subStats.consumers.size(), 1); + + producer.close(); + } } \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java index c04873da8e2d7..4525c51b810cd 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java @@ -181,6 +181,39 @@ public interface Functions { * Unexpected error */ FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException; + + /** + * Restart function instance + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @param instanceId + * Function instanceId + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException; + + /** + * Restart all function instances + * + * @param tenant + * Tenant name + * @param namespace + * Namespace name + * @param function + * Function name + * + * @throws PulsarAdminException + * Unexpected error + */ + void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException; /** * Triggers the function by writing to the input topic. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java index 028da3c2b146c..402b5d397122b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java @@ -215,6 +215,27 @@ public String triggerFunction(String tenant, String namespace, String functionNa } } + @Override + public void restartFunction(String tenant, String namespace, String functionName, int instanceId) + throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId)) + .path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + + @Override + public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException { + try { + request(functions.path(tenant).path(namespace).path(functionName).path("restart")) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } catch (Exception e) { + throw getApiException(e); + } + } + @Override public void uploadFunction(String sourceFile, String path) throws PulsarAdminException { try { @@ -289,4 +310,5 @@ public static void mergeJson(String json, Builder builder) throws IOException { public static String printJson(MessageOrBuilder msg) throws IOException { return JsonFormat.printer().print(msg); } + } \ No newline at end of file diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index e206e752538a0..11a3c7a0b945c 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -31,6 +31,7 @@ import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction; import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction; import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions; +import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction; import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction; import org.apache.pulsar.admin.cli.CmdSinks.CreateSink; import org.apache.pulsar.admin.cli.CmdSources.CreateSource; @@ -214,6 +215,34 @@ public void testCreateFunction() throws Exception { } + @Test + public void restartFunction() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + int instanceId = 0; + cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName, + "--instance-id", Integer.toString(instanceId)}); + + RestartFunction restarter = cmd.getRestarter(); + assertEquals(fnName, restarter.getFunctionName()); + + verify(functions, times(1)).restartFunction(tenant, namespace, fnName, instanceId); + } + + @Test + public void restartFunctionInstances() throws Exception { + String fnName = TEST_NAME + "-function"; + String tenant = "sample"; + String namespace = "ns1"; + cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName }); + + RestartFunction restarter = cmd.getRestarter(); + assertEquals(fnName, restarter.getFunctionName()); + + verify(functions, times(1)).restartFunction(tenant, namespace, fnName); + } + @Test public void testCreateFunctionWithHttpUrl() throws Exception { String fnName = TEST_NAME + "-function"; diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index b11dabee82b17..dd1ef3df5dc0e 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -102,7 +102,8 @@ public class CmdFunctions extends CmdBase { private final DeleteFunction deleter; private final UpdateFunction updater; private final GetFunction getter; - private final GetFunctionStatus statuser; + private final GetFunctionStatus functionStatus; + private final RestartFunction restart; private final ListFunctions lister; private final StateGetter stateGetter; private final TriggerFunction triggerer; @@ -164,7 +165,7 @@ abstract class FunctionCommand extends BaseCommand { @Parameter(names = "--name", description = "The function's name") protected String functionName; - + @Override void processArguments() throws Exception { super.processArguments(); @@ -831,6 +832,27 @@ void runCmd() throws Exception { } } + @Parameters(commandDescription = "Restart function instance") + class RestartFunction extends FunctionCommand { + + @Parameter(names = "--instance-id", description = "The function instanceId (restart all instances if instance-id is not provided") + protected String instanceId; + + @Override + void runCmd() throws Exception { + if (isNotBlank(instanceId)) { + try { + admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId)); + } catch (NumberFormatException e) { + System.err.println("instance-id must be a number"); + } + } else { + admin.functions().restartFunction(tenant, namespace, functionName); + } + System.out.println("Restarted successfully"); + } + } + @Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster") class DeleteFunction extends FunctionCommand { @Override @@ -1035,18 +1057,20 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException { deleter = new DeleteFunction(); updater = new UpdateFunction(); getter = new GetFunction(); - statuser = new GetFunctionStatus(); + functionStatus = new GetFunctionStatus(); lister = new ListFunctions(); stateGetter = new StateGetter(); triggerer = new TriggerFunction(); uploader = new UploadFunction(); downloader = new DownloadFunction(); cluster = new GetCluster(); + restart = new RestartFunction(); jcommander.addCommand("localrun", getLocalRunner()); jcommander.addCommand("create", getCreater()); jcommander.addCommand("delete", getDeleter()); jcommander.addCommand("update", getUpdater()); jcommander.addCommand("get", getGetter()); + jcommander.addCommand("restart", getRestarter()); jcommander.addCommand("getstatus", getStatuser()); jcommander.addCommand("list", getLister()); jcommander.addCommand("querystate", getStateGetter()); @@ -1082,7 +1106,7 @@ GetFunction getGetter() { } @VisibleForTesting - GetFunctionStatus getStatuser() { return statuser; } + GetFunctionStatus getStatuser() { return functionStatus; } @VisibleForTesting ListFunctions getLister() { @@ -1109,6 +1133,11 @@ DownloadFunction getDownloader() { return downloader; } + @VisibleForTesting + RestartFunction getRestarter() { + return restart; + } + private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) { String[] args = fqfn.split("/"); if (args.length != 3) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 09273606ab978..b3f30fdb95810 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -127,7 +127,7 @@ public void join() throws InterruptedException { } @VisibleForTesting - protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { + public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception { FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); @@ -225,7 +225,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa } } - private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { + public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { Function.Instance instance = functionRuntimeInfo.getFunctionInstance(); FunctionMetaData functionMetaData = instance.getFunctionMetaData(); log.info("Stopping function {} - {}...", diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 5e1995e6339aa..121a454a91d33 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -20,11 +20,15 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.common.policies.data.ErrorData; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -35,9 +39,14 @@ import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; import org.apache.pulsar.functions.runtime.RuntimeSpawner; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.Response.Status; + import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -317,6 +326,104 @@ public InstanceCommunication.FunctionStatus getFunctionInstanceStatus(String ten return functionStatus; } + public Response restartFunctionInstance(String tenant, String namespace, String functionName, int instanceId) throws Exception { + Assignment assignment = this.findAssignment(tenant, namespace, functionName, instanceId); + final String fullFunctionName = String.format("%s/%s/%s/%s", tenant, namespace, functionName, instanceId); + if (assignment == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " doesn't exist")).build(); + } + + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + + if (assignedWorkerId.equals(workerId)) { + restartFunction(Utils.getFullyQualifiedInstanceId(assignment.getInstance())); + return Response.status(Status.OK).build(); + } else { + // query other worker + List workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry : workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build(); + } + + URI redirect = null; + final String redirectUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", + workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, instanceId); + try { + redirect = new URI(redirectUrl); + } catch (URISyntaxException e) { + log.error("Error in preparing redirect url for {}/{}/{}/{}: {}", tenant, namespace, functionName, + instanceId, e.getMessage(), e); + return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " invalid redirection url")).build(); + } + throw new WebApplicationException(Response.temporaryRedirect(redirect).build()); + } + } + + public Response restartFunctionInstances(String tenant, String namespace, String functionName) throws Exception { + final String fullFunctionName = String.format("%s/%s/%s", tenant, namespace, functionName); + Collection assignments = this.findFunctionAssignments(tenant, namespace, functionName); + + if (assignments.isEmpty()) { + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(fullFunctionName + " has not been assigned yet")).build(); + } + for (Assignment assignment : assignments) { + final String assignedWorkerId = assignment.getWorkerId(); + final String workerId = this.workerConfig.getWorkerId(); + String fullyQualifiedInstanceId = Utils.getFullyQualifiedInstanceId(assignment.getInstance()); + if (assignedWorkerId.equals(workerId)) { + restartFunction(fullyQualifiedInstanceId); + } else { + List workerInfoList = this.membershipManager.getCurrentMembership(); + WorkerInfo workerInfo = null; + for (WorkerInfo entry : workerInfoList) { + if (assignment.getWorkerId().equals(entry.getWorkerId())) { + workerInfo = entry; + } + } + if (workerInfo == null) { + if (log.isDebugEnabled()) { + log.debug("[{}] has not been assigned yet", fullyQualifiedInstanceId); + } + continue; + } + Client client = ClientBuilder.newClient(); + // TODO: create and use pulsar-admin to support authorization and authentication and manage redirect + final String instanceRestartUrl = String.format("http://%s:%d/admin/functions/%s/%s/%s/%d/restart", + workerInfo.getWorkerHostname(), workerInfo.getPort(), tenant, namespace, functionName, + assignment.getInstance().getInstanceId()); + client.target(instanceRestartUrl).request(MediaType.APPLICATION_JSON) + .post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class); + } + } + return Response.status(Status.OK).build(); + } + + private void restartFunction(String fullyQualifiedInstanceId) throws Exception { + log.info("[{}] restarting..", fullyQualifiedInstanceId); + FunctionRuntimeInfo functionRuntimeInfo = this.getFunctionRuntimeInfo(fullyQualifiedInstanceId); + if (functionRuntimeInfo != null) { + this.functionActioner.stopFunction(functionRuntimeInfo); + try { + this.functionActioner.startFunction(functionRuntimeInfo); + } catch (Exception ex) { + log.info("{} Error starting function", fullyQualifiedInstanceId, ex); + functionRuntimeInfo.setStartupException(ex); + throw ex; + } + } + } + /** * Get statuses of all function instances. * @param tenant the tenant the function belongs to diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java index 4bb6e49cc499b..a9e03de9e31d1 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java @@ -386,6 +386,72 @@ public Response getFunctionInstanceStatus(final String tenant, final String name return Response.status(Status.OK).entity(jsonResponse).build(); } + public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName, + final String instanceId) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId); + } catch (IllegalArgumentException e) { + log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + try { + return functionRuntimeManager.restartFunctionInstance(tenant, namespace, functionName, + Integer.parseInt(instanceId)); + } catch (WebApplicationException we) { + throw we; + } catch (Exception e) { + log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e); + return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); + } + } + + public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) { + + if (!isWorkerServiceAvailable()) { + return getUnavailableResponse(); + } + + // validate parameters + try { + validateGetFunctionRequestParams(tenant, namespace, functionName); + } catch (IllegalArgumentException e) { + log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(e.getMessage())).build(); + } + + FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager(); + if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) { + log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName); + return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON) + .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build(); + } + + FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager(); + try { + return functionRuntimeManager.restartFunctionInstances(tenant, namespace, functionName); + }catch (Exception e) { + log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e); + return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build(); + } + } + public Response getFunctionStatus(final String tenant, final String namespace, final String functionName) throws IOException { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java index 96baada967b59..3581453920b3d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java @@ -22,6 +22,7 @@ import org.apache.pulsar.functions.worker.rest.FunctionApiResource; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.glassfish.jersey.media.multipart.FormDataParam; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.common.io.ConnectorDefinition; import java.io.IOException; import java.io.InputStream; @@ -45,6 +46,8 @@ import org.glassfish.jersey.media.multipart.FormDataParam; import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import lombok.extern.slf4j.Slf4j; @Slf4j @@ -162,6 +165,31 @@ public Response triggerFunction(final @PathParam("tenant") String tenant, return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic); } + @POST + @ApiOperation(value = "Restart function instance", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName, + final @PathParam("instanceId") String instanceId) { + return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId); + } + + @POST + @ApiOperation(value = "Restart all function instances", response = Void.class) + @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"), + @ApiResponse(code = 404, message = "The function does not exist"), + @ApiResponse(code = 500, message = "Internal server error") }) + @Path("/{tenant}/{namespace}/{functionName}/restart") + @Consumes(MediaType.APPLICATION_JSON) + public Response restartFunction(final @PathParam("tenant") String tenant, + final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) { + return functions.restartFunctionInstances(tenant, namespace, functionName); + } + @POST @Path("/upload") @Consumes(MediaType.MULTIPART_FORM_DATA)