Skip to content

Commit

Permalink
[FLINK-20947][planner] Fix idle source doesn't work when pushing wate…
Browse files Browse the repository at this point in the history
…rmark into the source

This closes apache#14679
  • Loading branch information
fsk119 authored Jan 22, 2021
1 parent 22736dc commit 90e680c
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@

import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.collectRows;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaTableTestUtils.readLines;
import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;
import static org.apache.flink.table.utils.TableTestMatchers.deepEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -822,6 +823,86 @@ public void testPerPartitionWatermarkKafka() throws Exception {
deleteTestTopic(topic);
}

@Test
public void testPerPartitionWatermarkWithIdleSource() throws Exception {
if (isLegacyConnector) {
return;
}
// we always use a different topic name for each parameterized topic,
// in order to make sure the topic can be created.
final String topic = "idle_partition_watermark_topic_" + format;
createTestTopic(topic, 4, 1);

// ---------- Produce an event time stream into Kafka -------------------
String groupId = standardProps.getProperty("group.id");
String bootstraps = standardProps.getProperty("bootstrap.servers");
tEnv.getConfig()
.getConfiguration()
.set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(100));

final String createTable =
String.format(
"CREATE TABLE kafka (\n"
+ " `partition_id` INT,\n"
+ " `value` INT,\n"
+ " `timestamp` TIMESTAMP(3),\n"
+ " WATERMARK FOR `timestamp` AS `timestamp`\n"
+ ") WITH (\n"
+ " 'connector' = 'kafka',\n"
+ " 'topic' = '%s',\n"
+ " 'properties.bootstrap.servers' = '%s',\n"
+ " 'properties.group.id' = '%s',\n"
+ " 'scan.startup.mode' = 'earliest-offset',\n"
+ " 'sink.partitioner' = '%s',\n"
+ " 'format' = '%s'\n"
+ ")",
topic, bootstraps, groupId, TestPartitioner.class.getName(), format);

tEnv.executeSql(createTable);

// Only two partitions have elements and others are idle.
// When idle timer triggers, the WatermarkOutputMultiplexer will use the minimum watermark
// among active partitions as the output watermark.
// Therefore, we need to make sure the watermark in the each partition is large enough to
// trigger the window.
String initialValues =
"INSERT INTO kafka\n"
+ "VALUES\n"
+ " (0, 0, TIMESTAMP '2020-03-08 13:12:11.123'),\n"
+ " (0, 1, TIMESTAMP '2020-03-08 13:15:12.223'),\n"
+ " (0, 2, TIMESTAMP '2020-03-08 16:12:13.323'),\n"
+ " (1, 3, TIMESTAMP '2020-03-08 13:13:11.123'),\n"
+ " (1, 4, TIMESTAMP '2020-03-08 13:19:11.133'),\n"
+ " (1, 5, TIMESTAMP '2020-03-08 16:13:11.143')\n";
tEnv.executeSql(initialValues).await();

// ---------- Consume stream from Kafka -------------------

env.setParallelism(1);
String createSink =
"CREATE TABLE MySink(\n"
+ " `id` INT,\n"
+ " `cnt` BIGINT\n"
+ ") WITH (\n"
+ " 'connector' = 'values'\n"
+ ")";
tEnv.executeSql(createSink);
TableResult tableResult =
tEnv.executeSql(
"INSERT INTO MySink\n"
+ "SELECT `partition_id` as `id`, COUNT(`value`) as `cnt`\n"
+ "FROM kafka\n"
+ "GROUP BY `partition_id`, TUMBLE(`timestamp`, INTERVAL '1' HOUR) ");

final List<String> expected = Arrays.asList("+I[0, 2]", "+I[1, 2]");
KafkaTableTestUtils.waitingExpectedResults("MySink", expected, Duration.ofSeconds(5));

// ------------- cleanup -------------------

tableResult.getJobClient().ifPresent(JobClient::cancel);
deleteTestTopic(topic);
}

