Skip to content

Commit

Permalink
During Function update, cleanup should only happen for temp files tha…
Browse files Browse the repository at this point in the history
…t were generated (apache#7201)

* Fix logic while updating functions. Cleanup should only happen for temp files that were generated

* Fix integration tests

* Fix test

* Fix logic for parallelism > 1

* Fix test

* Address comments

Co-authored-by: Sanjeev Kulkarni <[email protected]>
  • Loading branch information
srkukarni and Sanjeev Kulkarni authored Jun 10, 2020
1 parent e64d951 commit 828d033
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class RuntimeSpawner implements AutoCloseable {
private final InstanceConfig instanceConfig;
@Getter
private final RuntimeFactory runtimeFactory;
private final String codeFile;
private final String originalCodeFileName;

@Getter
Expand All @@ -62,7 +61,6 @@ public RuntimeSpawner(InstanceConfig instanceConfig,
RuntimeFactory containerFactory, long instanceLivenessCheckFreqMs) {
this.instanceConfig = instanceConfig;
this.runtimeFactory = containerFactory;
this.codeFile = codeFile;
this.originalCodeFileName = originalCodeFileName;
this.numRestarts = 0;
this.instanceLivenessCheckFreqMs = instanceLivenessCheckFreqMs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ public void registerFunction(final String tenant,
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
updateRequest(functionMetaDataBuilder.build());
} finally {

if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if (functionPkgUrl == null || !functionPkgUrl.startsWith(Utils.FILE)) {
componentPackageFile.delete();
}
}
}
}
Expand Down Expand Up @@ -419,9 +419,10 @@ public void updateFunction(final String tenant,

updateRequest(functionMetaDataBuilder.build());
} finally {
if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((functionPkgUrl != null && !functionPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
componentPackageFile.delete();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ public void registerSink(final String tenant,
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
updateRequest(functionMetaDataBuilder.build());
} finally {

if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if (sinkPkgUrl == null || !sinkPkgUrl.startsWith(Utils.FILE)) {
componentPackageFile.delete();
}
}
}
}
Expand Down Expand Up @@ -419,9 +419,10 @@ public void updateSink(final String tenant,

updateRequest(functionMetaDataBuilder.build());
} finally {
if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((sinkPkgUrl != null && !sinkPkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
componentPackageFile.delete();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,10 @@ public void registerSource(final String tenant,
functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
updateRequest(functionMetaDataBuilder.build());
} finally {

if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if (sourcePkgUrl == null || !sourcePkgUrl.startsWith(Utils.FILE)) {
componentPackageFile.delete();
}
}
}
}
Expand Down Expand Up @@ -416,9 +416,10 @@ public void updateSource(final String tenant,

updateRequest(functionMetaDataBuilder.build());
} finally {
if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
&& componentPackageFile != null && componentPackageFile.exists()) {
componentPackageFile.delete();
if (componentPackageFile != null && componentPackageFile.exists()) {
if ((sourcePkgUrl != null && !sourcePkgUrl.startsWith(Utils.FILE)) || uploadedInputStream != null) {
componentPackageFile.delete();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,12 @@ private void testExclamationFunction(Runtime runtime,
// get function stats
getFunctionStats(functionName, numMessages);

// update parallelism
updateFunctionParallelism(functionName, 2);

//get function status
getFunctionStatus(functionName, 0, true, 2);

// delete function
deleteFunction(functionName);

Expand Down Expand Up @@ -1756,6 +1762,22 @@ private static <T> void submitFunction(Runtime runtime,
ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema);
}

private static void updateFunctionParallelism(String functionName, int parallelism) throws Exception {

CommandGenerator generator = new CommandGenerator();
generator.setFunctionName(functionName);
generator.setParallelism(parallelism);
String command = generator.generateUpdateFunctionCommand();

log.info("---------- Function command: {}", command);
String[] commands = {
"sh", "-c", command
};
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
commands);
assertTrue(result.getStdout().contains("\"Updated successfully\""));
}

private static <T> void submitFunction(Runtime runtime,
String inputTopicName,
String outputTopicName,
Expand Down Expand Up @@ -1948,6 +1970,11 @@ private static void checkSubscriptionsCleanup(String topic) throws Exception {
}

private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts) throws Exception {
getFunctionStatus(functionName, numMessages, checkRestarts, 1);
}

private static void getFunctionStatus(String functionName, int numMessages, boolean checkRestarts, int parallelism)
throws Exception {
ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(
PulsarCluster.ADMIN_SCRIPT,
"functions",
Expand All @@ -1959,20 +1986,35 @@ private static void getFunctionStatus(String functionName, int numMessages, bool

FunctionStatus functionStatus = FunctionStatus.decode(result.getStdout());

assertEquals(functionStatus.getNumInstances(), 1);
assertEquals(functionStatus.getNumRunning(), 1);
assertEquals(functionStatus.getInstances().size(), 1);
assertEquals(functionStatus.getInstances().get(0).getInstanceId(), 0);
assertTrue(functionStatus.getInstances().get(0).getStatus().getAverageLatency() > 0.0);
assertEquals(functionStatus.getInstances().get(0).getStatus().isRunning(), true);
assertTrue(functionStatus.getInstances().get(0).getStatus().getLastInvocationTime() > 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumReceived(), numMessages);
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumSuccessfullyProcessed(), numMessages);
if (checkRestarts) {
assertEquals(functionStatus.getInstances().get(0).getStatus().getNumRestarts(), 0);
assertEquals(functionStatus.getNumInstances(), parallelism);
assertEquals(functionStatus.getNumRunning(), parallelism);
assertEquals(functionStatus.getInstances().size(), parallelism);
boolean avgLatencyGreaterThanZero = false;
int totalMessagesProcessed = 0;
int totalMessagesSuccessfullyProcessed = 0;
boolean lastInvocationTimeGreaterThanZero = false;
for (int i = 0; i < parallelism; ++i) {
assertEquals(functionStatus.getInstances().get(i).getStatus().isRunning(), true);
assertTrue(functionStatus.getInstances().get(i).getInstanceId() >= 0);
assertTrue(functionStatus.getInstances().get(i).getInstanceId() < parallelism);
avgLatencyGreaterThanZero = avgLatencyGreaterThanZero
|| functionStatus.getInstances().get(i).getStatus().getAverageLatency() > 0.0;
lastInvocationTimeGreaterThanZero = lastInvocationTimeGreaterThanZero
|| functionStatus.getInstances().get(i).getStatus().getLastInvocationTime() > 0;
totalMessagesProcessed += functionStatus.getInstances().get(i).getStatus().getNumReceived();
totalMessagesSuccessfullyProcessed += functionStatus.getInstances().get(i).getStatus().getNumSuccessfullyProcessed();
if (checkRestarts) {
assertEquals(functionStatus.getInstances().get(i).getStatus().getNumRestarts(), 0);
}
assertEquals(functionStatus.getInstances().get(i).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(i).getStatus().getLatestSystemExceptions().size(), 0);
}
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestUserExceptions().size(), 0);
assertEquals(functionStatus.getInstances().get(0).getStatus().getLatestSystemExceptions().size(), 0);
if (numMessages > 0) {
assertTrue(avgLatencyGreaterThanZero);
assertTrue(lastInvocationTimeGreaterThanZero);
}
assertEquals(totalMessagesProcessed, numMessages);
assertEquals(totalMessagesSuccessfullyProcessed, numMessages);
}

private static void publishAndConsumeMessages(String inputTopic,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ public String generateUpdateFunctionCommand(String codeFile) {
if (functionName != null) {
commandBuilder.append(" --name " + functionName);
}
commandBuilder.append(" --className " + functionClassName);
if (functionClassName != null) {
commandBuilder.append(" --className " + functionClassName);
}
if (sourceTopic != null) {
commandBuilder.append(" --inputs " + sourceTopic);
}
Expand Down Expand Up @@ -268,24 +270,26 @@ public String generateUpdateFunctionCommand(String codeFile) {
commandBuilder.append(" --slidingIntervalDurationMs " + slidingIntervalDurationMs);
}

switch (runtime){
case JAVA:
commandBuilder.append(" --jar " + JAVAJAR);
break;
case PYTHON:
if (codeFile != null) {
commandBuilder.append(" --py " + PYTHONBASE + codeFile);
} else {
commandBuilder.append(" --py " + PYTHONBASE);
}
break;
case GO:
if (codeFile != null) {
commandBuilder.append(" --go " + GOBASE + codeFile);
} else {
commandBuilder.append(" --go " + GOBASE);
}
break;
if (codeFile != null) {
switch (runtime) {
case JAVA:
commandBuilder.append(" --jar " + JAVAJAR);
break;
case PYTHON:
if (codeFile != null) {
commandBuilder.append(" --py " + PYTHONBASE + codeFile);
} else {
commandBuilder.append(" --py " + PYTHONBASE);
}
break;
case GO:
if (codeFile != null) {
commandBuilder.append(" --go " + GOBASE + codeFile);
} else {
commandBuilder.append(" --go " + GOBASE);
}
break;
}
}
return commandBuilder.toString();
}
Expand Down

0 comments on commit 828d033

Please sign in to comment.