diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 5f869ec35fad6..b155576bce93b 100644 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -27,6 +27,7 @@ * Kafka 0.8 {@link KafkaTableSink} that serializes data in JSON format. */ public class Kafka08JsonTableSink extends KafkaJsonTableSink { + /** * Creates {@link KafkaTableSink} for Kafka 0.8 * diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java deleted file mode 100644 index f870adf565e74..0000000000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkITCase.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; - -public class Kafka08JsonTableSinkITCase extends KafkaTableSinkTestBase { - - @Override - protected KafkaTableSink createTableSink() { - Kafka08JsonTableSink sink = new Kafka08JsonTableSink( - TOPIC, - createSinkProperties(), - createPartitioner()); - return sink.configure(FIELD_NAMES, FIELD_TYPES); - } - - protected DeserializationSchema createRowDeserializationSchema() { - return new JsonRowDeserializationSchema( - FIELD_NAMES, FIELD_TYPES); - } -} - diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java new file mode 100644 index 0000000000000..b1e6db953d660 --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner partitioner, + final FlinkKafkaProducerBase kafkaProducer) { + + return new Kafka08JsonTableSink(topic, properties, partitioner) { + @Override + protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, + SerializationSchema serializationSchema, KafkaPartitioner partitioner) { + return kafkaProducer; + } + }; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> getSerializationSchema() { + return (Class) JsonRowSerializationSchema.class; + } +} + diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java deleted file mode 100644 index 74415f84c4297..0000000000000 --- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkITCase.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.streaming.connectors.kafka; - -import org.apache.flink.api.table.Row; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; - -public class Kafka09JsonTableSinkITCase extends KafkaTableSinkTestBase { - - @Override - protected KafkaTableSink createTableSink() { - Kafka09JsonTableSink sink = new Kafka09JsonTableSink( - TOPIC, - createSinkProperties(), - createPartitioner()); - return sink.configure(FIELD_NAMES, FIELD_TYPES); - } - - protected DeserializationSchema createRowDeserializationSchema() { - return new JsonRowDeserializationSchema( - FIELD_NAMES, FIELD_TYPES); - } -} diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java new file mode 100644 index 0000000000000..bfdcf6859919d --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.table.Row; +import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.streaming.util.serialization.SerializationSchema; + +import java.util.Properties; + +public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner partitioner, + final FlinkKafkaProducerBase kafkaProducer) { + return new Kafka09JsonTableSink(topic, properties, partitioner) { + @Override + protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, + SerializationSchema serializationSchema, KafkaPartitioner partitioner) { + return kafkaProducer; + } + }; + } + + @Override + @SuppressWarnings("unchecked") + protected Class> getSerializationSchema() { + return (Class) JsonRowSerializationSchema.class; + } +} + diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 8f5e8110e05ec..714d9cd9085ea 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -42,6 +42,7 @@ public abstract class KafkaTableSink implements StreamTableSink { protected final KafkaPartitioner partitioner; protected String[] fieldNames; protected TypeInformation[] fieldTypes; + /** * Creates KafkaTableSink * diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 5e55b0ab46b64..e46ca08c9f39a 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -17,123 +17,89 @@ */ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.table.Row; +import org.apache.flink.api.table.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.SinkFunction; -import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.internals.TypeUtil; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.test.util.SuccessException; +import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.junit.Test; import java.io.Serializable; -import java.util.HashSet; import java.util.Properties; -import static org.apache.flink.test.util.TestUtils.tryExecute; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotSame; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; -public abstract class KafkaTableSinkTestBase extends KafkaTestBase implements Serializable { +public abstract class KafkaTableSinkTestBase implements Serializable { - protected final static String TOPIC = "customPartitioningTestTopic"; - protected final static int PARALLELISM = 1; - protected final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; - protected final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + private final static String TOPIC = "testTopic"; + private final static String[] FIELD_NAMES = new String[] {"field1", "field2"}; + private final static TypeInformation[] FIELD_TYPES = TypeUtil.toTypeInfo(new Class[] {Integer.class, String.class}); + + private final KafkaPartitioner partitioner = new CustomPartitioner(); + private final Properties properties = createSinkProperties(); + @SuppressWarnings("unchecked") + private final FlinkKafkaProducerBase kafkaProducer = mock(FlinkKafkaProducerBase.class); @Test + @SuppressWarnings("unchecked") public void testKafkaTableSink() throws Exception { - LOG.info("Starting KafkaTableSinkTestBase.testKafkaTableSink()"); - - createTestTopic(TOPIC, PARALLELISM, 1); - StreamExecutionEnvironment env = createEnvironment(); - - createProducingTopology(env); - createConsumingTopology(env); + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = createTableSink(); + kafkaTableSink.emitDataStream(dataStream); - tryExecute(env, "custom partitioning test"); - deleteTestTopic(TOPIC); - LOG.info("Finished KafkaTableSinkTestBase.testKafkaTableSink()"); + verify(dataStream).addSink(kafkaProducer); } - private StreamExecutionEnvironment createEnvironment() { - StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort); - env.setRestartStrategy(RestartStrategies.noRestart()); - env.getConfig().disableSysoutLogging(); - return env; + @Test + @SuppressWarnings("unchecked") + public void testCreatedProducer() throws Exception { + DataStream dataStream = mock(DataStream.class); + KafkaTableSink kafkaTableSink = spy(createTableSink()); + kafkaTableSink.emitDataStream(dataStream); + + verify(kafkaTableSink).createKafkaProducer( + eq(TOPIC), + eq(properties), + any(getSerializationSchema()), + eq(partitioner)); } - private void createProducingTopology(StreamExecutionEnvironment env) { - DataStream stream = env.addSource(new SourceFunction() { - private boolean running = true; - - @Override - public void run(SourceContext ctx) throws Exception { - long cnt = 0; - while (running) { - Row row = new Row(2); - row.setField(0, cnt); - row.setField(1, "kafka-" + cnt); - ctx.collect(row); - cnt++; - } - } - - @Override - public void cancel() { - running = false; - } - }) - .setParallelism(1); - - KafkaTableSink kafkaTableSinkBase = createTableSink(); - - kafkaTableSinkBase.emitDataStream(stream); + @Test + public void testConfiguration() { + KafkaTableSink kafkaTableSink = createTableSink(); + KafkaTableSink newKafkaTableSink = kafkaTableSink.configure(FIELD_NAMES, FIELD_TYPES); + assertNotSame(kafkaTableSink, newKafkaTableSink); + + assertArrayEquals(FIELD_NAMES, newKafkaTableSink.getFieldNames()); + assertArrayEquals(FIELD_TYPES, newKafkaTableSink.getFieldTypes()); + assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - private void createConsumingTopology(StreamExecutionEnvironment env) { - DeserializationSchema deserializationSchema = createRowDeserializationSchema(); - - FlinkKafkaConsumerBase source = kafkaServer.getConsumer(TOPIC, deserializationSchema, standardProps); - - env.addSource(source).setParallelism(PARALLELISM) - .map(new RichMapFunction() { - @Override - public Integer map(Row value) { - return (Integer) value.productElement(0); - } - }).setParallelism(PARALLELISM) - - .addSink(new SinkFunction() { - HashSet ids = new HashSet<>(); - @Override - public void invoke(Integer value) throws Exception { - ids.add(value); - - if (ids.size() == 100) { - throw new SuccessException(); - } - } - }).setParallelism(1); - } + protected abstract KafkaTableSink createTableSink(String topic, Properties properties, + KafkaPartitioner partitioner, FlinkKafkaProducerBase kafkaProducer); - protected KafkaPartitioner createPartitioner() { - return new CustomPartitioner(); - } + protected abstract Class> getSerializationSchema(); - protected Properties createSinkProperties() { - return FlinkKafkaProducerBase.getPropertiesFromBrokerList(KafkaTestBase.brokerConnectionStrings); + private KafkaTableSink createTableSink() { + return createTableSink(TOPIC, properties, partitioner, kafkaProducer); } - protected abstract KafkaTableSink createTableSink(); - - protected abstract DeserializationSchema createRowDeserializationSchema(); - + private static Properties createSinkProperties() { + Properties properties = new Properties(); + properties.setProperty("testKey", "testValue"); + return properties; + } - public static class CustomPartitioner extends KafkaPartitioner implements Serializable { + private static class CustomPartitioner extends KafkaPartitioner implements Serializable { @Override public int partition(Row next, byte[] serializedKey, byte[] serializedValue, int numPartitions) { return 0;