[FLINK-20379][connector/kafka] Rename KafkaRecordDeserializer to KafkaRecordDeserializationSchema to follow the naming convention.
lindong28 authored and becketqin committed Mar 11, 2021
1 parent cb987a1 commit 133385e
Showing 8 changed files with 32 additions and 28 deletions.
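
For connector users this is a pure rename: the builder methods and the valueOnly factories keep their shapes, and call sites only need the new type name. A minimal before/after sketch (broker address and topic are hypothetical; all builder methods used here appear in the diffs below):

    import org.apache.flink.connector.kafka.source.KafkaSource;
    import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;

    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.util.Arrays;

    // Before: .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
    // After:
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092") // hypothetical broker address
                    .setGroupId("MyGroup")
                    .setTopics(Arrays.asList("my-topic")) // hypothetical topic
                    .setDeserializer(
                            KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .build();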
KafkaSource.java
@@ -38,7 +38,7 @@
 import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
 import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
 import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitSerializer;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -60,7 +60,7 @@
 * .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
 * .setGroupId("MyGroup")
 * .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-* .setDeserializer(new TestingKafkaRecordDeserializer())
+* .setDeserializer(new TestingKafkaRecordDeserializationSchema())
 * .setStartingOffsets(OffsetsInitializer.earliest())
 * .build();
 * }</pre>
@@ -80,7 +80,7 @@ public class KafkaSource<OUT>
 private final OffsetsInitializer stoppingOffsetsInitializer;
 // Boundedness
 private final Boundedness boundedness;
-private final KafkaRecordDeserializer<OUT> deserializationSchema;
+private final KafkaRecordDeserializationSchema<OUT> deserializationSchema;
 // The configurations.
 private final Properties props;
 
@@ -89,7 +89,7 @@ public class KafkaSource<OUT>
 OffsetsInitializer startingOffsetsInitializer,
 @Nullable OffsetsInitializer stoppingOffsetsInitializer,
 Boundedness boundedness,
-KafkaRecordDeserializer<OUT> deserializationSchema,
+KafkaRecordDeserializationSchema<OUT> deserializationSchema,
 Properties props) {
 this.subscriber = subscriber;
 this.startingOffsetsInitializer = startingOffsetsInitializer;
KafkaSourceBuilder.java
@@ -22,7 +22,7 @@
 import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.TopicPartition;
@@ -96,7 +96,7 @@ public class KafkaSourceBuilder<OUT> {
 private OffsetsInitializer stoppingOffsetsInitializer;
 // Boundedness
 private Boundedness boundedness;
-private KafkaRecordDeserializer<OUT> deserializationSchema;
+private KafkaRecordDeserializationSchema<OUT> deserializationSchema;
 // The configurations.
 protected Properties props;
 
@@ -303,15 +303,15 @@ public KafkaSourceBuilder<OUT> setBounded(OffsetsInitializer stoppingOffsetsInit
 }
 
 /**
-* Sets the {@link KafkaRecordDeserializer deserializer} of the {@link
+* Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
 * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
 *
 * @param recordDeserializer the deserializer for Kafka {@link
 * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
 * @return this KafkaSourceBuilder.
 */
 public KafkaSourceBuilder<OUT> setDeserializer(
-KafkaRecordDeserializer<OUT> recordDeserializer) {
+KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
 this.deserializationSchema = recordDeserializer;
 return this;
 }
KafkaPartitionSplitReader.java
@@ -24,7 +24,7 @@
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -69,14 +69,16 @@ public class KafkaPartitionSplitReader<T>
 private static final long POLL_TIMEOUT = 10000L;
 
 private final KafkaConsumer<byte[], byte[]> consumer;
-private final KafkaRecordDeserializer<T> deserializationSchema;
+private final KafkaRecordDeserializationSchema<T> deserializationSchema;
 private final Map<TopicPartition, Long> stoppingOffsets;
 private final SimpleCollector<T> collector;
 private final String groupId;
 private final int subtaskId;
 
 public KafkaPartitionSplitReader(
-Properties props, KafkaRecordDeserializer<T> deserializationSchema, int subtaskId) {
+Properties props,
+KafkaRecordDeserializationSchema<T> deserializationSchema,
+int subtaskId) {
 this.subtaskId = subtaskId;
 Properties consumerProps = new Properties();
 consumerProps.putAll(props);
KafkaRecordDeserializer.java → KafkaRecordDeserializationSchema.java (renamed)
@@ -30,7 +30,7 @@
 import java.util.Map;
 
 /** An interface for the deserialization of Kafka records. */
-public interface KafkaRecordDeserializer<T> extends Serializable, ResultTypeQueryable<T> {
+public interface KafkaRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 /**
 * Deserialize a consumer record into the given collector.
@@ -42,31 +42,32 @@ void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> collector)
 throws Exception;
 
 /**
-* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializer}.
+* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
 *
 * @param valueDeserializerClass the deserializer class used to deserialize the value.
 * @param <V> the value type.
-* @return A {@link KafkaRecordDeserializer} that deserialize the value with the given
+* @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
 * deserializer.
 */
-static <V> KafkaRecordDeserializer<V> valueOnly(
+static <V> KafkaRecordDeserializationSchema<V> valueOnly(
 Class<? extends Deserializer<V>> valueDeserializerClass) {
 return new ValueDeserializerWrapper<>(valueDeserializerClass, Collections.emptyMap());
 }
 
 /**
-* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializer}.
+* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
 *
 * @param valueDeserializerClass the deserializer class used to deserialize the value.
 * @param config the configuration of the value deserializer, only valid when the deserializer
 * is an implementation of {@code Configurable}.
 * @param <V> the value type.
 * @param <D> the type of the deserializer.
-* @return A {@link KafkaRecordDeserializer} that deserialize the value with the given
+* @return A {@link KafkaRecordDeserializationSchema} that deserialize the value with the given
 * deserializer.
 */
-static <V, D extends Configurable & Deserializer<V>> KafkaRecordDeserializer<V> valueOnly(
-Class<D> valueDeserializerClass, Map<String, String> config) {
+static <V, D extends Configurable & Deserializer<V>>
+KafkaRecordDeserializationSchema<V> valueOnly(
+Class<D> valueDeserializerClass, Map<String, String> config) {
 return new ValueDeserializerWrapper<>(valueDeserializerClass, config);
 }
 }
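
Besides the valueOnly shortcuts above, the renamed interface can be implemented directly; the contract is unchanged by this commit: deserialize each ConsumerRecord into the Collector (zero or more output elements per record) and report the produced type via ResultTypeQueryable. A minimal sketch, assuming UTF-8 string values (the class name and charset choice are illustrative, not part of the commit):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.util.Collector;

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    import java.nio.charset.StandardCharsets;

    /** Illustrative schema emitting each record's value as a UTF-8 string. */
    class Utf8ValueDeserializationSchema implements KafkaRecordDeserializationSchema<String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> out) {
            // Exactly one output element per record in this sketch.
            out.collect(new String(record.value(), StandardCharsets.UTF_8));
        }

        @Override
        public TypeInformation<String> getProducedType() {
            // From ResultTypeQueryable: tells Flink which serializer to use downstream.
            return Types.STRING;
        }
    }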
ValueDeserializerWrapper.java
@@ -32,7 +32,7 @@
 import java.util.Map;
 
 /** A package private class to wrap {@link Deserializer}. */
-class ValueDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
+class ValueDeserializerWrapper<T> implements KafkaRecordDeserializationSchema<T> {
 private static final long serialVersionUID = 5409547407386004054L;
 private static final Logger LOG = LoggerFactory.getLogger(ValueDeserializerWrapper.class);
 private final String deserializerClass;
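
ValueDeserializerWrapper also backs the two-argument valueOnly(Class, Map) overload above, which requires a deserializer that implements both Kafka's Deserializer and Configurable so the config map can be applied. A hypothetical sketch (the class and the "charset" key are illustrative, not part of the commit):

    import org.apache.kafka.common.Configurable;
    import org.apache.kafka.common.serialization.Deserializer;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    /** Hypothetical string deserializer whose charset is taken from the config map. */
    public class ConfigurableStringDeserializer implements Configurable, Deserializer<String> {
        private Charset charset = StandardCharsets.UTF_8;

        @Override
        public void configure(Map<String, ?> configs) {
            // Invoked because this class implements Configurable; "charset" is an illustrative key.
            Object name = configs.get("charset");
            if (name != null) {
                charset = Charset.forName(name.toString());
            }
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, charset);
        }
    }

    // Usage via the factory on the renamed interface:
    // KafkaRecordDeserializationSchema.valueOnly(
    //         ConfigurableStringDeserializer.class, Collections.singletonMap("charset", "UTF-8"));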
KafkaSourceITCase.java
@@ -23,7 +23,7 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -70,7 +70,7 @@ public void testBasicRead() throws Exception {
 .setBootstrapServers(KafkaSourceTestEnv.brokerConnectionStrings)
 .setGroupId("testBasicRead")
 .setTopics(Arrays.asList(TOPIC1, TOPIC2))
-.setDeserializer(new TestingKafkaRecordDeserializer())
+.setDeserializer(new TestingKafkaRecordDeserializationSchema())
 .setStartingOffsets(OffsetsInitializer.earliest())
 .setBounded(OffsetsInitializer.latest())
 .build();
@@ -95,8 +95,8 @@ private PartitionAndValue(TopicPartition tp, int value) {
 }
 }
 
-private static class TestingKafkaRecordDeserializer
-implements KafkaRecordDeserializer<PartitionAndValue> {
+private static class TestingKafkaRecordDeserializationSchema
+implements KafkaRecordDeserializationSchema<PartitionAndValue> {
 private static final long serialVersionUID = -3765473065594331694L;
 private transient Deserializer<Integer> deserializer;
 
KafkaPartitionSplitReaderTest.java
@@ -23,7 +23,7 @@
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -175,7 +175,7 @@ private KafkaPartitionSplitReader<Integer> createReader() {
 props.putAll(KafkaSourceTestEnv.getConsumerProperties(ByteArrayDeserializer.class));
 props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
 return new KafkaPartitionSplitReader<>(
-props, KafkaRecordDeserializer.valueOnly(IntegerDeserializer.class), 0);
+props, KafkaRecordDeserializationSchema.valueOnly(IntegerDeserializer.class), 0);
 }
 
 private Map<String, KafkaPartitionSplit> assignSplits(
KafkaSourceReaderTest.java
@@ -25,7 +25,7 @@
 import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestEnv;
 import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer;
+import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
 import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
 import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
@@ -246,7 +246,8 @@ private SourceReader<Integer, KafkaPartitionSplit> createReader(
 KafkaSource.<Integer>builder()
 .setClientIdPrefix("KafkaSourceReaderTest")
 .setDeserializer(
-KafkaRecordDeserializer.valueOnly(IntegerDeserializer.class))
+KafkaRecordDeserializationSchema.valueOnly(
+IntegerDeserializer.class))
 .setPartitions(Collections.singleton(new TopicPartition("AnyTopic", 0)))
 .setProperty(
 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
