Skip to content

Commit

Permalink
Update Function Semantics (apache#2985)
Browse files Browse the repository at this point in the history
* Make update functions better

* Compiled

* more checks

* bug fix

* Added tests

* Tests pass

* Fixed tests

* Fixed tests

* Added tests

* Added unittests

* Fixed unittest

* Fixed unittest

* Fixed unittest

* Timeout fix

* Fixed unittest

* Fix unittest

* Addressed feedback
  • Loading branch information
srkukarni authored Nov 21, 2018
1 parent 7efce98 commit 1b0589b
Show file tree
Hide file tree
Showing 25 changed files with 1,541 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
final String namespacePortion = "assignment-test";
final String replNamespace = tenant + "/" + namespacePortion;
final String sinkTopic = "persistent://" + replNamespace + "/my-topic1";
final String logTopic = "persistent://" + replNamespace + "/log-topic";
final String baseFunctionName = "assign-restart";
final String subscriptionName = "test-sub";
final int totalFunctions = 5;
Expand All @@ -240,8 +241,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
functionConfig.setParallelism(parallelism);
// set-auto-ack prop =true
functionConfig.setAutoAck(true);
// don't set any log topic
admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
}
retryStrategically((test) -> {
Expand All @@ -263,8 +263,8 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
"my.*", sinkTopic, subscriptionName);
functionConfig.setParallelism(parallelism);
// set-auto-ack prop =false
functionConfig.setAutoAck(false);
// Now set the log topic
functionConfig.setLogTopic(logTopic);
admin.functions().updateFunctionWithUrl(functionConfig, jarFilePathUrl);
}

Expand Down Expand Up @@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
// validate updated function prop = auto-ack=false and instnaceid
for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
String functionName = baseFunctionName + i;
assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck());
assertEquals(admin.functions().getFunction(tenant, namespacePortion, functionName).getLogTopic(), logTopic);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.pulsar.admin.cli;

