Skip to content

Commit

Permalink
[FLINK-33371] Make TestValues sinks return results as Rows
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Oct 27, 2023
1 parent 838fe2f commit e914eb7
Show file tree
Hide file tree
Showing 39 changed files with 285 additions and 249 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,16 @@ public static String registerRowData(Seq<RowData> data) {
*
* @param tableName the table name of the registered table sink.
*/
public static List<String> getRawResults(String tableName) {
public static List<String> getRawResultsAsStrings(String tableName) {
return TestValuesRuntimeFunctions.getRawResultsAsStrings(tableName);
}

/**
* Returns received raw results of the registered table sink.
*
* @param tableName the table name of the registered table sink.
*/
public static List<Row> getRawResults(String tableName) {
return TestValuesRuntimeFunctions.getRawResults(tableName);
}

Expand All @@ -234,16 +243,25 @@ public static List<String> getRawResults(String tableName) {
*
* <p>The raw results are encoded with {@link RowKind}.
*/
public static List<String> getOnlyRawResults() {
return TestValuesRuntimeFunctions.getOnlyRawResults();
public static List<String> getOnlyRawResultsAsStrings() {
return TestValuesRuntimeFunctions.getOnlyRawResultsAsStrings();
}

/**
* Returns materialized (final) results of the registered table sink.
*
* @param tableName the table name of the registered table sink.
*/
public static List<String> getResultsAsStrings(String tableName) {
return TestValuesRuntimeFunctions.getResultsAsStrings(tableName);
}

/**
* Returns materialized (final) results of the registered table sink.
*
* @param tableName the table name of the registered table sink.
*/
public static List<String> getResults(String tableName) {
public static List<Row> getResults(String tableName) {
return TestValuesRuntimeFunctions.getResults(tableName);
}

Expand Down Expand Up @@ -1936,7 +1954,7 @@ public Optional<Integer> getParallelism() {
@Override
public SinkFunction<RowData> createSinkFunction() {
return new AppendingSinkFunction(
tableName, converter, rowtimeIndex);
tableName, consumedDataType, converter, rowtimeIndex);
}
};
case "OutputFormat":
Expand All @@ -1960,7 +1978,10 @@ public DataStreamSink<?> consumeDataStream(
DataStreamSink<RowData> sink =
dataStream.addSink(
new AppendingSinkFunction(
tableName, converter, rowtimeIndex));
tableName,
consumedDataType,
converter,
rowtimeIndex));
providerContext.generateUid("sink-function").ifPresent(sink::uid);
return sink;
}
Expand Down Expand Up @@ -1989,6 +2010,7 @@ public Optional<Integer> getParallelism() {
sinkFunction =
new KeyedUpsertingSinkFunction(
tableName,
consumedDataType,
converter,
primaryKeyIndices,
Arrays.stream(targetColumns).mapToInt(a -> a[0]).toArray(),
Expand All @@ -2000,7 +2022,8 @@ public Optional<Integer> getParallelism() {
"Retracting Sink doesn't support '"
+ SINK_EXPECTED_MESSAGES_NUM.key()
+ "' yet.");
sinkFunction = new RetractingSinkFunction(tableName, converter);
sinkFunction =
new RetractingSinkFunction(tableName, consumedDataType, converter);
}
return SinkFunctionProvider.of(sinkFunction, this.parallelism);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void testReplaceTableAS() throws Exception {
.await();

// verify written rows
assertThat(TestValuesTableFactory.getResults("target").toString())
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
.isEqualTo("[+I[1, 1, Hi], +I[2, 2, Hello], +I[3, 2, Hello world]]");

// verify the table after replacing
Expand Down Expand Up @@ -96,7 +96,7 @@ void testCreateOrReplaceTableAS() throws Exception {
.await();

// verify written rows
assertThat(TestValuesTableFactory.getResults("target").toString())
assertThat(TestValuesTableFactory.getResultsAsStrings("target").toString())
.isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");

// verify the table after replacing
Expand All @@ -116,7 +116,7 @@ void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
.await();

// verify written rows
assertThat(TestValuesTableFactory.getResults("not_exist_target").toString())
assertThat(TestValuesTableFactory.getResultsAsStrings("not_exist_target").toString())
.isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");

// verify the table after replacing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void testBuildRightStringKeyAdaptiveHashJoin() throws Exception {

private void asserResult(String sinkTableName, int resultSize) {
// Due to concern OOM and record value is same, here just assert result size
List<String> result = TestValuesTableFactory.getResults(sinkTableName);
List<String> result = TestValuesTableFactory.getResultsAsStrings(sinkTableName);
assertThat(result.size()).isEqualTo(resultSize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ void testChangelogSource() throws Exception {
"+I[user1, Tom, [email protected], 8.10, 16.20]",
"+I[user3, Bailey, [email protected], 9.99, 19.98]",
"+I[user4, Tina, [email protected], 11.30, 22.60]");
assertResult(expected, TestValuesTableFactory.getResults("user_sink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink"));
}

@Test
Expand All @@ -74,7 +74,7 @@ void testToUpsertSource() throws Exception {
"+I[user1, Tom, [email protected], 8.10, 16.20]",
"+I[user3, Bailey, [email protected], 9.99, 19.98]",
"+I[user4, Tina, [email protected], 11.30, 22.60]");
assertResult(expected, TestValuesTableFactory.getResults("user_sink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("user_sink"));
}

// ------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
"+I[Jerry, 1, 2, 99.9]",
"+I[Olivia, 2, 4, 1100.0]",
"+I[Michael, 1, 3, 599.9]");
assertResult(expected, TestValuesTableFactory.getResults("OrdersStats"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("OrdersStats"));
}

@Test
Expand Down Expand Up @@ -187,7 +187,7 @@ void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
List<String> expected =
Arrays.asList(
"+I[1, 1000002, TRUCK]", "+I[1, 1000004, RAIL]", "+I[1, 1000005, AIR]");
assertResult(expected, TestValuesTableFactory.getResults("OrdersShipInfo"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("OrdersShipInfo"));
}

private static Map<String, String> getProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ void testSystemFuncByObject() throws ExecutionException, InterruptedException {
"insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -61,7 +61,7 @@ void testSystemFuncByClass() throws ExecutionException, InterruptedException {
"insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -73,7 +73,7 @@ void testTemporaryFuncByObject() throws ExecutionException, InterruptedException
"insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -85,7 +85,7 @@ void testTemporaryFuncByClass() throws ExecutionException, InterruptedException
"insert into MySink SELECT a, v FROM MyTable, lateral table(STRING_SPLIT(a, ',')) as T(v)";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]", "+I[1,1,hi, hi]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -99,7 +99,7 @@ void testFilter() throws ExecutionException, InterruptedException {
+ "where try_cast(v as int) > 0";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[1,1,hi, 1]", "+I[1,1,hi, 1]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -114,6 +114,6 @@ void testUnnest() throws ExecutionException, InterruptedException {
"INSERT INTO MySink SELECT name, nested FROM MyNestedTable CROSS JOIN UNNEST(arr) AS t (nested)";
compileSqlAndExecutePlan(query).await();
List<String> expected = Arrays.asList("+I[Bob, 1]", "+I[Bob, 2]", "+I[Bob, 3]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ void testDeduplication() throws Exception {

assertResult(
Arrays.asList("+I[1, terry, pen, 1000]", "+I[4, bob, apple, 4000]"),
TestValuesTableFactory.getRawResults("MySink"));
TestValuesTableFactory.getRawResultsAsStrings("MySink"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void testExpand() throws Exception {
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(Arrays.asList("+I[1, 1, Hi]", "+I[2, 2, Hello world]"), result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void testSimpleAggCallsWithGroupBy() throws Exception {
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(Arrays.asList("+I[1, 1, null, Hi]", "+I[2, 2, 2.0, Hello]"), result);
}

Expand Down Expand Up @@ -124,7 +124,7 @@ void testDistinctAggCalls() throws Exception {
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList(
"+I[1, 1, 4, 12, 32, 6.0, 5]",
Expand Down Expand Up @@ -164,7 +164,7 @@ void testUserDefinedAggCallsWithoutMerge() throws Exception {
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList("+I[1, 77, 0, 1]", "+I[2, 120, 0, 2]", "+I[3, 58, 0, 3]"), result);
}
Expand Down Expand Up @@ -194,7 +194,7 @@ void testUserDefinedAggCallsWithMerge() throws Exception {
+ "from MyTable group by e")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList(
"+I[1, 1, Hallo Welt wie|Hallo|GHI|EFG|DEF]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void testEventTimeTumbleWindow() throws Exception {
+ "GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList(
"+I[a, 2020-10-10T00:00, 2020-10-10T00:00:05, 4, 10, 2]",
Expand All @@ -104,7 +104,7 @@ void testEventTimeHopWindow() throws Exception {
+ "GROUP BY name, HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '10' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList(
"+I[a, 1]",
Expand Down Expand Up @@ -132,7 +132,7 @@ void testEventTimeSessionWindow() throws Exception {
+ "GROUP BY name, Session(rowtime, INTERVAL '3' SECOND)")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(
Arrays.asList(
"+I[a, 1]", "+I[a, 4]", "+I[b, 1]", "+I[b, 1]", "+I[b, 2]", "+I[null, 1]"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void testIncrementalAggregate() throws IOException, ExecutionException, Interrup
+ "from MyTable group by b")
.await();

List<String> result = TestValuesTableFactory.getResults("MySink");
List<String> result = TestValuesTableFactory.getResultsAsStrings("MySink");
assertResult(Arrays.asList("+I[1, 1]", "+I[2, 2]"), result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void testProcessTimeInnerJoin() throws Exception {
"+I[1, HiHi, Hi6]",
"+I[1, HiHi, Hi8]",
"+I[2, HeHe, Hi5]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand Down Expand Up @@ -112,6 +112,6 @@ void testRowTimeInnerJoin() throws Exception {
"+I[1, HiHi, Hi2]",
"+I[1, HiHi, Hi3]",
"+I[2, HeHe, Hi5]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ void testIsNullInnerJoinWithNullCond() throws Exception {
"+I[1, HiHi, Hi8]",
"+I[2, HeHe, Hi5]",
"+I[null, HeHe, Hi9]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -132,7 +132,7 @@ void testJoin() throws Exception {
List<String> expected =
Arrays.asList(
"+I[Hello world, Hallo Welt]", "+I[Hello, Hallo Welt]", "+I[Hi, Hallo]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -141,7 +141,7 @@ void testInnerJoin() throws Exception {
compileSqlAndExecutePlan("insert into MySink \n" + "SELECT a1, b1 FROM A JOIN B ON a1 = b1")
.await();
List<String> expected = Arrays.asList("+I[1, 1]", "+I[2, 2]", "+I[2, 2]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -152,7 +152,7 @@ void testJoinWithFilter() throws Exception {
+ "SELECT a3, b4 FROM A, B where a2 = b2 and a2 < 2")
.await();
List<String> expected = Arrays.asList("+I[Hi, Hallo]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -163,6 +163,6 @@ void testInnerJoinWithDuplicateKey() throws Exception {
+ "SELECT a1, b1, b3 FROM A JOIN B ON a1 = b1 AND a1 = b3")
.await();
List<String> expected = Arrays.asList("+I[2, 2, 2]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,6 @@ void testLimit() throws ExecutionException, InterruptedException, IOException {
compileSqlAndExecutePlan(sql).await();

List<String> expected = Arrays.asList("+I[2, a, 6]", "+I[4, b, 8]", "+I[6, c, 10]");
assertResult(expected, TestValuesTableFactory.getResults("result"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("result"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void testJoinLookupTable() throws Exception {
"+I[1, 12, Julian, Julian]",
"+I[2, 15, Hello, Jark]",
"+I[3, 15, Fabian, Fabian]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -91,7 +91,7 @@ void testJoinLookupTableWithPushDown() throws Exception {
.await();
List<String> expected =
Arrays.asList("+I[2, 15, Hello, Jark]", "+I[3, 15, Fabian, Fabian]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}

@Test
Expand All @@ -108,6 +108,6 @@ void testLeftJoinLookupTableWithPreFilter() throws Exception {
"+I[3, 15, Fabian, null]",
"+I[8, 11, Hello world, null]",
"+I[9, 12, Hello world!, null]");
assertResult(expected, TestValuesTableFactory.getResults("MySink"));
assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink"));
}
}
Loading

0 comments on commit e914eb7

Please sign in to comment.