diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala index 455c02484bdd6..36b2d42ec4a65 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/GroupWindowITCase.scala @@ -384,6 +384,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) @Test def testWindowAggregateOnUpsertSource(): Unit = { + env.setParallelism(1) val upsertSourceDataId = registerData(upsertSourceCurrencyData) tEnv.executeSql( s""" @@ -417,7 +418,8 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) val expected = Seq( "US Dollar,1,102,1970-01-01T00:00,1970-01-01T00:00:05", "Yen,1,1,1970-01-01T00:00,1970-01-01T00:00:05", - "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20") + "Euro,1,118,1970-01-01T00:00:15,1970-01-01T00:00:20", + "RMB,1,702,1970-01-01T00:00,1970-01-01T00:00:05") assertEquals(expected.sorted, sink.getAppendResults.sorted) } @@ -472,6 +474,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) @Test def testWindowAggregateOnUpsertSourcePushdownWatermark(): Unit = { + env.setParallelism(1) val upsertSourceDataId = registerData(upsertSourceCurrencyData) tEnv.executeSql( s""" @@ -501,7 +504,7 @@ class GroupWindowITCase(mode: StateBackendMode, useTimestampLtz: Boolean) tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink) env.execute() val expected = Seq( - "1970-01-01T00:00,1970-01-01T00:00:05,102", + "1970-01-01T00:00,1970-01-01T00:00:05,702", "1970-01-01T00:00:15,1970-01-01T00:00:20,118") assertEquals(expected.sorted, sink.getAppendResults.sorted) }