import com.google.gson.Gson;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -35,8 +34,6 @@
import org.apache.pulsar.admin.cli.CmdFunctions.RestartFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.StopFunction;
import org.apache.pulsar.admin.cli.CmdFunctions.UpdateFunction;
import org.apache.pulsar.admin.cli.CmdSinks.CreateSink;
import org.apache.pulsar.admin.cli.CmdSources.CreateSource;
import org.apache.pulsar.client.admin.Functions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
Expand All @@ -60,10 +57,8 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -205,7 +200,7 @@ public void testCreateFunction() throws Exception {
assertEquals(fnName, creater.getFunctionName());
assertEquals(inputTopicName, creater.getInputs());
assertEquals(outputTopicName, creater.getOutput());
assertEquals(false, creater.isAutoAck());
assertEquals(new Boolean(false), creater.getAutoAck());

verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,14 @@ abstract class FunctionDetailsCommand extends BaseCommand {
@Parameter(names = "--autoAck", description = "Whether or not the framework will automatically acknowleges messages", hidden = true)
protected Boolean DEPRECATED_autoAck = null;
@Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1)
protected boolean autoAck = true;
protected Boolean autoAck;
// for backwards compatibility purposes
@Parameter(names = "--timeoutMs", description = "The message timeout in milliseconds", hidden = true)
protected Long DEPRECATED_timeoutMs;
@Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
protected Long timeoutMs;
@Parameter(names = "--max-message-retries", description = "How many times should we try to process a message before giving up")
protected Integer maxMessageRetries = -1;
protected Integer maxMessageRetries;
@Parameter(names = "--dead-letter-topic", description = "The topic where all messages which could not be processed successfully are sent")
protected String deadLetterTopic;
protected FunctionConfig functionConfig;
Expand Down Expand Up @@ -403,21 +403,29 @@ void processArguments() throws Exception {
}

Resources resources = functionConfig.getResources();
if (resources == null) {
resources = new Resources();
}
if (cpu != null) {
if (resources == null) {
resources = new Resources();
}
resources.setCpu(cpu);
}

if (ram != null) {
if (resources == null) {
resources = new Resources();
}
resources.setRam(ram);
}

if (disk != null) {
if (resources == null) {
resources = new Resources();
}
resources.setDisk(disk);
}
functionConfig.setResources(resources);
if (resources != null) {
functionConfig.setResources(resources);
}

if (timeoutMs != null) {
functionConfig.setTimeoutMs(timeoutMs);
Expand Down Expand Up @@ -452,7 +460,9 @@ void processArguments() throws Exception {

functionConfig.setWindowConfig(windowConfig);

functionConfig.setAutoAck(autoAck);
if (autoAck != null) {
functionConfig.setAutoAck(autoAck);
}

if (null != maxMessageRetries) {
functionConfig.setMaxMessageRetries(maxMessageRetries);
Expand Down Expand Up @@ -711,6 +721,24 @@ void runCmd() throws Exception {

@Parameters(commandDescription = "Update a Pulsar Function that's been deployed to a Pulsar cluster")
class UpdateFunction extends FunctionDetailsCommand {

@Override
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
if (StringUtils.isEmpty(functionConfig.getClassName())) {
if (StringUtils.isEmpty(functionConfig.getName())) {
throw new IllegalArgumentException("Function Name not provided");
}
} else if (StringUtils.isEmpty(functionConfig.getName())) {
org.apache.pulsar.common.functions.Utils.inferMissingFunctionName(functionConfig);
}
if (StringUtils.isEmpty(functionConfig.getTenant())) {
org.apache.pulsar.common.functions.Utils.inferMissingTenant(functionConfig);
}
if (StringUtils.isEmpty(functionConfig.getNamespace())) {
org.apache.pulsar.common.functions.Utils.inferMissingNamespace(functionConfig);
}
}

@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(functionConfig.getJar())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ void runCmd() throws Exception {
}
print("Updated successfully");
}

protected void validateSinkConfigs(SinkConfig sinkConfig) {
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
}
}

abstract class SinkDetailsCommand extends BaseCommand {
Expand Down Expand Up @@ -253,7 +257,7 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--retainOrdering", description = "Sink consumes and sinks messages in order", hidden = true)
protected Boolean DEPRECATED_retainOrdering;
@Parameter(names = "--retain-ordering", description = "Sink consumes and sinks messages in order")
protected boolean retainOrdering;
protected Boolean retainOrdering;
@Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)")
protected Integer parallelism;
@Parameter(names = {"-a", "--archive"}, description = "Path to the archive file for the sink. It also supports url-path [http/https/file (file protocol assumes that file already exists on worker host)] from which worker can download the package.", listConverter = StringConverter.class)
Expand All @@ -280,7 +284,7 @@ abstract class SinkDetailsCommand extends BaseCommand {
@Parameter(names = "--sink-config", description = "User defined configs key/values")
protected String sinkConfigString;
@Parameter(names = "--auto-ack", description = "Whether or not the framework will automatically acknowleges messages", arity = 1)
protected boolean autoAck = true;
protected Boolean autoAck;
@Parameter(names = "--timeout-ms", description = "The message timeout in milliseconds")
protected Long timeoutMs;

Expand Down Expand Up @@ -329,7 +333,9 @@ void processArguments() throws Exception {
sinkConfig.setProcessingGuarantees(processingGuarantees);
}

sinkConfig.setRetainOrdering(retainOrdering);
if (retainOrdering != null) {
sinkConfig.setRetainOrdering(retainOrdering);
}

if (null != inputs) {
sinkConfig.setInputs(Arrays.asList(inputs.split(",")));
Expand Down Expand Up @@ -371,27 +377,37 @@ void processArguments() throws Exception {
}

Resources resources = sinkConfig.getResources();
if (resources == null) {
resources = new Resources();
}
if (cpu != null) {
if (resources == null) {
resources = new Resources();
}
resources.setCpu(cpu);
}

if (ram != null) {
if (resources == null) {
resources = new Resources();
}
resources.setRam(ram);
}

if (disk != null) {
if (resources == null) {
resources = new Resources();
}
resources.setDisk(disk);
}
sinkConfig.setResources(resources);
if (resources != null) {
sinkConfig.setResources(resources);
}

if (null != sinkConfigString) {
sinkConfig.setConfigs(parseConfigs(sinkConfigString));
}

sinkConfig.setAutoAck(autoAck);
if (autoAck != null) {
sinkConfig.setAutoAck(autoAck);
}
if (timeoutMs != null) {
sinkConfig.setTimeoutMs(timeoutMs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ void runCmd() throws Exception {
}
print("Updated successfully");
}

protected void validateSourceConfigs(SourceConfig sourceConfig) {
org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
}
}

abstract class SourceDetailsCommand extends BaseCommand {
Expand Down Expand Up @@ -337,21 +341,29 @@ void processArguments() throws Exception {
}

Resources resources = sourceConfig.getResources();
if (resources == null) {
resources = new Resources();
}
if (cpu != null) {
if (resources == null) {
resources = new Resources();
}
resources.setCpu(cpu);
}

if (ram != null) {
if (resources == null) {
resources = new Resources();
}
resources.setRam(ram);
}

if (disk != null) {
if (resources == null) {
resources = new Resources();
}
resources.setDisk(disk);
}
sourceConfig.setResources(resources);
if (resources != null) {
sourceConfig.setResources(resources);
}

if (null != sourceConfigString) {
sourceConfig.setConfigs(parseConfigs(sourceConfigString));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ public SinkConfig getSinkConfig() {
sinkConfig.setTenant(TENANT);
sinkConfig.setNamespace(NAMESPACE);
sinkConfig.setName(NAME);
sinkConfig.setAutoAck(true);

sinkConfig.setInputs(INPUTS_LIST);
sinkConfig.setTopicToSerdeClassName(CUSTOM_SERDE_INPUT_MAP);
Expand Down Expand Up @@ -450,7 +449,7 @@ public void testCmdSinkConfigFileMissingResources() throws Exception {
testSinkConfig.setResources(null);

SinkConfig expectedSinkConfig = getSinkConfig();
expectedSinkConfig.setResources(new Resources());
expectedSinkConfig.setResources(null);
testCmdSinkConfigFile(testSinkConfig, expectedSinkConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void testCmdSourceConfigFileMissingResources() throws Exception {
testSourceConfig.setResources(null);

SourceConfig expectedSourceConfig = getSourceConfig();
expectedSourceConfig.setResources(new Resources());
expectedSourceConfig.setResources(null);
testCmdSourceConfigFile(testSourceConfig, expectedSourceConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
*/
package org.apache.pulsar.common.functions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.*;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class ConsumerConfig {
private String schemaType;
private String serdeClassName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,16 @@
import java.util.Map;
import java.util.TreeMap;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
import lombok.*;

@Getter
@Setter
@Data
@EqualsAndHashCode
@ToString
@Builder(toBuilder=true)
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class FunctionConfig {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
@ToString
@AllArgsConstructor
@NoArgsConstructor
@Builder(toBuilder=true)
public class Resources {
// Default cpu is 1 core
private Double cpu = 1d;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@
*/
package org.apache.pulsar.common.functions;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.*;
import lombok.experimental.Accessors;

@Data
Expand Down
Loading

0 comments on commit 1b0589b

Please sign in to comment.