Commit

[FLINK-9979] [table] Support a FlinkKafkaPartitioner for Kafka table sink factory

Adds support for passing a FlinkKafkaPartitioner to the Kafka table sink
factory. It provides shortcuts for the built-in "fixed" and "round-robin"
partitioning strategies.

This closes apache#6440.
twalthr committed Aug 1, 2018
1 parent 1841194 commit 628b71d
Showing 25 changed files with 223 additions and 49 deletions.
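For context, the user-facing side of this change is the Kafka connector descriptor. The descriptor changes are not among the hunks loaded below, so the shortcut method names in this sketch are assumptions based on the commit summary, not confirmed by the visible diff:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

public class KafkaSinkPartitionerExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

        tableEnv
            .connect(
                new Kafka()
                    .version("0.11")
                    .topic("output-topic")
                    .property("bootstrap.servers", "localhost:9092")
                    // assumed shortcut for the built-in "fixed" strategy;
                    // sinkPartitionerRoundRobin() would be the other one
                    .sinkPartitionerFixed())
            .withFormat(new Json().deriveSchema())
            .withSchema(new Schema().field("word", Types.STRING()))
            .inAppendMode()
            .registerTableSink("kafkaSink");
    }
}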

----------------------------------------------------------------
Kafka010JsonTableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -73,16 +74,23 @@ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPart
     }
 
     @Override
-    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+            String topic,
+            Properties properties,
+            SerializationSchema<Row> serializationSchema,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer010<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 
     @Override
     protected Kafka010JsonTableSink createCopy() {
-        return new Kafka010JsonTableSink(topic, properties, partitioner);
+        return new Kafka010JsonTableSink(
+            topic,
+            properties,
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 }
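The deprecated Kafka010JsonTableSink defaults an absent partitioner to FlinkFixedPartitioner, which matches its previous hard-wired behavior. A minimal, self-contained sketch of that defaulting pattern in isolation:

import java.util.Optional;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

public class PartitionerDefaulting {
    public static void main(String[] args) {
        // No partitioner configured: the deprecated sinks keep their old default.
        Optional<FlinkKafkaPartitioner<Row>> none = Optional.empty();
        FlinkKafkaPartitioner<Row> effective = none.orElse(new FlinkFixedPartitioner<>());
        System.out.println(effective.getClass().getSimpleName()); // FlinkFixedPartitioner
    }
}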

----------------------------------------------------------------
Kafka010TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka010TableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
         super(
             schema,
@@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
             String topic,
             Properties properties,
             SerializationSchema<Row> serializationSchema,
-            FlinkKafkaPartitioner<Row> partitioner) {
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer010<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(null));
     }
 }
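Note the different fallback here: Kafka010TableSink passes partitioner.orElse(null), and a null partitioner leaves the distribution of rows to the Kafka producer itself, which is what the "round-robin" shortcut amounts to. A hypothetical mapping from a configuration value to this Optional (the factory code and the property values are not part of the hunks shown, so both are assumptions):

import java.util.Optional;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

public class PartitionerMapping {

    // Illustrative only: translates a config value into the Optional
    // that the table sinks above receive.
    static Optional<FlinkKafkaPartitioner<Row>> fromConfig(String value) {
        switch (value) {
            case "fixed":
                // each Flink subtask writes to a single Kafka partition
                return Optional.of(new FlinkFixedPartitioner<>());
            case "round-robin":
                // empty -> orElse(null) -> Kafka's producer partitions rows itself
                return Optional.empty();
            default:
                throw new IllegalArgumentException("Unknown partitioner: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromConfig("fixed").isPresent());        // true
        System.out.println(fromConfig("round-robin").isPresent());  // false
    }
}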

----------------------------------------------------------------
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka010TableSink(

----------------------------------------------------------------
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
             Properties properties,
             FlinkKafkaPartitioner<Row> partitioner) {
 
-        return new Kafka010JsonTableSink(topic, properties, partitioner);
+        return new Kafka010JsonTableSink(
+            topic,
+            properties,
+            partitioner);
     }
 
     @Override

----------------------------------------------------------------
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka010TableSink(

----------------------------------------------------------------
Kafka011TableSink.java
@@ -39,7 +39,7 @@ public Kafka011TableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
         super(
             schema,
@@ -54,11 +54,11 @@ protected SinkFunction<Row> createKafkaProducer(
             String topic,
             Properties properties,
             SerializationSchema<Row> serializationSchema,
-            FlinkKafkaPartitioner<Row> partitioner) {
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer011<>(
             topic,
             new KeyedSerializationSchemaWrapper<>(serializationSchema),
             properties,
-            Optional.of(partitioner));
+            partitioner);
     }
 }
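Unlike the 0.8 to 0.10 producers, FlinkKafkaProducer011 accepts the partitioner as an Optional, so the sink now forwards it unchanged instead of wrapping it with Optional.of(...). A standalone construction mirroring the hunk above (topic and properties are placeholders):

import java.util.Optional;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class Producer011Construction {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Empty Optional: no Flink-side partitioner, Kafka distributes records.
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<>(
            "output-topic",
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            properties,
            Optional.empty());
    }
}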

----------------------------------------------------------------
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka011TableSink(

----------------------------------------------------------------
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka011TableSink(

----------------------------------------------------------------
Kafka08JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,17 +93,24 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione
     }
 
     @Override
-    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+            String topic,
+            Properties properties,
+            SerializationSchema<Row> serializationSchema,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer08<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 
     @Override
     protected Kafka08JsonTableSink createCopy() {
-        return new Kafka08JsonTableSink(topic, properties, partitioner);
+        return new Kafka08JsonTableSink(
+            topic,
+            properties,
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 }
 

----------------------------------------------------------------
Kafka08TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka08TableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
         super(
             schema,
@@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
             String topic,
             Properties properties,
             SerializationSchema<Row> serializationSchema,
-            FlinkKafkaPartitioner<Row> partitioner) {
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer08<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(null));
     }
 }

----------------------------------------------------------------
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka08TableSink(

----------------------------------------------------------------
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
             Properties properties,
             FlinkKafkaPartitioner<Row> partitioner) {
 
-        return new Kafka08JsonTableSink(topic, properties, partitioner);
+        return new Kafka08JsonTableSink(
+            topic,
+            properties,
+            partitioner);
     }
 
     @Override

----------------------------------------------------------------
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka08TableSink(

----------------------------------------------------------------
Kafka09JsonTableSink.java
@@ -26,6 +26,7 @@
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -92,16 +93,23 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione
     }
 
     @Override
-    protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+    protected FlinkKafkaProducerBase<Row> createKafkaProducer(
+            String topic,
+            Properties properties,
+            SerializationSchema<Row> serializationSchema,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer09<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 
     @Override
     protected Kafka09JsonTableSink createCopy() {
-        return new Kafka09JsonTableSink(topic, properties, partitioner);
+        return new Kafka09JsonTableSink(
+            topic,
+            properties,
+            partitioner.orElse(new FlinkFixedPartitioner<>()));
     }
 }

----------------------------------------------------------------
Kafka09TableSink.java
@@ -24,6 +24,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -36,7 +37,7 @@ public Kafka09TableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
         super(
             schema,
@@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer(
             String topic,
             Properties properties,
             SerializationSchema<Row> serializationSchema,
-            FlinkKafkaPartitioner<Row> partitioner) {
+            Optional<FlinkKafkaPartitioner<Row>> partitioner) {
         return new FlinkKafkaProducer09<>(
             topic,
             serializationSchema,
             properties,
-            partitioner);
+            partitioner.orElse(null));
     }
 }

----------------------------------------------------------------
@@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka09TableSink(

----------------------------------------------------------------
@@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink(
             Properties properties,
             FlinkKafkaPartitioner<Row> partitioner) {
 
-        return new Kafka09JsonTableSink(topic, properties, partitioner);
+        return new Kafka09JsonTableSink(
+            topic,
+            properties,
+            partitioner);
     }
 
     @Override

----------------------------------------------------------------
@@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
 
         return new Kafka09TableSink(

----------------------------------------------------------------
KafkaTableSink.java
@@ -40,7 +40,7 @@
  * A version-agnostic Kafka {@link AppendStreamTableSink}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}.
+ * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}}.
  */
 @Internal
 public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
@@ -60,7 +60,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> {
     protected Optional<SerializationSchema<Row>> serializationSchema;
 
     /** Partitioner to select Kafka partition for each item. */
-    protected final FlinkKafkaPartitioner<Row> partitioner;
+    protected final Optional<FlinkKafkaPartitioner<Row>> partitioner;
 
     // legacy variables
     protected String[] fieldNames;
@@ -70,7 +70,7 @@ protected KafkaTableSink(
             TableSchema schema,
             String topic,
             Properties properties,
-            FlinkKafkaPartitioner<Row> partitioner,
+            Optional<FlinkKafkaPartitioner<Row>> partitioner,
             SerializationSchema<Row> serializationSchema) {
         this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null."));
         this.topic = Preconditions.checkNotNull(topic, "Topic must not be null.");
@@ -96,7 +96,7 @@ public KafkaTableSink(
         this.schema = Optional.empty();
         this.topic = Preconditions.checkNotNull(topic, "topic");
         this.properties = Preconditions.checkNotNull(properties, "properties");
-        this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner");
+        this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner"));
         this.serializationSchema = Optional.empty();
     }
 
@@ -113,7 +113,7 @@ protected abstract SinkFunction<Row> createKafkaProducer(
             String topic,
             Properties properties,
             SerializationSchema<Row> serializationSchema,
-            FlinkKafkaPartitioner<Row> partitioner);
+            Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
     /**
      * Create serialization schema for converting table rows into bytes.
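With the partitioner exposed as an Optional on the base class, any custom FlinkKafkaPartitioner can be threaded through the version-specific sinks. A minimal custom partitioner (keying on the first field of the Row is illustrative only); it could be supplied as Optional.of(new FirstFieldPartitioner()) to the sinks above:

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.types.Row;

public class FirstFieldPartitioner extends FlinkKafkaPartitioner<Row> {

    @Override
    public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        // Route rows with the same first field to the same Kafka partition.
        Object firstField = record.getField(0);
        int hash = firstField == null ? 0 : firstField.hashCode();
        return partitions[Math.abs(hash % partitions.length)];
    }
}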