From a71133945e66ec471b4cb07da0693d545c40923c Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Wed, 28 Sep 2016 10:31:59 +0800 Subject: [PATCH] [FLINK-4068] [table] Move constant computations out of code-generated This closes #2560. --- .../api/table/BatchTableEnvironment.scala | 48 ++++---- .../flink/api/table/FlinkRelBuilder.scala | 5 +- .../api/table/StreamTableEnvironment.scala | 40 ++++--- .../flink/api/table/TableEnvironment.scala | 5 +- .../api/table/plan/logical/operators.scala | 3 +- .../api/table/plan/nodes/FlinkCalc.scala | 2 +- .../flink/api/scala/batch/ExplainTest.scala | 4 +- .../api/table/BatchTableEnvironmentTest.scala | 102 +++++++++++++++++ .../table/StreamTableEnvironmentTest.scala | 106 ++++++++++++++++++ .../src/test/scala/resources/testJoin0.out | 2 +- .../src/test/scala/resources/testJoin1.out | 2 +- 11 files changed, 277 insertions(+), 42 deletions(-) create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala create mode 100644 flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index ad3ff7aeb6143..10c24509b9bdd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs @@ -228,21 +229,12 @@ abstract class BatchTableEnvironment( } /** - * Translates a [[Table]] into a [[DataSet]]. + * Generates the optimized [[RelNode]] tree from the original relational node tree. * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators. - * - * @param table The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. - * @tparam A The type of the resulting [[DataSet]]. - * @return The [[DataSet]] that corresponds to the translated [[Table]]. + * @param relNode The original [[RelNode]] tree + * @return The optimized [[RelNode]] tree */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { - - validateType(tpe) - - val relNode = table.getRelNode + private[flink] def optimize(relNode: RelNode): RelNode = { // decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) @@ -253,8 +245,7 @@ abstract class BatchTableEnvironment( val dataSetPlan = try { optProgram.run(getPlanner, decorPlan, flinkOutputProps) - } - catch { + } catch { case e: CannotPlanException => throw new TableException( s"Cannot generate a valid execution plan for the given query: \n\n" + @@ -263,13 +254,32 @@ abstract class BatchTableEnvironment( s"Please check the documentation for the set of currently supported SQL features.") case t: TableException => throw new TableException( - s"Cannot generate a valid execution plan for the given query: \n\n" + - s"${RelOptUtil.toString(relNode)}\n" + - s"${t.msg}\n" + - s"Please check the documentation for the set of currently supported SQL features.") + s"Cannot generate a valid execution plan for the given query: \n\n" + + s"${RelOptUtil.toString(relNode)}\n" + + s"${t.msg}\n" + + s"Please check the documentation for the set of currently supported SQL features.") case a: AssertionError => throw a.getCause } + dataSetPlan + } + + /** + * Translates a [[Table]] into a [[DataSet]]. + * + * The transformation involves optimizing the relational expression tree as defined by + * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators. + * + * @param table The root node of the relational expression tree. + * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. + * @tparam A The type of the resulting [[DataSet]]. + * @return The [[DataSet]] that corresponds to the translated [[Table]]. + */ + protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { + + validateType(tpe) + + val dataSetPlan = optimize(table.getRelNode) dataSetPlan match { case node: DataSetRel => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala index 3827f05ff9571..34ed4ce7734de 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/FlinkRelBuilder.scala @@ -21,8 +21,8 @@ package org.apache.flink.api.table import org.apache.calcite.jdbc.CalciteSchema import org.apache.calcite.plan.{Context, RelOptCluster, RelOptPlanner, RelOptSchema} import org.apache.calcite.prepare.CalciteCatalogReader -import org.apache.calcite.rex.RexBuilder -import org.apache.calcite.schema.SchemaPlus +import org.apache.calcite.rex.{RexExecutorImpl, RexBuilder} +import org.apache.calcite.schema.{Schemas, SchemaPlus} import org.apache.calcite.tools.Frameworks.PlannerAction import org.apache.calcite.tools.{FrameworkConfig, Frameworks, RelBuilder} @@ -66,6 +66,7 @@ object FlinkRelBuilder { } }) val planner = clusters(0).getPlanner + planner.setExecutor(config.getExecutor) val defaultRelOptSchema = relOptSchemas(0).asInstanceOf[CalciteCatalogReader] // create Flink type factory diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index e3e5751a3b2d6..44d90ac85fa00 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -22,10 +22,12 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.plan.RelOptPlanner.CannotPlanException import org.apache.calcite.plan.RelOptUtil +import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.Programs import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.DataSet import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} @@ -228,22 +230,12 @@ abstract class StreamTableEnvironment( } /** - * Translates a [[Table]] into a [[DataStream]]. - * - * The transformation involves optimizing the relational expression tree as defined by - * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. + * Generates the optimized [[RelNode]] tree from the original relational node tree. * - * @param table The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. - * @tparam A The type of the resulting [[DataStream]]. - * @return The [[DataStream]] that corresponds to the translated [[Table]]. + * @param relNode The root node of the relational expression tree. + * @return The optimized [[RelNode]] tree */ - protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { - - validateType(tpe) - - val relNode = table.getRelNode - + private[flink] def optimize(relNode: RelNode): RelNode = { // decorrelate val decorPlan = RelDecorrelator.decorrelateQuery(relNode) @@ -262,6 +254,26 @@ abstract class StreamTableEnvironment( s"This exception indicates that the query uses an unsupported SQL feature.\n" + s"Please check the documentation for the set of currently supported SQL features.") } + dataStreamPlan + } + + + /** + * Translates a [[Table]] into a [[DataStream]]. + * + * The transformation involves optimizing the relational expression tree as defined by + * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators. + * + * @param table The root node of the relational expression tree. + * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. + * @tparam A The type of the resulting [[DataStream]]. + * @return The [[DataStream]] that corresponds to the translated [[Table]]. + */ + protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { + + validateType(tpe) + + val dataStreamPlan = optimize(table.getRelNode) dataStreamPlan match { case node: DataStreamRel => diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala index f56df0cd48030..02204b1ea11fe 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala @@ -24,7 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger import org.apache.calcite.config.Lex import org.apache.calcite.plan.RelOptPlanner import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.schema.SchemaPlus +import org.apache.calcite.rex.RexExecutorImpl +import org.apache.calcite.schema.{Schemas, SchemaPlus} import org.apache.calcite.schema.impl.AbstractTable import org.apache.calcite.sql.SqlOperatorTable import org.apache.calcite.sql.parser.SqlParser @@ -77,6 +78,8 @@ abstract class TableEnvironment(val config: TableConfig) { .costFactory(new DataSetCostFactory) .typeSystem(new FlinkTypeSystem) .operatorTable(sqlOperatorTable) + // set the executor to evaluate constant expressions + .executor(new RexExecutorImpl(Schemas.createDataContext(null))) .build // the builder for Calcite RelNodes, Calcite's representation of a relational expression tree. diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala index ccdab85cf589a..066e9d6aa92d2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala @@ -87,7 +87,8 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend // Calcite's RelBuilder does not translate identity projects even if they rename fields. // Add a projection ourselves (will be automatically removed by translation rules). val project = LogicalProject.create(relBuilder.peek(), - projectList.map(_.toRexNode(relBuilder)).asJava, + // avoid AS call + projectList.map(_.asInstanceOf[Alias].child.toRexNode(relBuilder)).asJava, projectList.map(_.name).asJava) relBuilder.build() // pop previous relNode relBuilder.push(project) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala index aa5492f9b3463..d5f80105bb7f2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala @@ -134,7 +134,7 @@ trait FlinkCalc { val proj = calcProgram.getProjectList.asScala.toList val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList val localExprs = calcProgram.getExprList.asScala.toList - val outFields = calcProgram.getInputRowType.getFieldNames.asScala.toList + val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList proj .map(expression(_, inFields, Some(localExprs))) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala index ab70ec53190d2..9d00dda8e8b6d 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala @@ -72,7 +72,7 @@ class ExplainTest val result = tEnv.explain(table).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n") - assertEquals(result, source) + assertEquals(source, result) } @Test @@ -87,7 +87,7 @@ class ExplainTest val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n") val source = scala.io.Source.fromFile(testFilePath + "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n") - assertEquals(result, source) + assertEquals(source, result) } @Test diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala new file mode 100644 index 0000000000000..0344deec9d8d0 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/BatchTableEnvironmentTest.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import org.apache.flink.api.scala.ExecutionEnvironment +import org.apache.flink.api.scala.table._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.junit.Assert._ +import org.junit.Test + + +class BatchTableEnvironmentTest { + + @Test + def testReduceExpressionForSQL(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val sqlQuery = "SELECT " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest') " + + "FROM MyTable WHERE a>(1+7)" + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", ds) + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) + assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + } + + @Test + def testReduceExpressionForTableAPI(): Unit = { + val env = ExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c) + + val table = ds + .where('a > (1+7)) + .select((3+4).toExpr + 6, + (11 === 1) ? ("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor()) + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala new file mode 100644 index 0000000000000..52bf9ace3e2e4 --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/table/StreamTableEnvironmentTest.scala @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.table + +import org.apache.flink.api.scala.stream.utils.StreamTestData +import org.apache.flink.api.scala.table._ +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase +import org.junit.Test +import org.junit.Assert._ + + + +class StreamTableEnvironmentTest extends StreamingMultipleProgramsTestBase{ + + @Test + def testReduceExpression(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val sqlQuery = "SELECT STREAM " + + "(3+4)+a, " + + "b+(1+2), " + + "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " + + "TRIM(BOTH ' STRING '), " + + "'test' || 'string', " + + "NULLIF(1, 1), " + + "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " + + "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " + + "1 IS NULL, " + + "'TEST' LIKE '%EST', " + + "FLOOR(2.5), " + + "'TEST' IN ('west', 'TEST', 'rest') " + + "FROM MyTable WHERE a>(1+7)" + + val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) + tEnv.registerTable("MyTable", t) + + val table = tEnv.sql(sqlQuery) + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("+(7, _1) AS EXPR$0")) + assertTrue(optimizedString.contains("+(_2, 3) AS EXPR$1")) + assertTrue(optimizedString.contains("'b' AS EXPR$2")) + assertTrue(optimizedString.contains("'STRING' AS EXPR$3")) + assertTrue(optimizedString.contains("'teststring' AS EXPR$4")) + assertTrue(optimizedString.contains("null AS EXPR$5")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS EXPR$6")) + assertTrue(optimizedString.contains("19 AS EXPR$7")) + assertTrue(optimizedString.contains("false AS EXPR$8")) + assertTrue(optimizedString.contains("true AS EXPR$9")) + assertTrue(optimizedString.contains("2 AS EXPR$10")) + assertTrue(optimizedString.contains("true AS EXPR$11")) + } + + @Test + def testReduceExpressionForTableAPI(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + + val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) + + val table = t + .where('a > (1+7)) + .select((3+4).toExpr + 6, + (11 === 1) ? ("a", "b"), + " STRING ".trim, + "test" + "string", + "1990-10-14 23:00:00.123".toTimestamp + 10.day + 1.second, + 1.isNull, + "TEST".like("%EST"), + 2.5.toExpr.floor()) + + + val optimized = tEnv.optimize(table.getRelNode) + val optimizedString = optimized.toString + assertTrue(optimizedString.contains(">(_1, 8)")) + assertTrue(optimizedString.contains("13 AS _c0")) + assertTrue(optimizedString.contains("'b' AS _c1")) + assertTrue(optimizedString.contains("'STRING' AS _c2")) + assertTrue(optimizedString.contains("'teststring' AS _c3")) + assertTrue(optimizedString.contains("1990-10-24 23:00:01 AS _c4")) + assertTrue(optimizedString.contains("false AS _c5")) + assertTrue(optimizedString.contains("true AS _c6")) + assertTrue(optimizedString.contains("2E0 AS _c7")) + } + +} diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out index f71ea9f24482a..11961ef9793c2 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out @@ -36,7 +36,7 @@ Stage 6 : Data Source Partitioning : RANDOM_PARTITIONED Stage 1 : FlatMap - content : select: (a, c AS b) + content : select: (a, c) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out index f117cd90c0252..c6e8b34cabd85 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out @@ -121,7 +121,7 @@ Stage 6 : Data Source Filter Factor : (none) Stage 1 : FlatMap - content : select: (a, c AS b) + content : select: (a, c) ship_strategy : Forward exchange_mode : PIPELINED driver_strategy : FlatMap