Skip to content

Commit

Permalink
Consolidate all inferMissing logic inside ConfigUtils. (apache#2904)
Browse files Browse the repository at this point in the history
* Consolidate all inferMissing logic inside ConfigUtils.

* Removed tests that are already covered in windowconfigutils

* Reverted unneeded change
  • Loading branch information
srkukarni authored Nov 1, 2018
1 parent cd63c84 commit 42c2b0a
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 470 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Objects.isNull;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
Expand Down Expand Up @@ -67,7 +66,6 @@
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.windowing.WindowUtils;

@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions (lightweight, Lambda-style compute processes that work with Pulsar)")
Expand Down Expand Up @@ -479,7 +477,7 @@ void processArguments() throws Exception {
}

// infer default vaues
inferMissingArguments(functionConfig);
FunctionConfigUtils.inferMissingArguments(functionConfig);

// check if configs are valid
validateFunctionConfigs(functionConfig);
Expand Down Expand Up @@ -514,55 +512,6 @@ protected void validateFunctionConfigs(FunctionConfig functionConfig) {
throw new IllegalArgumentException(e.getMessage());
}
}

private void inferMissingArguments(FunctionConfig functionConfig) {
if (StringUtils.isEmpty(functionConfig.getName())) {
inferMissingFunctionName(functionConfig);
}
if (StringUtils.isEmpty(functionConfig.getTenant())) {
inferMissingTenant(functionConfig);
}
if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}

if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
}

if (functionConfig.getJar() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
} else if (functionConfig.getPy() != null) {
functionConfig.setRuntime(FunctionConfig.Runtime.PYTHON);
}

WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
WindowUtils.inferDefaultConfigs(windowConfig);
functionConfig.setAutoAck(false);
}
}

private void inferMissingFunctionName(FunctionConfig functionConfig) {
if (isNull(functionConfig.getClassName())) {
throw new ParameterException("You must specify a class name for the function");
}

String[] domains = functionConfig.getClassName().split("\\.");
if (domains.length == 0) {
functionConfig.setName(functionConfig.getClassName());
} else {
functionConfig.setName(domains[domains.length - 1]);
}
}

private void inferMissingTenant(FunctionConfig functionConfig) {
functionConfig.setTenant(PUBLIC_TENANT);
}

private void inferMissingNamespace(FunctionConfig functionConfig) {
functionConfig.setNamespace(DEFAULT_NAMESPACE);
}
}

@Parameters(commandDescription = "Run the Pulsar Function locally (rather than deploying it to the Pulsar cluster)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.SinkConfigUtils.inferMissingArguments;
import static org.apache.pulsar.functions.utils.Utils.BUILTIN;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -432,15 +433,6 @@ protected Map<String, Object> parseConfigs(String str) {
return new Gson().fromJson(str, type);
}

protected void inferMissingArguments(SinkConfig sinkConfig) {
if (sinkConfig.getTenant() == null) {
sinkConfig.setTenant(PUBLIC_TENANT);
}
if (sinkConfig.getNamespace() == null) {
sinkConfig.setNamespace(DEFAULT_NAMESPACE);
}
}

protected void validateSinkConfigs(SinkConfig sinkConfig) {

if (isBlank(sinkConfig.getArchive())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
import static org.apache.pulsar.functions.utils.SourceConfigUtils.inferMissingArguments;
import static org.apache.pulsar.functions.utils.Utils.BUILTIN;

import com.beust.jcommander.Parameter;
Expand Down Expand Up @@ -391,15 +392,6 @@ protected Map<String, Object> parseConfigs(String str) {
return new Gson().fromJson(str, type);
}

private void inferMissingArguments(SourceConfig sourceConfig) {
if (sourceConfig.getTenant() == null) {
sourceConfig.setTenant(PUBLIC_TENANT);
}
if (sourceConfig.getNamespace() == null) {
sourceConfig.setNamespace(DEFAULT_NAMESPACE);
}
}

protected void validateSourceConfigs(SourceConfig sourceConfig) {
if (isBlank(sourceConfig.getArchive())) {
throw new ParameterException("Source archive not specfied");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ public class WindowFunctionExecutor<I, O> implements Function<I, O> {
private TimestampExtractor<I> timestampExtractor;
protected transient WaterMarkEventGenerator<I> waterMarkEventGenerator;

protected static final long DEFAULT_MAX_LAG_MS = 0; // no lag
protected static final long DEFAULT_WATERMARK_EVENT_INTERVAL_MS = 1000; // 1s

protected java.util.function.Function<Collection<I>, O> windowFunction;

public void initialize(Context context) {
Expand Down Expand Up @@ -97,8 +94,6 @@ private WindowConfig getWindowConfigs(Context context) {
(new Gson().toJson(context.getUserConfigValue(WindowConfig.WINDOW_CONFIG_KEY).get())),
WindowConfig.class);


WindowUtils.inferDefaultConfigs(windowConfig);
return windowConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,8 @@
*/
package org.apache.pulsar.functions.windowing;

import org.apache.pulsar.common.functions.WindowConfig;

public class WindowUtils {
public static String getFullyQualifiedName(String tenant, String namespace, String name) {
return String.format("%s/%s/%s", tenant, namespace, name);
}

public static void inferDefaultConfigs(WindowConfig windowConfig) {
if (windowConfig.getWindowLengthDurationMs() != null && windowConfig.getSlidingIntervalDurationMs() == null) {
windowConfig.setSlidingIntervalDurationMs(windowConfig.getWindowLengthDurationMs());
}

if (windowConfig.getWindowLengthCount() != null && windowConfig.getSlidingIntervalCount() == null) {
windowConfig.setSlidingIntervalCount(windowConfig.getWindowLengthCount());
}

if (windowConfig.getTimestampExtractorClassName() != null) {
if (windowConfig.getMaxLagMs() == null) {
windowConfig.setMaxLagMs(WindowFunctionExecutor.DEFAULT_MAX_LAG_MS);
}
if (windowConfig.getWatermarkEmitIntervalMs() == null) {
windowConfig.setWatermarkEmitIntervalMs(WindowFunctionExecutor.DEFAULT_WATERMARK_EVENT_INTERVAL_MS);
}
}
}
}
Loading

0 comments on commit 42c2b0a

Please sign in to comment.