From 6f9633cd9d3124e2abe5034806fd48cb130e2d93 Mon Sep 17 00:00:00 2001 From: Kurt Young Date: Tue, 6 Dec 2016 09:23:22 +0800 Subject: [PATCH] [FLINK-5257] [table] Include optimized logical plan in explain(). This closes #2949. --- .../api/table/BatchTableEnvironment.scala | 44 ++++++++++++------- .../api/table/StreamTableEnvironment.scala | 44 ++++++++++++------- .../src/test/scala/resources/testFilter0.out | 4 ++ .../src/test/scala/resources/testFilter1.out | 4 ++ .../scala/resources/testFilterStream0.out | 4 ++ .../src/test/scala/resources/testJoin0.out | 6 +++ .../src/test/scala/resources/testJoin1.out | 6 +++ .../src/test/scala/resources/testUnion0.out | 5 +++ .../src/test/scala/resources/testUnion1.out | 5 +++ .../test/scala/resources/testUnionStream0.out | 5 +++ 10 files changed, 96 insertions(+), 31 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala index 24b385ce535d0..918b01b66db05 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/BatchTableEnvironment.scala @@ -26,9 +26,9 @@ import org.apache.calcite.rel.RelNode import org.apache.calcite.sql2rel.RelDecorrelator import org.apache.calcite.tools.{Programs, RuleSet} import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.java.io.DiscardingOutputFormat import org.apache.flink.api.java.typeutils.TypeExtractor +import org.apache.flink.api.java.{DataSet, ExecutionEnvironment} import org.apache.flink.api.table.explain.PlanJsonParser import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} @@ -166,22 +166,25 @@ abstract class BatchTableEnvironment( * @param extended Flag to include detailed optimizer estimates. */ private[flink] def explain(table: Table, extended: Boolean): String = { - - val ast = RelOptUtil.toString(table.getRelNode) - val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row])) + val ast = table.getRelNode + val optimizedPlan = optimize(ast) + val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) dataSet.output(new DiscardingOutputFormat[Row]) val env = dataSet.getExecutionEnvironment val jasonSqlPlan = env.getExecutionPlan val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended) s"== Abstract Syntax Tree ==" + - System.lineSeparator + - s"$ast" + - System.lineSeparator + - s"== Physical Execution Plan ==" + - System.lineSeparator + - s"$sqlPlan" - + System.lineSeparator + + s"${RelOptUtil.toString(ast)}" + + System.lineSeparator + + s"== Optimized Logical Plan ==" + + System.lineSeparator + + s"${RelOptUtil.toString(optimizedPlan)}" + + System.lineSeparator + + s"== Physical Execution Plan ==" + + System.lineSeparator + + s"$sqlPlan" } /** @@ -275,17 +278,27 @@ abstract class BatchTableEnvironment( * Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators. * * @param table The root node of the relational expression tree. - * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. + * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. * @tparam A The type of the resulting [[DataSet]]. * @return The [[DataSet]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = { + val dataSetPlan = optimize(table.getRelNode) + translate(dataSetPlan) + } + /** + * Translates a logical [[RelNode]] into a [[DataSet]]. + * + * @param logicalPlan The root node of the relational expression tree. + * @param tpe The [[TypeInformation]] of the resulting [[DataSet]]. + * @tparam A The type of the resulting [[DataSet]]. + * @return The [[DataSet]] that corresponds to the translated [[Table]]. + */ + protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = { validateType(tpe) - val dataSetPlan = optimize(table.getRelNode) - - dataSetPlan match { + logicalPlan match { case node: DataSetRel => node.translateToPlan( this, @@ -294,5 +307,4 @@ abstract class BatchTableEnvironment( case _ => ??? } } - } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala index bca8d79b0d301..8f00586d700b3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/StreamTableEnvironment.scala @@ -32,8 +32,8 @@ import org.apache.flink.api.table.expressions.Expression import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode} import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel} import org.apache.flink.api.table.plan.rules.FlinkRuleSets +import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable} import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink} -import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable} import org.apache.flink.api.table.sources.StreamTableSource import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment @@ -291,12 +291,24 @@ abstract class StreamTableEnvironment( * @return The [[DataStream]] that corresponds to the translated [[Table]]. */ protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = { + val dataStreamPlan = optimize(table.getRelNode) + translate(dataStreamPlan) + } - validateType(tpe) + /** + * Translates a logical [[RelNode]] into a [[DataStream]]. + * + * @param logicalPlan The root node of the relational expression tree. + * @param tpe The [[TypeInformation]] of the resulting [[DataStream]]. + * @tparam A The type of the resulting [[DataStream]]. + * @return The [[DataStream]] that corresponds to the translated [[Table]]. + */ + protected def translate[A] + (logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = { - val dataStreamPlan = optimize(table.getRelNode) + validateType(tpe) - dataStreamPlan match { + logicalPlan match { case node: DataStreamRel => node.translateToPlan( this, @@ -304,7 +316,6 @@ abstract class StreamTableEnvironment( ).asInstanceOf[DataStream[A]] case _ => ??? } - } /** @@ -314,10 +325,9 @@ abstract class StreamTableEnvironment( * @param table The table for which the AST and execution plan will be returned. */ def explain(table: Table): String = { - - val ast = RelOptUtil.toString(table.getRelNode) - - val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row])) + val ast = table.getRelNode + val optimizedPlan = optimize(ast) + val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row])) val env = dataStream.getExecutionEnvironment val jsonSqlPlan = env.getExecutionPlan @@ -325,12 +335,16 @@ abstract class StreamTableEnvironment( val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false) s"== Abstract Syntax Tree ==" + - System.lineSeparator + - s"$ast" + - System.lineSeparator + - s"== Physical Execution Plan ==" + - System.lineSeparator + - s"$sqlPlan" + System.lineSeparator + + s"${RelOptUtil.toString(ast)}" + + System.lineSeparator + + s"== Optimized Logical Plan ==" + + System.lineSeparator + + s"${RelOptUtil.toString(optimizedPlan)}" + + System.lineSeparator + + s"== Physical Execution Plan ==" + + System.lineSeparator + + s"$sqlPlan" } } diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out index b3786d9a1a182..b6ea86f467646 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter0.out @@ -2,6 +2,10 @@ LogicalFilter(condition=[=(MOD($0, 2), 0)]) LogicalTableScan(table=[[_DataSetTable_0]]) +== Optimized Logical Plan == +DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)]) + DataSetScan(table=[[_DataSetTable_0]]) + == Physical Execution Plan == Stage 3 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out index 104946685cf3c..719edd9e10beb 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilter1.out @@ -2,6 +2,10 @@ LogicalFilter(condition=[=(MOD($0, 2), 0)]) LogicalTableScan(table=[[_DataSetTable_0]]) +== Optimized Logical Plan == +DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)]) + DataSetScan(table=[[_DataSetTable_0]]) + == Physical Execution Plan == Stage 3 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out index 20ae2b1ed624d..022f6c9b05e81 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testFilterStream0.out @@ -2,6 +2,10 @@ LogicalFilter(condition=[=(MOD($0, 2), 0)]) LogicalTableScan(table=[[_DataStreamTable_0]]) +== Optimized Logical Plan == +DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)]) + DataStreamScan(table=[[_DataStreamTable_0]]) + == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out index 11961ef9793c2..4f091f693e595 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin0.out @@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2]) LogicalTableScan(table=[[_DataSetTable_0]]) LogicalTableScan(table=[[_DataSetTable_1]]) +== Optimized Logical Plan == +DataSetCalc(select=[a, c]) + DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin]) + DataSetScan(table=[[_DataSetTable_0]]) + DataSetScan(table=[[_DataSetTable_1]]) + == Physical Execution Plan == Stage 4 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out index c6e8b34cabd85..e9dad5729fcad 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testJoin1.out @@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2]) LogicalTableScan(table=[[_DataSetTable_0]]) LogicalTableScan(table=[[_DataSetTable_1]]) +== Optimized Logical Plan == +DataSetCalc(select=[a, c]) + DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin]) + DataSetScan(table=[[_DataSetTable_0]]) + DataSetScan(table=[[_DataSetTable_1]]) + == Physical Execution Plan == Stage 4 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out index d17517fce9d10..5fbd1b559bf43 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion0.out @@ -3,6 +3,11 @@ LogicalUnion(all=[true]) LogicalTableScan(table=[[_DataSetTable_0]]) LogicalTableScan(table=[[_DataSetTable_1]]) +== Optimized Logical Plan == +DataSetUnion(union=[count, word]) + DataSetScan(table=[[_DataSetTable_0]]) + DataSetScan(table=[[_DataSetTable_1]]) + == Physical Execution Plan == Stage 3 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out index 875f77b08ea05..d7d343beb0aa2 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnion1.out @@ -3,6 +3,11 @@ LogicalUnion(all=[true]) LogicalTableScan(table=[[_DataSetTable_0]]) LogicalTableScan(table=[[_DataSetTable_1]]) +== Optimized Logical Plan == +DataSetUnion(union=[count, word]) + DataSetScan(table=[[_DataSetTable_0]]) + DataSetScan(table=[[_DataSetTable_1]]) + == Physical Execution Plan == Stage 3 : Data Source content : collect elements with CollectionInputFormat diff --git a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out index ac3635d59467b..fc83c0d4dbfd8 100644 --- a/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out +++ b/flink-libraries/flink-table/src/test/scala/resources/testUnionStream0.out @@ -3,6 +3,11 @@ LogicalUnion(all=[true]) LogicalTableScan(table=[[_DataStreamTable_0]]) LogicalTableScan(table=[[_DataStreamTable_1]]) +== Optimized Logical Plan == +DataStreamUnion(union=[count, word]) + DataStreamScan(table=[[_DataStreamTable_0]]) + DataStreamScan(table=[[_DataStreamTable_1]]) + == Physical Execution Plan == Stage 1 : Data Source content : collect elements with CollectionInputFormat