Skip to content

Commit

Permalink
[fix apache#9851] Add forwardSourceMessageProperty to SourceConfig (a…
Browse files Browse the repository at this point in the history
…pache#9907)

Fixes apache#9851

### Motivation

Pulsar IO Source connector cannot pass message properties to destination topic, as apache#9851 discussed, it is a bug that `forwardSourceMessageProperty` is not applied to Source connector properly. 

### Modifications

- add `forwardSourceMessageProperty` to `SourceConfig`
- add set `forwardSourceMessageProperty` from pulsar admin client
- add related logic in `SourceConfigUtils`
- add integration test
  • Loading branch information
freeznet authored Mar 16, 2021
1 parent 960a79e commit d0249e5
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ abstract class SourceDetailsCommand extends BaseCommand {
protected String sourceConfigString;
@Parameter(names = "--custom-runtime-options", description = "A string that encodes options to customize the runtime, see docs for configured runtime for details")
protected String customRuntimeOptions;
@Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties to output topic when processing")
protected Boolean forwardSourceMessageProperty = true;

protected SourceConfig sourceConfig;

Expand Down Expand Up @@ -419,6 +421,10 @@ void processArguments() throws Exception {
if (customRuntimeOptions != null) {
sourceConfig.setCustomRuntimeOptions(customRuntimeOptions);
}

if (null != forwardSourceMessageProperty) {
sourceConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
}
// check if source configs are valid
validateSourceConfigs(sourceConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public IObjectFactory getObjectFactory() {
private static final Long RAM = 1024L * 1024L;
private static final Long DISK = 1024L * 1024L * 1024L;
private static final String SINK_CONFIG_STRING = "{\"created_at\":\"Mon Jul 02 00:33:15 +0000 2018\"}";
private static final boolean FORWARD_PROPERTIES = true;

private PulsarAdmin pulsarAdmin;
private Sources source;
Expand Down Expand Up @@ -114,6 +115,7 @@ public SourceConfig getSourceConfig() {
sourceConfig.setArchive(JAR_FILE_PATH);
sourceConfig.setResources(new Resources(CPU, RAM, DISK));
sourceConfig.setConfigs(createSource.parseConfigs(SINK_CONFIG_STRING));
sourceConfig.setForwardSourceMessageProperty(FORWARD_PROPERTIES);
return sourceConfig;
}

Expand Down Expand Up @@ -575,26 +577,32 @@ public void testUpdateSource() throws Exception {

updateSource.archive = "new-archive";

updateSource.forwardSourceMessageProperty = true;

updateSource.processArguments();

updateSource.runCmd();


verify(source).updateSource(eq(SourceConfig.builder()
.tenant(PUBLIC_TENANT)
.namespace(DEFAULT_NAMESPACE)
.name(updateSource.name)
.archive(updateSource.archive)
.forwardSourceMessageProperty(true)
.build()), eq(updateSource.archive), eq(new UpdateOptions()));


updateSource.archive = null;

updateSource.parallelism = 2;

updateSource.processArguments();

updateSource.updateAuthData = true;

updateSource.forwardSourceMessageProperty = false;

updateSource.processArguments();

UpdateOptions updateOptions = new UpdateOptions();
updateOptions.setUpdateAuthData(true);

Expand All @@ -605,6 +613,7 @@ public void testUpdateSource() throws Exception {
.namespace(DEFAULT_NAMESPACE)
.name(updateSource.name)
.parallelism(2)
.forwardSourceMessageProperty(false)
.build()), eq(null), eq(updateOptions));


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,6 @@ public class SourceConfig {
private BatchSourceConfig batchSourceConfig;
// batchBuilder provides two types of batch construction methods, DEFAULT and KEY_BASED
private String batchBuilder;

private Boolean forwardSourceMessageProperty;
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}

if (sourceConfig.getForwardSourceMessageProperty() == Boolean.TRUE) {
sinkSpecBuilder.setForwardSourceMessageProperty(sourceConfig.getForwardSourceMessageProperty());
} else {
sinkSpecBuilder.setForwardSourceMessageProperty(false);
}

functionDetailsBuilder.setSink(sinkSpecBuilder);

// use default resources if resources not set
Expand Down Expand Up @@ -236,6 +242,8 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
sourceConfig.setCustomRuntimeOptions(functionDetails.getCustomRuntimeOptions());
}

sourceConfig.setForwardSourceMessageProperty(sinkSpec.getForwardSourceMessageProperty());

return sourceConfig;
}

Expand Down Expand Up @@ -392,6 +400,9 @@ public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceCon
validateBatchSourceConfigUpdate(existingConfig.getBatchSourceConfig(), newConfig.getBatchSourceConfig());
mergedConfig.setBatchSourceConfig(newConfig.getBatchSourceConfig());
}
if (newConfig.getForwardSourceMessageProperty() != null) {
mergedConfig.setForwardSourceMessageProperty(newConfig.getForwardSourceMessageProperty());
}
return mergedConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ private SourceConfig createSourceConfig() {
sourceConfig.setParallelism(1);
sourceConfig.setRuntimeFlags("-DKerberos");
sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
sourceConfig.setForwardSourceMessageProperty(true);

Map<String, String> consumerConfigs = new HashMap<>();
consumerConfigs.put("security.protocal", "SASL_PLAINTEXT");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.io.core.SourceContext;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class TestPropertySource implements Source<String> {

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
}

@Override
public Record<String> read() throws Exception {
Thread.sleep(50);
return new Record<String>() {
@Override
public Optional<String> getKey() {
return Optional.empty();
}

@Override
public String getValue() {
return "property";
}
@Override
public Map<String, String> getProperties() {
HashMap<String, String> props = new HashMap<String, String>();
props.put("hello", "world");
props.put("foo", "bar");
return props;
}
};
}

@Override
public void close() throws Exception {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.JAVAJAR;
import static org.apache.pulsar.tests.integration.suites.PulsarTestSuite.retryStrategically;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

/**
* Source Property related test cases.
*/
@Slf4j
public class PulsarSourcePropertyTest extends PulsarStandaloneTestSuite {
@Test(groups = {"source"})
public void testSourceProperty() throws Exception {
String outputTopicName = "test-source-property-input-" + randomName(8);
String sourceName = "test-source-property-" + randomName(8);
submitSourceConnector(sourceName, outputTopicName, "org.apache.pulsar.tests.integration.io.TestPropertySource", JAVAJAR);

// get source info
getSourceInfoSuccess(sourceName);

// get source status
getSourceStatus(sourceName);

try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(container.getHttpServiceUrl()).build()) {

Awaitility.await().ignoreExceptions().untilAsserted(() -> {
SourceStatus status = admin.sources().getSourceStatus("public", "default", sourceName);
assertEquals(status.getInstances().size(), 1);
assertTrue(status.getInstances().get(0).getStatus().numWritten > 0);
});
}

@Cleanup PulsarClient client = PulsarClient.builder()
.serviceUrl(container.getPlainTextServiceUrl())
.build();
@Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic(outputTopicName)
.subscriptionType(SubscriptionType.Exclusive)
.subscriptionName("test-sub")
.subscribe();

for (int i = 0; i < 10; i++) {
Message<String> msg = consumer.receive();
assertEquals(msg.getValue(), "property");
assertEquals(msg.getProperty("hello"), "world");
assertEquals(msg.getProperty("foo"), "bar");
}

// delete source
deleteSource(sourceName);

getSourceInfoNotFound(sourceName);
}

private void submitSourceConnector(String sourceName,
String outputTopicName,
String className,
String archive) throws Exception {
String[] commands = {
PulsarCluster.ADMIN_SCRIPT,
"sources", "create",
"--name", sourceName,
"--destinationTopicName", outputTopicName,
"--archive", archive,
"--classname", className
};
log.info("Run command : {}", StringUtils.join(commands, ' '));
ContainerExecResult result = container.execCmd(commands);
assertTrue(
result.getStdout().contains("\"Created successfully\""),
result.getStdout());
}

private void getSourceInfoSuccess(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
}

private void getSourceStatus(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"status",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("\"running\" : true"));
}

private void deleteSource(String sourceName) throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"delete",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName
);
assertTrue(result.getStdout().contains("Delete source successfully"));
result.assertNoStderr();
}

private void getSourceInfoNotFound(String sourceName) throws Exception {
try {
container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
"sources",
"get",
"--tenant", "public",
"--namespace", "default",
"--name", sourceName);
fail("Command should have exited with non-zero");
} catch (ContainerExecException e) {
assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
}
}
}
1 change: 1 addition & 0 deletions tests/integration/src/test/resources/pulsar-function.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<classes>
<class name="org.apache.pulsar.tests.integration.functions.PulsarStateTest" />
<class name="org.apache.pulsar.tests.integration.io.GenericRecordSourceTest" />
<class name="org.apache.pulsar.tests.integration.io.PulsarSourcePropertyTest" />
</classes>
</test>
</suite>

0 comments on commit d0249e5

Please sign in to comment.