Skip to content

Commit

Permalink
[FLINK-13587][table-planner-blink] Fix some operator names are not se…
Browse files Browse the repository at this point in the history
…t in blink planner

This closes apache#9363
  • Loading branch information
wuchong committed Aug 9, 2019
1 parent c649c8b commit bf37130
Show file tree
Hide file tree
Showing 87 changed files with 405 additions and 600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,19 @@
/**
* Converts an {@link RelNode} to string with only the information from the RelNode itself
* without the information from its inputs. This is mainly used to generate
* {@link FlinkRelNode#getDisplayName()}.
* {@link FlinkRelNode#getRelDetailedDescription()}.
*/
public class RelDisplayNameWriterImpl implements RelWriter {
public class RelDescriptionWriterImpl implements RelWriter {

private static final String STREAM_EXEC = "StreamExec";
private static final String BATCH_EXEC = "BatchExec";
/**
* all the supported prefixes of RelNode class name (i.e. all the implementation of {@link FlinkRelNode}).
*/
private static final String[] REL_TYPE_NAME_PREFIXES = {"StreamExec", "BatchExec", "FlinkLogical"};

private final PrintWriter pw;
private final List<Pair<String, Object>> values = new ArrayList<>();

public RelDisplayNameWriterImpl(PrintWriter pw) {
public RelDescriptionWriterImpl(PrintWriter pw) {
this.pw = pw;
}

Expand Down Expand Up @@ -95,12 +97,11 @@ public RelWriter done(RelNode node) {

private String getNodeTypeName(RelNode rel) {
String typeName = rel.getRelTypeName();
if (typeName.startsWith(STREAM_EXEC)) {
return typeName.substring(STREAM_EXEC.length());
} else if (typeName.startsWith(BATCH_EXEC)) {
return typeName.substring(BATCH_EXEC.length());
} else {
throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
for (String prefix : REL_TYPE_NAME_PREFIXES) {
if (typeName.startsWith(prefix)) {
return typeName.substring(prefix.length());
}
}
throw new IllegalStateException("Unsupported RelNode class name '" + typeName + "'");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,7 @@ object CorrelateCodeGenerator {

new OneInputTransformation(
inputTransformation,
RelExplainUtil.correlateOpName(
inputRelType,
rexCall,
sqlFunction,
outDataType,
expression),
ruleDescription,
substituteStreamOperator,
BaseRowTypeInfo.of(returnType),
parallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class BatchPlanner(
val explainLevel = if (extended) {
SqlExplainLevel.ALL_ATTRIBUTES
} else {
SqlExplainLevel.EXPPLAN_ATTRIBUTES
SqlExplainLevel.DIGEST_ATTRIBUTES
}
sb.append(ExecNodePlanDumper.dagToString(execNodes, explainLevel))
sb.append(System.lineSeparator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class StreamPlanner(
val (explainLevel, withRetractTraits) = if (extended) {
(SqlExplainLevel.ALL_ATTRIBUTES, true)
} else {
(SqlExplainLevel.EXPPLAN_ATTRIBUTES, false)
(SqlExplainLevel.DIGEST_ATTRIBUTES, false)
}
sb.append(ExecNodePlanDumper.dagToString(
execNodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@

package org.apache.flink.table.planner.plan.nodes

import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.planner.plan.utils.RelDescriptionWriterImpl

import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlAsOperator
import org.apache.calcite.sql.SqlKind._
import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.planner.plan.utils.RelDisplayNameWriterImpl

import java.io.{PrintWriter, StringWriter}

Expand All @@ -35,13 +36,17 @@ import scala.collection.JavaConversions._
trait FlinkRelNode extends RelNode {

/**
* Returns the display name of the RelNode. The display name are usually used to be displayed
* in log or Web UI.
* Returns a string which describes the detailed information of relational expression
* with attributes which contribute to the plan output.
*
* This method leverages [[RelNode#explain]] with
* [[org.apache.calcite.sql.SqlExplainLevel.EXPPLAN_ATTRIBUTES]] explain level to generate
* the description.
*/
def getDisplayName: String = {
def getRelDetailedDescription: String = {
val sw = new StringWriter
val pw = new PrintWriter(sw)
val relWriter = new RelDisplayNameWriterImpl(pw)
val relWriter = new RelDescriptionWriterImpl(pw)
this.explain(relWriter)
sw.toString
}
Expand Down Expand Up @@ -114,14 +119,6 @@ trait FlinkRelNode extends RelNode {
throw new IllegalArgumentException(s"Unknown expression type '${expr.getClass}': $expr")
}
}

def getExpressionFormat(pw: RelWriter): ExpressionFormat = pw match {
// infix format is more readable for displaying
case _: RelDisplayNameWriterImpl => ExpressionFormat.Infix
// traditional writer prefers prefix expression format, e.g. +(x, y)
case _ => ExpressionFormat.Prefix
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.metadata.RelMetadataQuery
import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel}
import org.apache.calcite.rex.{RexInputRef, RexLiteral, RexNode}
import org.apache.calcite.sql.SqlExplainLevel
import org.apache.calcite.util.Litmus

import java.util
Expand Down Expand Up @@ -82,14 +83,19 @@ abstract class Expand(

override def explainTerms(pw: RelWriter): RelWriter = {
val names = outputRowType.getFieldNames
val terms = projects.map {
project =>
project.zipWithIndex.map {
case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
case (o, _) => s"$o"
}.mkString("{", ", ", "}")
}.mkString(", ")
val terms = if (pw.getDetailLevel == SqlExplainLevel.EXPPLAN_ATTRIBUTES) {
// improve the readability
names.mkString(", ")
} else {
projects.map {
project =>
project.zipWithIndex.map {
case (r: RexInputRef, i: Int) => s"${names.get(i)}=[${r.getName}]"
case (l: RexLiteral, i: Int) => s"${names.get(i)}=[${l.getValue3}]"
case (o, _) => s"$o"
}.mkString("{", ", ", "}")
}.mkString(", ")
}
super.explainTerms(pw).item("projects", terms)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.common

import org.apache.flink.table.planner.plan.nodes.ExpressionFormat.ExpressionFormat
import org.apache.flink.table.planner.plan.nodes.{ExpressionFormat, FlinkRelNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.conditionToString

import org.apache.flink.table.planner.plan.utils.RelExplainUtil.{conditionToString, preferExpressionFormat}
import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Calc
import org.apache.calcite.rel.metadata.RelMetadataQuery
Expand Down Expand Up @@ -60,9 +59,9 @@ abstract class CommonCalc(

override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("input", getInput)
.item("select", projectionToString())
.item("select", projectionToString(preferExpressionFormat(pw)))
.itemIf("where",
conditionToString(calcProgram, getExpressionString),
conditionToString(calcProgram, getExpressionString, preferExpressionFormat(pw)),
calcProgram.getCondition != null)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{
import org.apache.flink.table.planner.plan.nodes.FlinkRelNode
import org.apache.flink.table.planner.plan.utils.LookupJoinUtil._
import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, RelExplainUtil}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
import org.apache.flink.table.planner.utils.TableConfigUtils.getMillisecondFromConfigDuration
import org.apache.flink.table.runtime.operators.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter
Expand Down Expand Up @@ -126,7 +127,8 @@ abstract class CommonLookupJoin(
val resultFieldNames = getRowType.getFieldNames.asScala.toArray
val lookupableSource = tableSource.asInstanceOf[LookupableTableSource[_]]
val whereString = calcOnTemporalTable match {
case Some(calc) => RelExplainUtil.conditionToString(calc, getExpressionString)
case Some(calc) => RelExplainUtil.conditionToString(
calc, getExpressionString, preferExpressionFormat(pw))
case None => "N/A"
}

Expand Down Expand Up @@ -343,7 +345,7 @@ abstract class CommonLookupJoin(

new OneInputTransformation(
inputTransformation,
"LookupJoin",
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(resultRowType),
inputTransformation.getParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package org.apache.flink.table.planner.plan.nodes.common

import org.apache.flink.table.planner.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil, RelExplainUtil}
import org.apache.flink.table.planner.plan.utils.{JoinTypeUtil, JoinUtil}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil.preferExpressionFormat
import org.apache.flink.table.runtime.operators.join.FlinkJoinType

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
Expand Down Expand Up @@ -78,8 +79,8 @@ abstract class CommonPhysicalJoin(
override def explainTerms(pw: RelWriter): RelWriter = {
pw.input("left", getLeft).input("right", getRight)
.item("joinType", flinkJoinType.toString)
.item("where",
RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString))
.item("where", getExpressionString(
getCondition, inputRowType.getFieldNames.toList, None, preferExpressionFormat(pw)))
.item("select", getRowType.getFieldNames.mkString(", "))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.flink.table.planner.delegation.BatchPlanner
import org.apache.flink.table.planner.plan.`trait`.{FlinkRelDistribution, FlinkRelDistributionTraitDef, TraitUtil}
import org.apache.flink.table.planner.plan.nodes.common.CommonCalc
import org.apache.flink.table.planner.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo

import org.apache.calcite.plan._
Expand Down Expand Up @@ -158,7 +157,7 @@ class BatchExecCalc(

new OneInputTransformation(
inputTransform,
RelExplainUtil.calcToString(calcProgram, getExpressionString),
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class BatchExecCorrelate(
val inputTransformation = getInputNodes.get(0).translateToPlan(planner)
.asInstanceOf[Transformation[BaseRow]]
val operatorCtx = CodeGeneratorContext(config)
CorrelateCodeGenerator.generateCorrelateTransformation(
val transformation = CorrelateCodeGenerator.generateCorrelateTransformation(
config,
operatorCtx,
inputTransformation,
Expand All @@ -202,6 +202,8 @@ class BatchExecCorrelate(
retainHeader = false,
getExpressionString,
"BatchExecCorrelate")
transformation.setName(getRelDetailedDescription)
transformation
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ class BatchExecExpand(
projects,
opName = "BatchExpand")

val operatorName = s"BatchExecExpand: ${getRowType.getFieldList.map(_.getName).mkString(", ")}"
new OneInputTransformation(
inputTransform,
operatorName,
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
inputTransform.getParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch

import org.apache.flink.table.api.TableException
import org.apache.flink.table.functions.UserDefinedFunction
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RelExplainUtil}
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy
import org.apache.flink.table.planner.utils.TableConfigUtils.getAggPhaseStrategy

Expand Down Expand Up @@ -74,18 +74,6 @@ abstract class BatchExecGroupAggregateBase(

def getAggCallToAggFunction: Seq[(AggregateCall, UserDefinedFunction)] = aggCallToAggFunction

def aggOperatorName(prefix: String): String = {
RelExplainUtil.aggOperatorName(
prefix,
grouping,
auxGrouping,
inputRowType,
outputRowType,
aggCallToAggFunction,
isMerge,
isFinal)
}

protected def isEnforceTwoStageAgg: Boolean = {
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
getAggPhaseStrategy(tableConfig) == AggregatePhaseStrategy.TWO_PHASE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,4 @@ class BatchExecHashAggregate(
//~ ExecNode methods -----------------------------------------------------------

override def getDamBehavior = DamBehavior.FULL_DAM

override def getOperatorName: String = {
val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
aggOperatorName(aggregateNamePrefix + "HashAggregate")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ abstract class BatchExecHashAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}

def getOperatorName: String

override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
Expand Down Expand Up @@ -147,7 +145,7 @@ abstract class BatchExecHashAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
val ret = new OneInputTransformation(
input,
getOperatorName,
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,12 @@ class BatchExecHashJoin(
val ret = new TwoInputTransformation[BaseRow, BaseRow, BaseRow](
build,
probe,
getOperatorName,
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(FlinkTypeFactory.toLogicalRowType(getRowType)),
probe.getParallelism)
val resource = NodeResourceUtil.fromManagedMem(managedMemoryInMB)
ret.setResources(resource, resource)
ret
}

private def getOperatorName: String = {
val joinExpressionStr = if (getCondition != null) {
val inFields = inputRowType.getFieldNames.toList
s"where: ${getExpressionString(getCondition, inFields, None, ExpressionFormat.Infix)}, "
} else {
""
}
s"HashJoin($joinExpressionStr${if (leftIsBuild) "buildLeft" else "buildRight"})"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,4 @@ class BatchExecHashWindowAggregate(

override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM

override def getOperatorName: String = {
val aggregateNamePrefix = if (isMerge) "Global" else "Complete"
aggregateNamePrefix + "WindowHashAggregateBatchExec"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ abstract class BatchExecHashWindowAggregateBase(
replaceInput(ordinalInParent, newInputNode.asInstanceOf[RelNode])
}

def getOperatorName: String

override protected def translateToPlanInternal(
planner: BatchPlanner): Transformation[BaseRow] = {
val config = planner.getTableConfig
Expand Down Expand Up @@ -149,7 +147,7 @@ abstract class BatchExecHashWindowAggregateBase(
val operator = new CodeGenOperatorFactory[BaseRow](generatedOperator)
val ret = new OneInputTransformation(
input,
getOperatorName,
getRelDetailedDescription,
operator,
BaseRowTypeInfo.of(outputType),
input.getParallelism)
Expand Down
Loading

0 comments on commit bf37130

Please sign in to comment.