Skip to content

Commit

Permalink
[FLINK-21032][table] Not use ParallelFiniteTestSource in CompactionIT…
Browse files Browse the repository at this point in the history
…CaseBase

This closes apache#15538
  • Loading branch information
JingsongLi authored Apr 14, 2021
1 parent 0461e4b commit cf2a4e4
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@

package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.planner.runtime.utils.ParallelFiniteTestSource;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
Expand Down Expand Up @@ -73,14 +74,16 @@ public void init() throws IOException {

DataStream<Row> stream =
new DataStream<>(
env().getJavaEnv()
.addSource(
new ParallelFiniteTestSource<>(rows),
new RowTypeInfo(
new TypeInformation[] {
Types.INT, Types.STRING, Types.STRING
},
new String[] {"a", "b", "c"})));
env().getJavaEnv()
.addSource(
new FiniteTestSource<>(rows),
new RowTypeInfo(
new TypeInformation[] {
Types.INT, Types.STRING, Types.STRING
},
new String[] {"a", "b", "c"})))
.filter((FilterFunction<Row>) value -> true)
.setParallelism(3); // to parallel tasks

tEnv().createTemporaryView("my_table", stream);
}
Expand All @@ -102,9 +105,13 @@ public void testNonPartition() throws Exception {
}

public void innerTestNonPartition(int parallelism) throws Exception {
env().setParallelism(parallelism);
createTable(resultPath);
tEnv().executeSql("insert into sink_table select * from my_table").await();
String sql =
String.format(
"insert into sink_table /*+ OPTIONS('sink.parallelism' = '%d') */"
+ " select * from my_table",
parallelism);
tEnv().executeSql(sql).await();

assertIterator(tEnv().executeSql("select * from sink_table").collect());

Expand Down

This file was deleted.

0 comments on commit cf2a4e4

Please sign in to comment.