Skip to content

Commit

Permalink
[FLINK-7764][kafka]FlinkKafkaProducer010 does not accept name, uid, o…
Browse files Browse the repository at this point in the history
…r parallelism
  • Loading branch information
xccui authored and aljoscha committed Nov 2, 2017
1 parent 8198967 commit 786a6cb
Showing 1 changed file with 54 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
Expand Down Expand Up @@ -320,18 +322,23 @@ record = new ProducerRecord<>(targetTopic, flinkKafkaPartitioner.partition(value
* Configuration object returned by the writeToKafkaWithTimestamps() call.
*
* <p>This is only kept because it's part of the public API. It is not necessary anymore, now
* that the {@link SinkFunction} interface provides timestamps.
* that the {@link SinkFunction} interface provides timestamps.</p>
*
* <p>To enable the settings, this fake sink must override all the public methods
* in {@link DataStreamSink}.</p>
*/
public static class FlinkKafkaProducer010Configuration<T> extends DataStreamSink<T> {

private final FlinkKafkaProducer010 producer;
private final SinkTransformation<T> transformation;

private FlinkKafkaProducer010Configuration(
DataStreamSink originalSink,
DataStreamSink<T> originalSink,
DataStream<T> inputStream,
FlinkKafkaProducer010<T> producer) {
//noinspection unchecked
super(inputStream, originalSink.getTransformation().getOperator());
this.transformation = originalSink.getTransformation();
this.producer = producer;
}

Expand Down Expand Up @@ -367,6 +374,50 @@ public void setFlushOnCheckpoint(boolean flush) {
public void setWriteTimestampToKafka(boolean writeTimestampToKafka) {
producer.writeTimestampToKafka = writeTimestampToKafka;
}
}

// *************************************************************************
// Override methods to use the transformation in this class.
// *************************************************************************

@Override
public SinkTransformation<T> getTransformation() {
return transformation;
}

@Override
public DataStreamSink<T> name(String name) {
transformation.setName(name);
return this;
}

@Override
public DataStreamSink<T> uid(String uid) {
transformation.setUid(uid);
return this;
}

@Override
public DataStreamSink<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}

@Override
public DataStreamSink<T> setParallelism(int parallelism) {
transformation.setParallelism(parallelism);
return this;
}

@Override
public DataStreamSink<T> disableChaining() {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}

@Override
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}
}
}

0 comments on commit 786a6cb

Please sign in to comment.