Skip to content

Commit

Permalink
ISSUE-5934: Support read/write properties from/to Message in flink pu…
Browse files Browse the repository at this point in the history
…lsar consumer/producer (apache#5955)

Fix apache#5934

Motivation
Support reading properties from, and writing properties to, a Message in the Flink Pulsar consumer/producer; the behavior can be overridden in a derived class.

Modifications

1. Change the access modifier of `PulsarConsumerSource.deserialize` from `private` to `protected`.
2. add method `protected Map<String, String> generateProperties(T value)` in class `FlinkPulsarProducer`, and invoked in `TypedMessageBuilder.properties()` to add it in pulsar Message.

* fix unit test failure

Co-authored-by: herodu <[email protected]>
Co-authored-by: Sijie Guo <[email protected]>
Co-authored-by: duli <[email protected]>
  • Loading branch information
4 people authored Mar 26, 2020
1 parent 9d3ab60 commit 5ad3e02
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;

/**
Expand Down Expand Up @@ -100,7 +101,8 @@ public static void main(String[] args) throws Exception {
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word
wordWithCount -> wordWithCount.word,
null
)).setParallelism(parallelism);
} else {
// print the results with a single thread, rather than in parallel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.function.Function;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
Expand All @@ -33,6 +34,7 @@
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.MessageId;
Expand Down Expand Up @@ -67,6 +69,11 @@ public class FlinkPulsarProducer<T>
*/
protected final PulsarKeyExtractor<T> flinkPulsarKeyExtractor;

/**
* User-provided properties extractor for assigning properties to a pulsar message.
*/
protected final PulsarPropertiesExtractor<T> flinkPulsarPropertiesExtractor;

/**
* Produce Mode.
*/
Expand Down Expand Up @@ -110,7 +117,8 @@ public FlinkPulsarProducer(String serviceUrl,
String defaultTopicName,
Authentication authentication,
SerializationSchema<T> serializationSchema,
PulsarKeyExtractor<T> keyExtractor) {
PulsarKeyExtractor<T> keyExtractor,
PulsarPropertiesExtractor<T> propertiesExtractor) {
checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot be blank");
checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName cannot be blank");
checkNotNull(authentication, "auth cannot be null, set disabled for no auth");
Expand All @@ -123,17 +131,20 @@ public FlinkPulsarProducer(String serviceUrl,
this.producerConf.setTopicName(defaultTopicName);
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
}

public FlinkPulsarProducer(ClientConfigurationData clientConfigurationData,
ProducerConfigurationData producerConfigurationData,
SerializationSchema<T> serializationSchema,
PulsarKeyExtractor<T> keyExtractor) {
PulsarKeyExtractor<T> keyExtractor,
PulsarPropertiesExtractor<T> propertiesExtractor) {
this.clientConf = checkNotNull(clientConfigurationData, "client conf can not be null");
this.producerConf = checkNotNull(producerConfigurationData, "producer conf can not be null");
this.schema = checkNotNull(serializationSchema, "Serialization Schema not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
this.flinkPulsarPropertiesExtractor = getOrNullPropertiesExtractor(propertiesExtractor);
ClosureCleaner.ensureSerializable(serializationSchema);
}

Expand All @@ -147,6 +158,13 @@ public PulsarKeyExtractor<T> getKeyExtractor() {
return flinkPulsarKeyExtractor;
}

/**
 * Exposes the properties extractor held by this producer.
 *
 * @return the extractor stored in {@code flinkPulsarPropertiesExtractor}.
 */
public PulsarPropertiesExtractor<T> getPulsarPropertiesExtractor() {
    return this.flinkPulsarPropertiesExtractor;
}

/**
* Gets this producer's operating mode.
*/
Expand Down Expand Up @@ -185,6 +203,16 @@ private static <T> PulsarKeyExtractor<T> getOrNullKeyExtractor(PulsarKeyExtracto
}
}

/**
 * Substitutes a default for a missing properties extractor.
 *
 * @param extractor the extractor supplied by the caller, possibly {@code null}.
 * @return {@code extractor} when it is non-null, otherwise
 *         {@link PulsarPropertiesExtractor#EMPTY}.
 */
@SuppressWarnings("unchecked")
private static <T> PulsarPropertiesExtractor<T> getOrNullPropertiesExtractor(
        PulsarPropertiesExtractor<T> extractor) {
    if (extractor != null) {
        return extractor;
    }
    // EMPTY is a raw-typed constant, hence the unchecked conversion suppressed above.
    return PulsarPropertiesExtractor.EMPTY;
}

private Producer<byte[]> createProducer() throws Exception {
PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConf);
return client.createProducerAsync(producerConf).get();
Expand Down Expand Up @@ -257,6 +285,7 @@ public void invoke(T value, Context context) throws Exception {
}
}
msgBuilder.value(serializedValue)
.properties(this.flinkPulsarPropertiesExtractor.getProperties(value))
.sendAsync()
.thenApply(successCallback)
.exceptionally(failureCallback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.formats.avro.AvroRowSerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
Expand All @@ -53,6 +54,7 @@ public class PulsarAvroTableSink implements AppendStreamTableSink<Row> {
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
protected PulsarKeyExtractor<Row> keyExtractor;
protected PulsarPropertiesExtractor<Row> propertiesExtractor;
private Class<? extends SpecificRecord> recordClazz;

/**
Expand Down Expand Up @@ -106,7 +108,8 @@ protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
clientConfigurationData,
producerConfigurationData,
serializationSchema,
keyExtractor);
keyExtractor,
propertiesExtractor);
}

@Override
Expand Down Expand Up @@ -151,6 +154,7 @@ public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldT
fieldNames,
fieldTypes,
recordClazz);
sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;

return sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void emitAutoAcking(SourceContext<T> context, Message message) throws IO
}
}

private T deserialize(Message message) throws IOException {
protected T deserialize(Message message) throws IOException {
return deserializer.deserialize(message.getData());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
Expand All @@ -46,6 +47,7 @@ public abstract class PulsarTableSink implements AppendStreamTableSink<Row> {
protected ProducerConfigurationData producerConfigurationData = new ProducerConfigurationData();
protected SerializationSchema<Row> serializationSchema;
protected PulsarKeyExtractor<Row> keyExtractor;
protected PulsarPropertiesExtractor<Row> propertiesExtractor;
protected String[] fieldNames;
protected TypeInformation[] fieldTypes;
protected final String routingKeyFieldName;
Expand Down Expand Up @@ -95,7 +97,8 @@ protected FlinkPulsarProducer<Row> createFlinkPulsarProducer() {
clientConfigurationData,
producerConfigurationData,
serializationSchema,
keyExtractor);
keyExtractor,
propertiesExtractor);
}

@Override
Expand Down Expand Up @@ -141,6 +144,7 @@ public TableSink<Row> configure(String[] fieldNames,
routingKeyFieldName,
fieldNames,
fieldTypes);
sink.propertiesExtractor = PulsarPropertiesExtractor.EMPTY;

return sink;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.flink.streaming.connectors.pulsar.partitioner;

import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

/**
 * Extracts the properties of a pulsar message from a value.
 *
 * <p>The interface extends {@link Serializable}, so implementations (including
 * lambdas) are expected to be serializable.
 *
 * @param <T> type of the value the properties are extracted from.
 */
@FunctionalInterface
public interface PulsarPropertiesExtractor<T> extends Serializable {

    /**
     * Extractor that returns an empty map for every input.
     *
     * <p>Declared as a raw type, so it can be assigned to a
     * {@code PulsarPropertiesExtractor<T>} for any {@code T}; such an assignment
     * is an unchecked conversion.
     */
    PulsarPropertiesExtractor EMPTY = in -> Collections.emptyMap();

    /**
     * Retrieve the message properties for the given value.
     *
     * @param in the value to extract properties from.
     * @return the properties, as a map from property name to property value.
     */
    Map<String, String> getProperties(T in);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.avro.generated.NasaMission;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -107,12 +108,14 @@ private PulsarAvroTableSink spySink() throws Exception {
Mockito.anyString(),
Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
Mockito.any(PulsarKeyExtractor.class),
Mockito.any(PulsarPropertiesExtractor.class)
).thenReturn(producer);
FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
return sink;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarKeyExtractor;
import org.apache.flink.streaming.connectors.pulsar.partitioner.PulsarPropertiesExtractor;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.apache.pulsar.client.api.Authentication;
Expand Down Expand Up @@ -103,13 +104,15 @@ private PulsarJsonTableSink spySink() throws Exception {
Mockito.anyString(),
Mockito.any(Authentication.class),
Mockito.any(SerializationSchema.class),
Mockito.any(PulsarKeyExtractor.class)
Mockito.any(PulsarKeyExtractor.class),
Mockito.any(PulsarPropertiesExtractor.class)
).thenReturn(producer);

FieldUtils.writeField(sink, "fieldNames", fieldNames, true);
FieldUtils.writeField(sink, "fieldTypes", typeInformations, true);
FieldUtils.writeField(sink, "serializationSchema", Mockito.mock(SerializationSchema.class), true);
FieldUtils.writeField(sink, "keyExtractor", Mockito.mock(PulsarKeyExtractor.class), true);
FieldUtils.writeField(sink, "propertiesExtractor", Mockito.mock(PulsarPropertiesExtractor.class), true);
return sink;
}
}

0 comments on commit 5ad3e02

Please sign in to comment.