Skip to content

Commit

Permalink
[FLINK-3192] [TableAPI] Add explain support to print the sql-executio…
Browse files Browse the repository at this point in the history
…n plan.

This closes apache#1477
  • Loading branch information
gallenvara authored and fhueske committed Jan 12, 2016
1 parent b474da6 commit dbbab0a
Show file tree
Hide file tree
Showing 14 changed files with 1,104 additions and 0 deletions.
6 changes: 6 additions & 0 deletions flink-staging/flink-table/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
*/
package org.apache.flink.api.table

import org.apache.flink.api.java.io.DiscardingOutputFormat
import org.apache.flink.api.table.explain.PlanJsonParser
import org.apache.flink.api.table.expressions.analysis.{GroupByAnalyzer, PredicateAnalyzer, SelectionAnalyzer}
import org.apache.flink.api.table.expressions.{Expression, ResolvedFieldReference, UnresolvedFieldReference}
import org.apache.flink.api.table.parser.ExpressionParser
import org.apache.flink.api.table.plan._
import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._

/**
* The abstraction for writing Table API programs. Similar to how the batch and streaming APIs
Expand Down Expand Up @@ -267,5 +271,24 @@ case class Table(private[flink] val operation: PlanNode) {
this.copy(operation = UnionAll(operation, right.operation))
}

/**
* Get the process of the sql parsing, print AST and physical execution plan.The AST
* show the structure of the supplied statement. The execution plan shows how the table
* referenced by the statement will be scanned.
*/
def explain(extended: Boolean): String = {
val ast = operation
val dataSet = this.toDataSet[Row]
val env = dataSet.getExecutionEnvironment
dataSet.output(new DiscardingOutputFormat[Row])
val jasonSqlPlan = env.getExecutionPlan()
val sqlPlan = PlanJsonParser.getSqlExecutionPlan(jasonSqlPlan, extended)
val result = "== Abstract Syntax Tree ==\n" + ast + "\n\n" + "== Physical Execution Plan ==" +
"\n" + sqlPlan
return result
}

def explain(): String = explain(false)

override def toString: String = s"Expression($operation)"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.explain;

import java.util.List;

public class Node {
private int id;
private String type;
private String pact;
private String contents;
private int parallelism;
private String driver_strategy;
private List<Predecessors> predecessors;
private List<Global_properties> global_properties;
private List<LocalProperty> local_properties;
private List<Estimates> estimates;
private List<Costs> costs;
private List<Compiler_hints> compiler_hints;

public int getId() {
return id;
}
public String getType() {
return type;
}
public String getPact() {
return pact;
}
public String getContents() {
return contents;
}
public int getParallelism() {
return parallelism;
}
public String getDriver_strategy() {
return driver_strategy;
}
public List<Predecessors> getPredecessors() {
return predecessors;
}
public List<Global_properties> getGlobal_properties() {
return global_properties;
}
public List<LocalProperty> getLocal_properties() {
return local_properties;
}
public List<Estimates> getEstimates() {
return estimates;
}
public List<Costs> getCosts() {
return costs;
}
public List<Compiler_hints> getCompiler_hints() {
return compiler_hints;
}
}

class Predecessors {
private String ship_strategy;
private String exchange_mode;

public String getShip_strategy() {
return ship_strategy;
}
public String getExchange_mode() {
return exchange_mode;
}
}

class Global_properties {
private String name;
private String value;

public String getValue() {
return value;
}
public String getName() {
return name;
}
}

class LocalProperty {
private String name;
private String value;

public String getValue() {
return value;
}
public String getName() {
return name;
}
}

class Estimates {
private String name;
private String value;

public String getValue() {
return value;
}
public String getName() {
return name;
}
}

class Costs {
private String name;
private String value;

public String getValue() {
return value;
}
public String getName() {
return name;
}
}

class Compiler_hints {
private String name;
private String value;

public String getValue() {
return value;
}
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.table.explain;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.DeserializationFeature;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.List;

public class PlanJsonParser {

public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();

//not every node is same, ignore the unknown field
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

PlanTree tree = objectMapper.readValue(t, PlanTree.class);
LinkedHashMap<String, Integer> map = new LinkedHashMap<>();
StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw);
int tabCount = 0;

for (int index = 0; index < tree.getNodes().size(); index++) {
Node tempNode = tree.getNodes().get(index);

//input with operation such as join or union is coordinate, keep the same indent
if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
tabCount = map.get(tempNode.getPact());
}
else {
map.put(tempNode.getPact(), tabCount);
}

printTab(tabCount, pw);
pw.print("Stage " + tempNode.getId() + " : " + tempNode.getPact() + "\n");

printTab(tabCount + 1, pw);
String content = tempNode.getContents();

//drop the hashcode of object instance
int dele = tempNode.getContents().indexOf("@");
if (dele > -1) content = tempNode.getContents().substring(0, dele);

//replace with certain content if node is dataSource to pass
//unit tests, because java and scala use different api to
//get input element
if (tempNode.getPact().equals("Data Source"))
content = "collect elements with CollectionInputFormat";
pw.print("content : " + content + "\n");

List<Predecessors> predecessors = tempNode.getPredecessors();
if (predecessors != null) {
printTab(tabCount + 1, pw);
pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");

printTab(tabCount + 1, pw);
pw.print("exchange_mode : " + predecessors.get(0).getExchange_mode() + "\n");
}

if (tempNode.getDriver_strategy() != null) {
printTab(tabCount + 1, pw);
pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
}

printTab(tabCount + 1, pw);
pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
+ tempNode.getGlobal_properties().get(0).getValue() + "\n");

if (extended) {
List<Global_properties> globalProperties = tempNode.getGlobal_properties();
for (int i = 1; i < globalProperties.size(); i++) {
printTab(tabCount + 1, pw);
pw.print(globalProperties.get(i).getName() + " : "
+ globalProperties.get(i).getValue() + "\n");
}

List<LocalProperty> localProperties = tempNode.getLocal_properties();
for (int i = 0; i < localProperties.size(); i++) {
printTab(tabCount + 1, pw);
pw.print(localProperties.get(i).getName() + " : "
+ localProperties.get(i).getValue() + "\n");
}

List<Estimates> estimates = tempNode.getEstimates();
for (int i = 0; i < estimates.size(); i++) {
printTab(tabCount + 1, pw);
pw.print(estimates.get(i).getName() + " : "
+ estimates.get(i).getValue() + "\n");
}

List<Costs> costs = tempNode.getCosts();
for (int i = 0; i < costs.size(); i++) {
printTab(tabCount + 1, pw);
pw.print(costs.get(i).getName() + " : "
+ costs.get(i).getValue() + "\n");
}

List<Compiler_hints> compilerHintses = tempNode.getCompiler_hints();
for (int i = 0; i < compilerHintses.size(); i++) {
printTab(tabCount + 1, pw);
pw.print(compilerHintses.get(i).getName() + " : "
+ compilerHintses.get(i).getValue() + "\n");
}
}
tabCount++;
pw.print("\n");
}
pw.close();
return sw.toString();
}

private static void printTab(int tabCount, PrintWriter pw) {
for (int i = 0; i < tabCount; i++)
pw.print("\t");
}
}

class PlanTree {
private List<Node> nodes;

public List<Node> getNodes() {
return nodes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ sealed abstract class PlanNode extends TreeNode[PlanNode] { self: Product =>
*/
case class Root[T](input: T, outputFields: Seq[(String, TypeInformation[_])]) extends PlanNode {
val children = Nil
override def toString = s"Root($outputFields)"
}

/**
Expand Down
Loading

0 comments on commit dbbab0a

Please sign in to comment.