Skip to content

Commit

Permalink
[hotfix][table-planner][tests] Use final for variables in `CommonExec…
Browse files Browse the repository at this point in the history
…SinkITCase`
  • Loading branch information
matriv authored and twalthr committed Feb 7, 2022
1 parent b795dd5 commit a2fa1e3
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void testStreamRecordTimestampInserterSinkRuntimeProvider()
.sink(buildRuntimeSinkProvider(new TestTimestampWriter(timestamps)))
.build();
tableEnv.createTable("T1", sourceDescriptor);
String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
assertPlan(tableEnv, sqlStmt, true);
tableEnv.executeSql(sqlStmt).await();
assertTimestampResults(timestamps, rows);
Expand Down Expand Up @@ -159,7 +159,7 @@ public void invoke(
})
.build();
tableEnv.createTable("T1", sourceDescriptor);
String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
assertPlan(tableEnv, sqlStmt, true);
tableEnv.executeSql(sqlStmt).await();
Collections.sort(timestamps.get());
Expand Down Expand Up @@ -201,7 +201,7 @@ public void testUnifiedSinksAreUsableWithDataStreamSinkProvider()
.sink(buildDataStreamSinkProvider(fetched))
.build();
tableEnv.createTable("T1", sourceDescriptor);
String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
final String sqlStmt = "INSERT INTO T1 SELECT * FROM T1";
tableEnv.executeSql(sqlStmt).await();
final List<Integer> fetchedRows =
fetched.get().stream().map(r -> r.getInt(0)).sorted().collect(Collectors.toList());
Expand Down Expand Up @@ -389,7 +389,7 @@ public void testNullEnforcer() throws ExecutionException, InterruptedException {
.build());

// Default config - ignore (no trim)
ExecutionException ee =
final ExecutionException ee =
assertThrows(
ExecutionException.class,
() -> tableEnv.executeSql("INSERT INTO T1 SELECT * FROM T1").await());
Expand All @@ -404,7 +404,7 @@ public void testNullEnforcer() throws ExecutionException, InterruptedException {

// Test not including a NOT NULL column
results.get().clear();
ValidationException ve =
final ValidationException ve =
assertThrows(
ValidationException.class,
() ->
Expand Down Expand Up @@ -530,8 +530,8 @@ private static void assertPlan(
StreamTableEnvironment tableEnv,
String sql,
boolean containsStreamRecordTimestampInserter) {
String explainStr = tableEnv.explainSql(sql, ExplainDetail.JSON_EXECUTION_PLAN);
String containedStr = "StreamRecordTimestampInserter(rowtime field: 2";
final String explainStr = tableEnv.explainSql(sql, ExplainDetail.JSON_EXECUTION_PLAN);
final String containedStr = "StreamRecordTimestampInserter(rowtime field: 2";
if (containsStreamRecordTimestampInserter) {
assertThat(explainStr).contains(containedStr);
} else {
Expand All @@ -540,7 +540,7 @@ private static void assertPlan(
}

private static Schema schemaStreamRecordTimestampInserter(boolean withWatermark) {
Schema.Builder builder =
final Schema.Builder builder =
Schema.newBuilder()
.column("a", "INT")
.column("b", "STRING")
Expand Down Expand Up @@ -605,7 +605,7 @@ public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(
context.createDataStructureConverter(
getFactoryContext().getPhysicalRowDataType());

return SourceFunctionProvider.of(new TestSourceFunction(rows, converter), true);
return SourceFunctionProvider.of(new TestSourceFunction(rows, converter), false);
}
}

Expand Down

0 comments on commit a2fa1e3

Please sign in to comment.