Skip to content

Commit

Permalink
Avoid creating output topic on tenant namespace if output-topic not p…
Browse files Browse the repository at this point in the history
…rovided (apache#2261)

* Avoid creating output topic on tenant namespace if output-topic not provided

fix test

add flag to skip output topic

rename skip-output cmd

* fix test
  • Loading branch information
rdhabalia authored Aug 7, 2018
1 parent b1b6702 commit 14765b2
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNull;

/**
* Unit test of {@link CmdFunctions}.
Expand Down Expand Up @@ -434,20 +435,46 @@ public void testCreateWithoutFunctionName() throws Exception {
}

@Test
public void testCreateWithoutOutputTopic() throws Exception {
public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
"--skip-output",
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
assertNull(creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString());

}


@Test
public void testCreateWithoutOutputTopic() throws Exception {

ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer();
consoleOutputCapturer.start();

String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
});

CreateFunction creater = cmd.getCreater();
consoleOutputCapturer.stop();
String output = consoleOutputCapturer.getStderr();
assertNull(creater.getFunctionConfig().getOutput());
assertTrue(output.contains("output topic is not present"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected String DEPRECATED_topicsPattern;
@Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)")
protected String topicsPattern;
@Parameter(names = "--output", description = "The function's output topic")
@Parameter(names = "--output", description = "The function's output topic (use skipOutput flag to skip output topic)")
protected String output;
@Parameter(names = "--skip-output", description = "Skip publishing function output to output topic")
protected boolean skipOutput;
// for backwards compatibility purposes
@Parameter(names = "--logTopic", description = "The topic to which the function's logs are produced", hidden = true)
protected String DEPRECATED_logTopic;
Expand Down Expand Up @@ -367,6 +369,7 @@ void processArguments() throws Exception {
if (null != output) {
functionConfig.setOutput(output);
}
functionConfig.setSkipOutput(skipOutput);
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
Expand Down Expand Up @@ -461,6 +464,11 @@ void processArguments() throws Exception {
}

protected void validateFunctionConfigs(FunctionConfig functionConfig) {

if (isBlank(functionConfig.getOutput()) && !functionConfig.isSkipOutput()) {
throw new ParameterException(
"output topic is not present (pass skipOutput flag to skip publish output on topic)");
}

if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy())) {
throw new ParameterException("Either a Java jar or a Python file needs to"
Expand Down Expand Up @@ -534,9 +542,6 @@ private void inferMissingArguments(FunctionConfig functionConfig) {
if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}
if (StringUtils.isEmpty(functionConfig.getOutput())) {
inferMissingOutput(functionConfig);
}

if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
Expand Down Expand Up @@ -581,17 +586,6 @@ private void inferMissingNamespace(FunctionConfig functionConfig) {
functionConfig.setNamespace(DEFAULT_NAMESPACE);
}

private void inferMissingOutput(FunctionConfig functionConfig) {
try {
String inputTopic = getUniqueInput(functionConfig);
String outputTopic = String.format("%s-%s-output", inputTopic, functionConfig.getName());
functionConfig.setOutput(outputTopic);
} catch (IllegalArgumentException ex) {
// It might be that we really don't need an output topic
// So we cannot really throw an exception
}
}

private String getUniqueInput(FunctionConfig functionConfig) {
if (functionConfig.getInputs().size() + functionConfig.getCustomSerdeInputs().size() != 1) {
throw new IllegalArgumentException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
Expand Down Expand Up @@ -560,19 +561,19 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {
Object object;
// If sink classname is not set, we default pulsar sink
if (sinkSpec.getClassName().isEmpty()) {
PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
pulsarSinkConfig.setTopic(sinkSpec.getTopic());
pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());

Object[] params = {this.client, pulsarSinkConfig};
Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
if (StringUtils.isEmpty(sinkSpec.getTopic())) {
object = PulsarSinkDisable.INSTANCE;
} else {
PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees
.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
pulsarSinkConfig.setTopic(sinkSpec.getTopic());
pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
object = new PulsarSink(this.client, pulsarSinkConfig);
}

object = Reflections.createInstance(
PulsarSink.class.getName(),
PulsarSink.class.getClassLoader(), params, paramTypes);
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.sink;

import java.util.Map;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class PulsarSinkDisable<T> implements Sink<T> {

public static final PulsarSinkDisable INSTANCE = new PulsarSinkDisable();

@Override
public void close() throws Exception {
// No-op
}

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
// No-op
}

@Override
public void write(Record<T> record) throws Exception {
// No-op
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
Expand Down Expand Up @@ -80,6 +79,7 @@ public enum Runtime {
private String topicsPattern;
@isValidTopicName
private String output;
private boolean skipOutput;
@isImplementationOfClass(implementsClass = SerDe.class)
private String outputSerdeClassName;
@isValidTopicName
Expand Down

0 comments on commit 14765b2

Please sign in to comment.