Skip to content

Commit

Permalink
Add ability to specify runtime flags in functions/sources/sinks (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
srkukarni authored and merlimat committed Mar 26, 2019
1 parent 1921bd9 commit 41d5af7
Show file tree
Hide file tree
Showing 11 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ public enum Runtime {
PYTHON
}

// Any flags that you want to pass to the runtime.
// note that in thread mode, these flags will have no impact
private String runtimeFlags;

private String tenant;
private String namespace;
private String name;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,7 @@ public class SinkConfig {
private String archive;
// Whether the subscriptions the functions created/used should be deleted when the functions is deleted
private Boolean cleanupSubscription;

// Any flags that you want to pass to the runtime.
private String runtimeFlags;
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ public class SourceConfig {
private Resources resources;

private String archive;
// Any flags that you want to pass to the runtime.
private String runtimeFlags;
}
1 change: 1 addition & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ message FunctionDetails {
Resources resources = 13;
string packageUrl = 14; //present only if function submitted with package-url
RetryDetails retryDetails = 15;
string runtimeFlags = 17;
}

message ConsumerSpec {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
"%s-%s",
instanceConfig.getFunctionDetails().getName(),
shardId));
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
}
if (instanceConfig.getFunctionDetails().getResources() != null) {
Function.Resources resources = instanceConfig.getFunctionDetails().getResources();
if (resources.getRam() != 0) {
Expand All @@ -151,6 +154,9 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
args.add("python");
if (!isEmpty(instanceConfig.getFunctionDetails().getRuntimeFlags())) {
args.add(instanceConfig.getFunctionDetails().getRuntimeFlags());
}
args.add(instanceFile);
args.add("--py");
args.add(originalCodeFileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

if (!StringUtils.isEmpty(functionConfig.getRuntimeFlags())) {
functionDetailsBuilder.setRuntimeFlags(functionConfig.getRuntimeFlags());
}

return functionDetailsBuilder.build();
}

Expand Down Expand Up @@ -315,6 +319,10 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
functionConfig.setResources(resources);
}

if (!isEmpty(functionDetails.getRuntimeFlags())) {
functionConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
}

return functionConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetail
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

if (isNotBlank(sinkConfig.getRuntimeFlags())) {
functionDetailsBuilder.setRuntimeFlags(sinkConfig.getRuntimeFlags());
}

return functionDetailsBuilder.build();
}

Expand Down Expand Up @@ -248,6 +252,10 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
resources.setDisk(functionDetails.getResources().getDisk());
}

if (isNotBlank(functionDetails.getRuntimeFlags())) {
sinkConfig.setRuntimeFlags(functionDetails.getRuntimeFlags());
}

return sinkConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
bldr.setDisk(resources.getDisk());
functionDetailsBuilder.setResources(bldr);

if (!org.apache.commons.lang3.StringUtils.isEmpty(sourceConfig.getRuntimeFlags())) {
functionDetailsBuilder.setRuntimeFlags(sourceConfig.getRuntimeFlags());
}

return functionDetailsBuilder.build();
}

Expand Down Expand Up @@ -174,6 +178,10 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
resources.setDisk(functionDetails.getResources().getDisk());
sourceConfig.setResources(resources);
}

if (!org.apache.commons.lang3.StringUtils.isEmpty(functionDetails.getRuntimeFlags())) {
sourceConfig .setRuntimeFlags(functionDetails.getRuntimeFlags());
}
return sourceConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void testConvertBackFidelity() {
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
functionConfig.setRuntimeFlags("-DKerberos");
Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public void testConvertBackFidelity() throws IOException {
sinkConfig.setRetainOrdering(false);
sinkConfig.setAutoAck(true);
sinkConfig.setTimeoutMs(2000l);
sinkConfig.setRuntimeFlags("-DKerberos");
Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public void testConvertBackFidelity() throws IOException {
sourceConfig.setTopicName("test-output");
sourceConfig.setSerdeClassName("test-serde");
sourceConfig.setParallelism(1);
sourceConfig.setRuntimeFlags("-DKerberos");
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sourceConfig.setConfigs(new HashMap<>());
Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
Expand Down

0 comments on commit 41d5af7

Please sign in to comment.