Skip to content

Commit

Permalink
add passSourceMessageProperty switch to pulsar functions (apache#6318)
Browse files Browse the repository at this point in the history
Fixes apache#5116 

### Motivation
Based on the request in apache#5116, adding this flag can help reduce ambiguity and increase flexibility. By default, the function passes source message properties.

### Modifications
- add flags in pulsar admin client tools
- update functions proto and functionConfig to contain the flag
- update the pulsar sink to pass source message property if the flag is set
  • Loading branch information
nlu90 authored Feb 13, 2020
1 parent 257d1c5 commit bce14ed
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.apache.pulsar.broker.admin.impl.FunctionsBase;

@Path("/functions")
@Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ abstract class FunctionDetailsCommand extends BaseCommand {
protected Boolean DEPRECATED_retainOrdering;
@Parameter(names = "--retain-ordering", description = "Function consumes and processes messages in order")
protected Boolean retainOrdering;
@Parameter(names = "--forward-source-message-property", description = "Forwarding input message's properties to output topic when processing")
protected Boolean forwardSourceMessageProperty = true;
@Parameter(names = "--subs-name", description = "Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer")
protected String subsName;
@Parameter(names = "--parallelism", description = "The parallelism factor of a Pulsar Function (i.e. the number of function instances to run)")
Expand Down Expand Up @@ -389,10 +391,14 @@ void processArguments() throws Exception {
functionConfig.setProcessingGuarantees(processingGuarantees);
}

if (retainOrdering != null) {
if (null != retainOrdering) {
functionConfig.setRetainOrdering(retainOrdering);
}

if (null != forwardSourceMessageProperty) {
functionConfig.setForwardSourceMessageProperty(forwardSourceMessageProperty);
}

if (isNotBlank(subsName)) {
functionConfig.setSubName(subsName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public enum Runtime {
private String logTopic;
private ProcessingGuarantees processingGuarantees;
private Boolean retainOrdering;
private Boolean forwardSourceMessageProperty;
private Map<String, Object> userConfig;
// This is a map of secretName(aka how the secret is going to be
// accessed in the function via context) to an object that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.protocol.schema.LatestVersion;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
Expand Down Expand Up @@ -756,6 +754,8 @@ public void setupOutput(ContextImpl contextImpl) throws Exception {
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
pulsarSinkConfig.setTopic(sinkSpec.getTopic());
pulsarSinkConfig.setForwardSourceMessageProperty(
this.instanceConfig.getFunctionDetails().getSink().getForwardSourceMessageProperty());

if (!StringUtils.isEmpty(sinkSpec.getSchemaType())) {
pulsarSinkConfig.setSchemaType(sinkSpec.getSchemaType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void write(Record<T> record) {

msg.value(record.getValue());

if (!record.getProperties().isEmpty()) {
if (!record.getProperties().isEmpty() && pulsarSinkConfig.isForwardSourceMessageProperty()) {
msg.properties(record.getProperties());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ public class PulsarSinkConfig {
private String serdeClassName;
private String schemaType;
private String typeClassName;
private boolean forwardSourceMessageProperty;
}
2 changes: 2 additions & 0 deletions pulsar-functions/proto/src/main/proto/Function.proto
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ message SinkSpec {
* Builtin schema type or custom schema class name
*/
string schemaType = 7;

bool forwardSourceMessageProperty = 8;
}

message PackageLocationMetaData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class ProcessRuntime implements Runtime {
}

/**
* The core logic that initialize the thread container and executes the function.
* The core logic that initialize the process container and executes the function.
*/
@Override
public void start() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
if (!StringUtils.isBlank(functionConfig.getOutputSchemaType())) {
sinkSpecBuilder.setSchemaType(functionConfig.getOutputSchemaType());
}
if (functionConfig.getForwardSourceMessageProperty() != null) {
sinkSpecBuilder.setForwardSourceMessageProperty(functionConfig.getForwardSourceMessageProperty());
}

if (typeArgs != null) {
sinkSpecBuilder.setTypeClassName(typeArgs[1].getName());
Expand Down Expand Up @@ -291,6 +294,7 @@ public static FunctionConfig convertFromDetails(FunctionDetails functionDetails)
if (!isEmpty(functionDetails.getLogTopic())) {
functionConfig.setLogTopic(functionDetails.getLogTopic());
}
functionConfig.setForwardSourceMessageProperty(functionDetails.getSink().getForwardSourceMessageProperty());
functionConfig.setRuntime(FunctionCommon.convertRuntime(functionDetails.getRuntime()));
functionConfig.setProcessingGuarantees(FunctionCommon.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
if (functionDetails.hasRetryDetails()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public void testConvertBackFidelity() {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
functionConfig.setForwardSourceMessageProperty(true);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
Expand Down Expand Up @@ -92,6 +93,7 @@ public void testConvertWindow() {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
functionConfig.setForwardSourceMessageProperty(true);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
Expand Down Expand Up @@ -419,6 +421,7 @@ private FunctionConfig createFunctionConfig() {
functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
functionConfig.setRetainOrdering(false);
functionConfig.setForwardSourceMessageProperty(false);
functionConfig.setUserConfig(new HashMap<>());
functionConfig.setAutoAck(true);
functionConfig.setTimeoutMs(2000l);
Expand Down

0 comments on commit bce14ed

Please sign in to comment.