Skip to content

Commit

Permalink
[FLINK-5257] [table] Include optimized logical plan in explain().
Browse files Browse the repository at this point in the history
This closes apache#2949.
  • Loading branch information
KurtYoung authored and fhueske committed Dec 6, 2016
1 parent ebe228d commit 6f9633c
Show file tree
Hide file tree
Showing 10 changed files with 96 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{Programs, RuleSet}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
Expand Down Expand Up @@ -166,22 +166,25 @@ abstract class BatchTableEnvironment(
* @param extended Flag to include detailed optimizer estimates.
*/
private[flink] def explain(table: Table, extended: Boolean): String = {

val ast = RelOptUtil.toString(table.getRelNode)
val dataSet = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
val dataSet = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))
dataSet.output(new DiscardingOutputFormat[Row])
val env = dataSet.getExecutionEnvironment
val jasonSqlPlan = env.getExecutionPlan
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)

s"== Abstract Syntax Tree ==" +
System.lineSeparator +
s"$ast" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
System.lineSeparator +
s"$sqlPlan"

System.lineSeparator +
s"${RelOptUtil.toString(ast)}" +
System.lineSeparator +
s"== Optimized Logical Plan ==" +
System.lineSeparator +
s"${RelOptUtil.toString(optimizedPlan)}" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
System.lineSeparator +
s"$sqlPlan"
}

/**
Expand Down Expand Up @@ -275,17 +278,27 @@ abstract class BatchTableEnvironment(
* Table API calls and / or SQL queries and generating corresponding [[DataSet]] operators.
*
* @param table The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataSet[A] = {
val dataSetPlan = optimize(table.getRelNode)
translate(dataSetPlan)
}

/**
* Translates a logical [[RelNode]] into a [[DataSet]].
*
* @param logicalPlan The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataSet]].
* @tparam A The type of the resulting [[DataSet]].
* @return The [[DataSet]] that corresponds to the translated [[Table]].
*/
protected def translate[A](logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataSet[A] = {
validateType(tpe)

val dataSetPlan = optimize(table.getRelNode)

dataSetPlan match {
logicalPlan match {
case node: DataSetRel =>
node.translateToPlan(
this,
Expand All @@ -294,5 +307,4 @@ abstract class BatchTableEnvironment(
case _ => ???
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.api.table.plan.logical.{CatalogNode, LogicalRelNode}
import org.apache.flink.api.table.plan.nodes.datastream.{DataStreamConvention, DataStreamRel}
import org.apache.flink.api.table.plan.rules.FlinkRuleSets
import org.apache.flink.api.table.plan.schema.{DataStreamTable, TableSourceTable}
import org.apache.flink.api.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.api.table.plan.schema.{TableSourceTable, DataStreamTable}
import org.apache.flink.api.table.sources.StreamTableSource
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
Expand Down Expand Up @@ -291,20 +291,31 @@ abstract class StreamTableEnvironment(
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A](table: Table)(implicit tpe: TypeInformation[A]): DataStream[A] = {
val dataStreamPlan = optimize(table.getRelNode)
translate(dataStreamPlan)
}

validateType(tpe)
/**
* Translates a logical [[RelNode]] into a [[DataStream]].
*
* @param logicalPlan The root node of the relational expression tree.
* @param tpe The [[TypeInformation]] of the resulting [[DataStream]].
* @tparam A The type of the resulting [[DataStream]].
* @return The [[DataStream]] that corresponds to the translated [[Table]].
*/
protected def translate[A]
(logicalPlan: RelNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {

val dataStreamPlan = optimize(table.getRelNode)
validateType(tpe)

dataStreamPlan match {
logicalPlan match {
case node: DataStreamRel =>
node.translateToPlan(
this,
Some(tpe.asInstanceOf[TypeInformation[Any]])
).asInstanceOf[DataStream[A]]
case _ => ???
}

}

/**
Expand All @@ -314,23 +325,26 @@ abstract class StreamTableEnvironment(
* @param table The table for which the AST and execution plan will be returned.
*/
def explain(table: Table): String = {

val ast = RelOptUtil.toString(table.getRelNode)

val dataStream = translate[Row](table)(TypeExtractor.createTypeInfo(classOf[Row]))
val ast = table.getRelNode
val optimizedPlan = optimize(ast)
val dataStream = translate[Row](optimizedPlan)(TypeExtractor.createTypeInfo(classOf[Row]))

val env = dataStream.getExecutionEnvironment
val jsonSqlPlan = env.getExecutionPlan

val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jsonSqlPlan, false)

s"== Abstract Syntax Tree ==" +
System.lineSeparator +
s"$ast" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
System.lineSeparator +
s"$sqlPlan"
System.lineSeparator +
s"${RelOptUtil.toString(ast)}" +
System.lineSeparator +
s"== Optimized Logical Plan ==" +
System.lineSeparator +
s"${RelOptUtil.toString(optimizedPlan)}" +
System.lineSeparator +
s"== Physical Execution Plan ==" +
System.lineSeparator +
s"$sqlPlan"
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])

== Optimized Logical Plan ==
DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataSetTable_0]])

== Optimized Logical Plan ==
DataSetCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataSetScan(table=[[_DataSetTable_0]])

== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
LogicalFilter(condition=[=(MOD($0, 2), 0)])
LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[a, b], where=[=(MOD(a, 2), 0)])
DataStreamScan(table=[[_DataStreamTable_0]])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])

== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])

== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ LogicalProject(a=[$0], c=[$2])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])

== Optimized Logical Plan ==
DataSetCalc(select=[a, c])
DataSetJoin(where=[=(b, d)], join=[a, b, c, d], joinType=[InnerJoin])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])

== Physical Execution Plan ==
Stage 4 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])

== Optimized Logical Plan ==
DataSetUnion(union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])

== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataSetTable_0]])
LogicalTableScan(table=[[_DataSetTable_1]])

== Optimized Logical Plan ==
DataSetUnion(union=[count, word])
DataSetScan(table=[[_DataSetTable_0]])
DataSetScan(table=[[_DataSetTable_1]])

== Physical Execution Plan ==
Stage 3 : Data Source
content : collect elements with CollectionInputFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ LogicalUnion(all=[true])
LogicalTableScan(table=[[_DataStreamTable_0]])
LogicalTableScan(table=[[_DataStreamTable_1]])

== Optimized Logical Plan ==
DataStreamUnion(union=[count, word])
DataStreamScan(table=[[_DataStreamTable_0]])
DataStreamScan(table=[[_DataStreamTable_1]])

== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Expand Down

0 comments on commit 6f9633c

Please sign in to comment.