Skip to content

Commit

Permalink
Add support to restart function (apache#2365)
Browse files Browse the repository at this point in the history
* Add support to restart function

fix: pulsar function restart

* add support to restart all function instances
  • Loading branch information
rdhabalia authored Aug 14, 2018
1 parent c9988cf commit 7bcd893
Show file tree
Hide file tree
Showing 11 changed files with 404 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,31 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,

}

@POST
@ApiOperation(value = "Restart function instance", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
final @PathParam("instanceId") String instanceId) {
return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId);
}

@POST
@ApiOperation(value = "Restart all function instances", response = Void.class)
@ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
@ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/restart")
@Consumes(MediaType.APPLICATION_JSON)
public Response restartFunction(final @PathParam("tenant") String tenant,
final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
return functions.restartFunctionInstances(tenant, namespace, functionName);
}

@POST
@ApiOperation(
value = "Uploads Pulsar Function file data",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public void testAuthorization() throws Exception {
String jarFilePathUrl = String.format("%s:%s", Utils.FILE,
PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath());
FunctionDetails functionDetails = PulsarSinkE2ETest.createSinkConfig(jarFilePathUrl, tenant, namespacePortion,
functionName, sinkTopic, subscriptionName);
functionName, "my.*", sinkTopic, subscriptionName);

try {
functionAdmin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void testE2EPulsarSink() throws Exception {
String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic, subscriptionName);
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

// try to update function to test: update-function functionality
Expand Down Expand Up @@ -333,7 +333,7 @@ public void testPulsarSinkStats() throws Exception {
String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic, subscriptionName);
"my.*", sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

// try to update function to test: update-function functionality
Expand Down Expand Up @@ -382,15 +382,15 @@ public void testPulsarSinkStats() throws Exception {
assertEquals(ownerWorkerId, workerId);
}

protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sinkTopic, String subscriptionName) {
protected static FunctionDetails createSinkConfig(String jarFile, String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) {

File file = new File(jarFile);
try {
Reflections.loadJar(file);
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
String sourceTopicPattern = String.format("persistent://%s/%s/my.*", tenant, namespace);
String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic);
Class<?> typeArg = byte[].class;

FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
Expand Down Expand Up @@ -446,7 +446,7 @@ public void testAuthorization(boolean validRoleName) throws Exception {
String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sinkTopic, subscriptionName);
"my.*", sinkTopic, subscriptionName);
try {
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);
assertTrue(validRoleName);
Expand Down Expand Up @@ -507,4 +507,57 @@ public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
assertEquals(functionMetadata.getSink().getTypeClassName(), typeArgs[1].getName());

}

@Test(timeOut = 20000)
public void testFunctionRestartApi() throws Exception {

final String namespacePortion = "io";
final String replNamespace = tenant + "/" + namespacePortion;
final String sourceTopicName = "restartFunction";
final String sourceTopic = "persistent://" + replNamespace + "/" + sourceTopicName;
final String sinkTopic = "persistent://" + replNamespace + "/output";
final String functionName = "PulsarSink-test";
final String subscriptionName = "test-sub";
admin.namespaces().createNamespace(replNamespace);
Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);

// create source topic
Producer<byte[]> producer = pulsarClient.newProducer().topic(sourceTopic).create();

String jarFilePathUrl = Utils.FILE + ":"
+ PulsarSink.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionDetails functionDetails = createSinkConfig(jarFilePathUrl, tenant, namespacePortion, functionName,
sourceTopicName, sinkTopic, subscriptionName);
admin.functions().createFunctionWithUrl(functionDetails, jarFilePathUrl);

retryStrategically((test) -> {
try {
SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStats != null && subStats.consumers.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);

SubscriptionStats subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
assertEquals(subStats.consumers.size(), 1);

// it should restart consumer : so, check if consumer came up again after restarting function
admin.functions().restartFunction(tenant, namespacePortion, functionName);

retryStrategically((test) -> {
try {
SubscriptionStats subStat = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
return subStat != null && subStat.consumers.size() == 1;
} catch (PulsarAdminException e) {
return false;
}
}, 5, 150);

subStats = admin.topics().getStats(sourceTopic).subscriptions.get(subscriptionName);
assertEquals(subStats.consumers.size(), 1);

producer.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,39 @@ public interface Functions {
* Unexpected error
*/
FunctionStatusList getFunctionStatus(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Restart function instance
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @param instanceId
* Function instanceId
*
* @throws PulsarAdminException
* Unexpected error
*/
void restartFunction(String tenant, String namespace, String function, int instanceId) throws PulsarAdminException;

/**
* Restart all function instances
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @throws PulsarAdminException
* Unexpected error
*/
void restartFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Triggers the function by writing to the input topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,27 @@ public String triggerFunction(String tenant, String namespace, String functionNa
}
}

@Override
public void restartFunction(String tenant, String namespace, String functionName, int instanceId)
throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path(Integer.toString(instanceId))
.path("restart")).post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void restartFunction(String tenant, String namespace, String functionName) throws PulsarAdminException {
try {
request(functions.path(tenant).path(namespace).path(functionName).path("restart"))
.post(Entity.entity("", MediaType.APPLICATION_JSON), ErrorData.class);
} catch (Exception e) {
throw getApiException(e);
}
}

@Override
public void uploadFunction(String sourceFile, String path) throws PulsarAdminException {
try {
Expand Down Expand Up @@ -289,4 +310,5 @@ public static void mergeJson(String json, Builder builder) throws IOException {
public static String printJson(MessageOrBuilder msg) throws IOException {
return JsonFormat.printer().print(msg);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.pulsar.admin.cli.CmdFunctions.DeleteFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.GetFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.ListFunctions;
import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
Expand Down Expand Up @@ -214,6 +215,34 @@ public void testCreateFunction() throws Exception {

}

@Test
public void restartFunction() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
int instanceId = 0;
cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName,
"--instance-id", Integer.toString(instanceId)});

RestartFunction restarter = cmd.getRestarter();
assertEquals(fnName, restarter.getFunctionName());

verify(functions, times(1)).restartFunction(tenant, namespace, fnName, instanceId);
}

@Test
public void restartFunctionInstances() throws Exception {
String fnName = TEST_NAME + "-function";
String tenant = "sample";
String namespace = "ns1";
cmd.run(new String[] { "restart", "--tenant", tenant, "--namespace", namespace, "--name", fnName });

RestartFunction restarter = cmd.getRestarter();
assertEquals(fnName, restarter.getFunctionName());

verify(functions, times(1)).restartFunction(tenant, namespace, fnName);
}

@Test
public void testCreateFunctionWithHttpUrl() throws Exception {
String fnName = TEST_NAME + "-function";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ public class CmdFunctions extends CmdBase {
private final DeleteFunction deleter;
private final UpdateFunction updater;
private final GetFunction getter;
private final GetFunctionStatus statuser;
private final GetFunctionStatus functionStatus;
private final RestartFunction restart;
private final ListFunctions lister;
private final StateGetter stateGetter;
private final TriggerFunction triggerer;
Expand Down Expand Up @@ -164,7 +165,7 @@ abstract class FunctionCommand extends BaseCommand {

@Parameter(names = "--name", description = "The function's name")
protected String functionName;

@Override
void processArguments() throws Exception {
super.processArguments();
Expand Down Expand Up @@ -831,6 +832,27 @@ void runCmd() throws Exception {
}
}

@Parameters(commandDescription = "Restart function instance")
class RestartFunction extends FunctionCommand {

@Parameter(names = "--instance-id", description = "The function instanceId (restart all instances if instance-id is not provided")
protected String instanceId;

@Override
void runCmd() throws Exception {
if (isNotBlank(instanceId)) {
try {
admin.functions().restartFunction(tenant, namespace, functionName, Integer.parseInt(instanceId));
} catch (NumberFormatException e) {
System.err.println("instance-id must be a number");
}
} else {
admin.functions().restartFunction(tenant, namespace, functionName);
}
System.out.println("Restarted successfully");
}
}

@Parameters(commandDescription = "Delete a Pulsar Function that's running on a Pulsar cluster")
class DeleteFunction extends FunctionCommand {
@Override
Expand Down Expand Up @@ -1035,18 +1057,20 @@ public CmdFunctions(PulsarAdmin admin) throws PulsarClientException {
deleter = new DeleteFunction();
updater = new UpdateFunction();
getter = new GetFunction();
statuser = new GetFunctionStatus();
functionStatus = new GetFunctionStatus();
lister = new ListFunctions();
stateGetter = new StateGetter();
triggerer = new TriggerFunction();
uploader = new UploadFunction();
downloader = new DownloadFunction();
cluster = new GetCluster();
restart = new RestartFunction();
jcommander.addCommand("localrun", getLocalRunner());
jcommander.addCommand("create", getCreater());
jcommander.addCommand("delete", getDeleter());
jcommander.addCommand("update", getUpdater());
jcommander.addCommand("get", getGetter());
jcommander.addCommand("restart", getRestarter());
jcommander.addCommand("getstatus", getStatuser());
jcommander.addCommand("list", getLister());
jcommander.addCommand("querystate", getStateGetter());
Expand Down Expand Up @@ -1082,7 +1106,7 @@ GetFunction getGetter() {
}

@VisibleForTesting
GetFunctionStatus getStatuser() { return statuser; }
GetFunctionStatus getStatuser() { return functionStatus; }

@VisibleForTesting
ListFunctions getLister() {
Expand All @@ -1109,6 +1133,11 @@ DownloadFunction getDownloader() {
return downloader;
}

@VisibleForTesting
RestartFunction getRestarter() {
return restart;
}

private void parseFullyQualifiedFunctionName(String fqfn, FunctionConfig functionConfig) {
String[] args = fqfn.split("/");
if (args.length != 3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void join() throws InterruptedException {
}

@VisibleForTesting
protected void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
public void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Exception {
FunctionMetaData functionMetaData = functionRuntimeInfo.getFunctionInstance().getFunctionMetaData();
int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();

Expand Down Expand Up @@ -225,7 +225,7 @@ private void downloadFile(File pkgFile, boolean isPkgUrlProvided, FunctionMetaDa
}
}

private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
public void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData();
log.info("Stopping function {} - {}...",
Expand Down
Loading

0 comments on commit 7bcd893

Please sign in to comment.