Skip to content

Commit

Permalink
[FLINK-27698][tests][table] Migrate flink-table-api-java-bridge to JU…
Browse files Browse the repository at this point in the history
…nit5
  • Loading branch information
snuyanzin authored and zentol committed May 20, 2022
1 parent c57b849 commit d993232
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,17 @@

import org.apache.flink.table.data.DecimalData;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests that the data generator is valid for every combination of precision and scale. */
public class DecimalDataRandomGeneratorTest {
class DecimalDataRandomGeneratorTest {

@Test
public void testGenerateDecimalValues() {
void testGenerateDecimalValues() {
for (int precision = 1; precision <= 38; precision++) {
for (int scale = 0; scale <= precision; scale++) {
DecimalDataRandomGenerator gen =
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testGenerateDecimalValues() {
}

@Test
public void testMinMax() {
void testMinMax() {
for (int precision = 1; precision <= 38; precision++) {
for (int scale = 0; scale <= precision; scale++) {
BigDecimal min = BigDecimal.valueOf(-10.0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.table.utils.PlannerMock;
import org.apache.flink.types.Row;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.Collections;
Expand All @@ -41,9 +41,9 @@
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link StreamTableEnvironmentImpl}. */
public class StreamTableEnvironmentImplTest {
class StreamTableEnvironmentImplTest {
@Test
public void testAppendStreamDoesNotOverwriteTableConfig() {
void testAppendStreamDoesNotOverwriteTableConfig() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);

Expand All @@ -58,7 +58,7 @@ public void testAppendStreamDoesNotOverwriteTableConfig() {
}

@Test
public void testRetractStreamDoesNotOverwriteTableConfig() {
void testRetractStreamDoesNotOverwriteTableConfig() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> elements = env.fromElements(1, 2, 3);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -38,7 +38,7 @@
import static org.assertj.core.api.Assertions.fail;

/** Tests for {@link BlackHoleTableSinkFactory}. */
public class BlackHoleSinkFactoryTest {
class BlackHoleSinkFactoryTest {

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Expand All @@ -47,7 +47,7 @@ public class BlackHoleSinkFactoryTest {
Column.physical("f2", DataTypes.BIGINT()));

@Test
public void testBlackHole() {
void testBlackHole() {
Map<String, String> properties = new HashMap<>();
properties.put("connector", "blackhole");

Expand All @@ -59,7 +59,7 @@ public void testBlackHole() {
}

@Test
public void testWrongKey() {
void testWrongKey() {
try {
Map<String, String> properties = new HashMap<>();
properties.put("connector", "blackhole");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.TernaryBoolean;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.flink.table.descriptors.OldCsvValidator.FORMAT_FIELDS;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for CsvTableSourceFactory and CsvTableSinkFactory. */
@RunWith(Parameterized.class)
public class CsvTableSinkFactoryTest {
class CsvTableSinkFactoryTest {

private static TableSchema testingSchema =
TableSchema.builder()
Expand All @@ -55,54 +54,53 @@ public class CsvTableSinkFactoryTest {
.field("myfield5", DataTypes.ARRAY(DataTypes.TINYINT()))
.build();

@Parameterized.Parameter public TernaryBoolean deriveSchema;

@Parameterized.Parameters(name = "deriveSchema = {0}")
public static TernaryBoolean[] getDeriveSchema() {
return new TernaryBoolean[] {
TernaryBoolean.TRUE, TernaryBoolean.FALSE, TernaryBoolean.UNDEFINED
};
private static Stream<TernaryBoolean> getDeriveSchema() {
return Stream.of(TernaryBoolean.TRUE, TernaryBoolean.FALSE, TernaryBoolean.UNDEFINED);
}

@Test
public void testAppendTableSinkFactory() {
DescriptorProperties descriptor = createDescriptor(testingSchema);
@ParameterizedTest(name = "deriveSchema = {0}")
@MethodSource("getDeriveSchema")
void testAppendTableSinkFactory(TernaryBoolean deriveSchema) {
DescriptorProperties descriptor = createDescriptor(testingSchema, deriveSchema);
descriptor.putString("update-mode", "append");
TableSink sink = createTableSink(descriptor);

assertThat(sink).isInstanceOf(CsvTableSink.class);
assertThat(sink.getConsumedDataType()).isEqualTo(testingSchema.toRowDataType());
}

@Test
public void testBatchTableSinkFactory() {
DescriptorProperties descriptor = createDescriptor(testingSchema);
@ParameterizedTest(name = "deriveSchema = {0}")
@MethodSource("getDeriveSchema")
void testBatchTableSinkFactory(TernaryBoolean deriveSchema) {
DescriptorProperties descriptor = createDescriptor(testingSchema, deriveSchema);
TableSink sink = createTableSink(descriptor);

assertThat(sink).isInstanceOf(CsvTableSink.class);
assertThat(sink.getConsumedDataType()).isEqualTo(testingSchema.toRowDataType());
}

@Test
public void testAppendTableSourceFactory() {
DescriptorProperties descriptor = createDescriptor(testingSchema);
@ParameterizedTest(name = "deriveSchema = {0}")
@MethodSource("getDeriveSchema")
void testAppendTableSourceFactory(TernaryBoolean deriveSchema) {
DescriptorProperties descriptor = createDescriptor(testingSchema, deriveSchema);
descriptor.putString("update-mode", "append");
TableSource sink = createTableSource(descriptor);

assertThat(sink).isInstanceOf(CsvTableSource.class);
assertThat(sink.getProducedDataType()).isEqualTo(testingSchema.toRowDataType());
}

@Test
public void testBatchTableSourceFactory() {
DescriptorProperties descriptor = createDescriptor(testingSchema);
@ParameterizedTest(name = "deriveSchema = {0}")
@MethodSource("getDeriveSchema")
void testBatchTableSourceFactory(TernaryBoolean deriveSchema) {
DescriptorProperties descriptor = createDescriptor(testingSchema, deriveSchema);
TableSource sink = createTableSource(descriptor);

assertThat(sink).isInstanceOf(CsvTableSource.class);
assertThat(sink.getProducedDataType()).isEqualTo(testingSchema.toRowDataType());
}

private DescriptorProperties createDescriptor(TableSchema schema) {
private DescriptorProperties createDescriptor(TableSchema schema, TernaryBoolean deriveSchema) {
Map<String, String> properties = new HashMap<>();
properties.put("connector.type", "filesystem");
properties.put("connector.property-version", "1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.util.InstantiationUtil;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -52,7 +52,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for {@link DataGenTableSourceFactory}. */
public class DataGenTableSourceFactoryTest {
class DataGenTableSourceFactoryTest {

private static final ResolvedSchema SCHEMA =
ResolvedSchema.of(
Expand All @@ -62,7 +62,7 @@ public class DataGenTableSourceFactoryTest {
Column.physical("f3", DataTypes.TIMESTAMP()));

@Test
public void testDataTypeCoverage() throws Exception {
void testDataTypeCoverage() throws Exception {
ResolvedSchema schema =
ResolvedSchema.of(
Column.physical("f0", DataTypes.CHAR(1)),
Expand Down Expand Up @@ -129,7 +129,7 @@ public void testDataTypeCoverage() throws Exception {
}

@Test
public void testSource() throws Exception {
void testSource() throws Exception {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
descriptor.putLong(DataGenConnectorOptions.ROWS_PER_SECOND.key(), 100);
Expand Down Expand Up @@ -204,7 +204,7 @@ private List<RowData> runGenerator(ResolvedSchema schema, DescriptorProperties d
}

@Test
public void testSequenceCheckpointRestore() throws Exception {
void testSequenceCheckpointRestore() throws Exception {
DescriptorProperties descriptor = new DescriptorProperties();
descriptor.putString(FactoryUtil.CONNECTOR.key(), "datagen");
descriptor.putString(
Expand Down Expand Up @@ -241,7 +241,7 @@ public void testSequenceCheckpointRestore() throws Exception {
}

@Test
public void testLackStartForSequence() {
void testLackStartForSequence() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand All @@ -268,7 +268,7 @@ public void testLackStartForSequence() {
}

@Test
public void testLackEndForSequence() {
void testLackEndForSequence() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand All @@ -295,7 +295,7 @@ public void testLackEndForSequence() {
}

@Test
public void testWrongKey() {
void testWrongKey() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand All @@ -313,7 +313,7 @@ public void testWrongKey() {
}

@Test
public void testWrongStartInRandom() {
void testWrongStartInRandom() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand All @@ -340,7 +340,7 @@ public void testWrongStartInRandom() {
}

@Test
public void testWrongLenInRandomLong() {
void testWrongLenInRandomLong() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand All @@ -367,7 +367,7 @@ public void testWrongLenInRandomLong() {
}

@Test
public void testWrongTypes() {
void testWrongTypes() {
assertThatThrownBy(
() -> {
DescriptorProperties descriptor = new DescriptorProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.table.types.utils.TypeConversions;

/** Tests for {@link CsvTableSource}. */
public class CsvTableSourceTest extends TableSourceTestBase {
class CsvTableSourceTest extends TableSourceTestBase {

@Override
protected TableSource<?> createTableSource(TableSchema requestedSchema) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

org.apache.flink.util.TestLoggerExtension
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.assertj.core.api.Assertions.assertThat;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assume.assumeThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/** Collection of tests that verify assumptions that table sources should meet. */
public abstract class TableSourceTestBase {
Expand All @@ -46,10 +45,10 @@ public abstract class TableSourceTestBase {
* <p>Required by {@code PushProjectIntoTableSourceScanRule}.
*/
@Test
public void testEmptyProjection() {
void testEmptyProjection() {
TableSource<?> source =
createTableSource(TableSchema.builder().field("f0", DataTypes.INT()).build());
assumeThat(source, instanceOf(ProjectableTableSource.class));
assumeThat(source).isInstanceOf(ProjectableTableSource.class);

ProjectableTableSource<?> projectableTableSource = (ProjectableTableSource<?>) source;

Expand All @@ -64,15 +63,15 @@ public void testEmptyProjection() {
* <p>Required by {@code PushProjectIntoTableSourceScanRule}.
*/
@Test
public void testProjectionReturnsDifferentSource() {
void testProjectionReturnsDifferentSource() {
TableSource<?> source =
createTableSource(
TableSchema.builder()
.field("f0", DataTypes.INT())
.field("f1", DataTypes.STRING())
.field("f2", DataTypes.BIGINT())
.build());
assumeThat(source, instanceOf(ProjectableTableSource.class));
assumeThat(source).isInstanceOf(ProjectableTableSource.class);

ProjectableTableSource<?> projectableTableSource = (ProjectableTableSource<?>) source;

Expand Down

0 comments on commit d993232

Please sign in to comment.