// --------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected FlinkLogicalTableSourceScan getNewScan(
Duration idleTimeout =
configuration.get(ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT);
if (!idleTimeout.isZero() && !idleTimeout.isNegative()) {
watermarkStrategy.withIdleness(idleTimeout);
watermarkStrategy = watermarkStrategy.withIdleness(idleTimeout);
digest = String.format("%s, idletimeout=[%s]", digest, idleTimeout.toMillis());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;

import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_SOURCE_IDLE_TIMEOUT;

/**
* Test rule {@link PushWatermarkIntoTableSourceScanAcrossCalcRule} and {@link
* PushWatermarkIntoTableSourceScanRule}.
*/
public class PushWatermarkIntoTableSourceScanRuleTest extends TableTestBase {
private StreamTableTestUtil util = streamTestUtil(new TableConfig());
private final StreamTableTestUtil util = streamTestUtil(new TableConfig());

@Before
public void setup() {
Expand Down Expand Up @@ -230,4 +234,26 @@ public void testWatermarkOnMetadata() {
util.tableEnv().executeSql(ddl);
util.verifyRelPlan("SELECT * FROM MyTable");
}

@Test
public void testWatermarkWithIdleSource() {
util.tableEnv()
.getConfig()
.getConfiguration()
.set(TABLE_EXEC_SOURCE_IDLE_TIMEOUT, Duration.ofMillis(1000));
String ddl =
"CREATE TABLE MyTable("
+ " a INT,\n"
+ " b BIGINT,\n"
+ " c TIMESTAMP(3),\n"
+ " WATERMARK FOR c AS c - INTERVAL '5' SECOND\n"
+ ") WITH (\n"
+ " 'connector' = 'values',\n"
+ " 'enable-watermark-push-down' = 'true',\n"
+ " 'bounded' = 'false',\n"
+ " 'disable-lookup' = 'true'"
+ ")";
util.tableEnv().executeSql(ddl);
util.verifyRelPlan("select a, c from MyTable");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,43 +136,61 @@ FlinkLogicalCalc(select=[a, b, c, g])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkWithMultiInputUdf">
<TestCase name="testWatermarkOnRow">
<Resource name="sql">
<![CDATA[SELECT * FROM MyTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[func(func($3, $0), $0)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[func($2, $0)])
LogicalProject(a=[$0], b=[$1], c=[$2], e=[$3])
+- LogicalWatermarkAssigner(rowtime=[e], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], e=[$2.d])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, d])
+- FlinkLogicalCalc(select=[a, b, c, Reinterpret(func(c, a)) AS d])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[func(func(func($2, $0), $0), $0)]]], fields=[a, b, c])
FlinkLogicalCalc(select=[a, b, c, e])
+- FlinkLogicalCalc(select=[a, b, c, Reinterpret(c.d) AS e])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-($2.d, 5000:INTERVAL SECOND)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkOnRow">
<TestCase name="testWatermarkWithIdleSource">
<Resource name="sql">
<![CDATA[select a, c from MyTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], c=[$2])
+- LogicalWatermarkAssigner(rowtime=[c], watermark=[-($2, 5000:INTERVAL SECOND)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, c])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-($2, 5000:INTERVAL SECOND)], idletimeout=[1000]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testWatermarkWithMultiInputUdf">
<Resource name="sql">
<![CDATA[SELECT * FROM MyTable]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], e=[$3])
+- LogicalWatermarkAssigner(rowtime=[e], watermark=[-($3, 5000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], e=[$2.d])
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3])
+- LogicalWatermarkAssigner(rowtime=[d], watermark=[func(func($3, $0), $0)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[func($2, $0)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, c, e])
+- FlinkLogicalCalc(select=[a, b, c, Reinterpret(c.d) AS e])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[-($2.d, 5000:INTERVAL SECOND)]]], fields=[a, b, c])
FlinkLogicalCalc(select=[a, b, c, d])
+- FlinkLogicalCalc(select=[a, b, c, Reinterpret(func(c, a)) AS d])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MyTable, watermark=[func(func(func($2, $0), $0), $0)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.flink.table.utils.LegacyRowResource
import org.apache.flink.types.Row

import org.junit.Assert.assertEquals
import org.junit.{Ignore, Rule, Test}
import org.junit.{Rule, Test}

import java.sql.Timestamp
import java.time.LocalDateTime
Expand Down

0 comments on commit 90e680c

Please sign in to comment.