Skip to content

Commit

Permalink
[FLINK-4745] [table] Convert KafkaTableSource test to unit tests
Browse files Browse the repository at this point in the history
This closes apache#2603.
  • Loading branch information
twalthr committed Oct 10, 2016
1 parent b949d42 commit 5e30ba3
Show file tree
Hide file tree
Showing 9 changed files with 184 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -33,7 +31,6 @@
import org.junit.Test;

import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -368,66 +365,4 @@ public void run() {

curatorFramework.close();
}

@Test
public void testJsonTableSource() throws Exception {
String topic = UUID.randomUUID().toString();

// Names and types are determined in the actual test method of the
// base test class.
Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });

// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(false);

runJsonTableSource(topic, tableSource);
}

@Test
public void testJsonTableSourceWithFailOnMissingField() throws Exception {
String topic = UUID.randomUUID().toString();

// Names and types are determined in the actual test method of the
// base test class.
Kafka08JsonTableSource tableSource = new Kafka08JsonTableSource(
topic,
standardProps,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });

// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(true);

try {
runJsonTableSource(topic, tableSource);
fail("Did not throw expected Exception");
} catch (Exception e) {
Throwable rootCause = e.getCause().getCause().getCause();
assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {

@Override
protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {
final FlinkKafkaProducerBase<Row> kafkaProducer) {

return new Kafka08JsonTableSink(topic, properties, partitioner) {
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;

public class Kafka08JsonTableSourceTest extends KafkaTableSourceTestBase {

@Override
protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
return new Kafka08JsonTableSource(topic, properties, fieldNames, typeInfo);
}

@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}

@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer08.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,8 @@

package org.apache.flink.streaming.connectors.kafka;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.junit.Test;

import java.util.Properties;
import java.util.UUID;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class Kafka09ITCase extends KafkaConsumerTestBase {

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -126,75 +118,4 @@ public void testEndOfStream() throws Exception {
public void testMetrics() throws Throwable {
runMetricsTest();
}

@Test
public void testJsonTableSource() throws Exception {
String topic = UUID.randomUUID().toString();

Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);

// Names and types are determined in the actual test method of the
// base test class.
Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
topic,
props,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });

// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(false);

runJsonTableSource(topic, tableSource);
}

@Test
public void testJsonTableSourceWithFailOnMissingField() throws Exception {
String topic = UUID.randomUUID().toString();

Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);

// Names and types are determined in the actual test method of the
// base test class.
Kafka09JsonTableSource tableSource = new Kafka09JsonTableSource(
topic,
props,
new String[] {
"long",
"string",
"boolean",
"double",
"missing-field"},
new TypeInformation<?>[] {
BasicTypeInfo.LONG_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.BOOLEAN_TYPE_INFO,
BasicTypeInfo.DOUBLE_TYPE_INFO,
BasicTypeInfo.LONG_TYPE_INFO });

// Don't fail on missing field, but set to null (default)
tableSource.setFailOnMissingField(true);

try {
runJsonTableSource(topic, tableSource);
fail("Did not throw expected Exception");
} catch (Exception e) {
Throwable rootCause = e.getCause().getCause().getCause();
assertTrue("Unexpected root cause", rootCause instanceof IllegalStateException);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
@Override
protected KafkaTableSink createTableSink(String topic, Properties properties, KafkaPartitioner<Row> partitioner,
final FlinkKafkaProducerBase<Row> kafkaProducer) {

return new Kafka09JsonTableSink(topic, properties, partitioner) {
@Override
protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.kafka;

import java.util.Properties;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.table.Row;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;

public class Kafka09JsonTableSourceTest extends KafkaTableSourceTestBase {

@Override
protected KafkaTableSource createTableSource(String topic, Properties properties, String[] fieldNames, TypeInformation<?>[] typeInfo) {
return new Kafka09JsonTableSource(topic, properties, fieldNames, typeInfo);
}

@Override
@SuppressWarnings("unchecked")
protected Class<DeserializationSchema<Row>> getDeserializationSchema() {
return (Class) JsonRowDeserializationSchema.class;
}

@Override
@SuppressWarnings("unchecked")
protected Class<FlinkKafkaConsumerBase<Row>> getFlinkKafkaConsumer() {
return (Class) FlinkKafkaConsumer09.class;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -786,104 +786,6 @@ public void flatMap(Tuple3<Integer, Integer, String> value, Collector<Integer> o
}
}

/**
* Runs a table source test with JSON data.
*
* The table source needs to parse the following JSON fields:
* - "long" -> number
* - "string" -> "string"
* - "boolean" -> true|false
* - "double" -> fraction
*/
public void runJsonTableSource(String topic, KafkaTableSource kafkaTableSource) throws Exception {
final ObjectMapper mapper = new ObjectMapper();

final int numElements = 1024;
final long[] longs = new long[numElements];
final String[] strings = new String[numElements];
final boolean[] booleans = new boolean[numElements];
final double[] doubles = new double[numElements];

final byte[][] serializedJson = new byte[numElements][];

ThreadLocalRandom random = ThreadLocalRandom.current();

for (int i = 0; i < numElements; i++) {
longs[i] = random.nextLong();
strings[i] = Integer.toHexString(random.nextInt());
booleans[i] = random.nextBoolean();
doubles[i] = random.nextDouble();

ObjectNode entry = mapper.createObjectNode();
entry.put("long", longs[i]);
entry.put("string", strings[i]);
entry.put("boolean", booleans[i]);
entry.put("double", doubles[i]);

serializedJson[i] = mapper.writeValueAsBytes(entry);
}

// Produce serialized JSON data
createTestTopic(topic, 1, 1);

Properties props = new Properties();
props.putAll(standardProps);
props.putAll(secureProps);

StreamExecutionEnvironment env = StreamExecutionEnvironment
.createRemoteEnvironment("localhost", flinkPort);
env.getConfig().disableSysoutLogging();

env.addSource(new SourceFunction<byte[]>() {
@Override
public void run(SourceContext<byte[]> ctx) throws Exception {
for (int i = 0; i < numElements; i++) {
ctx.collect(serializedJson[i]);
}
}

@Override
public void cancel() {
}
}).addSink(kafkaServer.getProducer(
topic,
new ByteArraySerializationSchema(),
props,
null));

// Execute blocks
env.execute();

// Register as table source
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.getTableEnvironment(env);
tableEnvironment.registerTableSource("kafka", kafkaTableSource);

Table result = tableEnvironment.ingest("kafka");

tableEnvironment.toDataStream(result, Row.class).addSink(new SinkFunction<Row>() {

int i = 0;

@Override
public void invoke(Row value) throws Exception {
assertEquals(5, value.productArity());
assertEquals(longs[i], value.productElement(0));
assertEquals(strings[i], value.productElement(1));
assertEquals(booleans[i], value.productElement(2));
assertEquals(doubles[i], value.productElement(3));
assertNull(value.productElement(4));

if (i == numElements-1) {
throw new SuccessException();
} else {
i++;
}
}
});

tryExecutePropagateExceptions(env, "KafkaTableSource");
}

/**
* Serialization scheme forwarding byte[] records.
*/
Expand Down
Loading

0 comments on commit 5e30ba3

Please sign in to comment.