Commit 7eb45c1

[FLINK-4590] [table] Some Table API tests are failing when debug lvl is set to DEBUG

This closes apache#2504.
twalthr committed Sep 26, 2016
1 parent 70e71c1 commit 7eb45c1
Showing 18 changed files with 193 additions and 119 deletions.
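
The changes below all follow one pattern. Calcite's AbstractRelNode already owns a field named rowType, filled lazily through getRowType()/deriveRowType(). Giving a Scala subclass a constructor parameter with the same name shadows that field inside the class body, so digests built from the parameter (as toString and explainTerms did here) can disagree with what the planner sees through getRowType. That only surfaces when the plan is actually printed, which would explain why the tests failed only at DEBUG log level. Below is a minimal, self-contained sketch of the pitfall; AbstractNode, ShadowingNode, and FixedNode are simplified stand-ins, not Flink or Calcite classes, and String stands in for RelDataType:

abstract class AbstractNode {
  // simplified stand-in for AbstractRelNode's protected rowType cache
  protected var rowType: String = _

  def deriveRowType(): String

  def getRowType: String = {
    if (rowType == null) rowType = deriveRowType()
    rowType
  }
}

// BAD: the constructor parameter shadows the inherited field inside the
// class body, so the digest below bypasses whatever getRowType has cached.
class ShadowingNode(rowType: String) extends AbstractNode {
  override def deriveRowType(): String = rowType
  override def toString: String = s"Source(from: ($rowType))" // reads the param
}

// GOOD, the pattern applied throughout this commit: rename the parameter
// and always read through the accessor.
class FixedNode(rowRelDataType: String) extends AbstractNode {
  override def deriveRowType(): String = rowRelDataType
  override def toString: String = s"Source(from: (${getRowType}))"
}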
BatchScan.scala
@@ -36,14 +36,14 @@ abstract class BatchScan(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     table: RelOptTable,
-    rowType: RelDataType)
+    rowRelDataType: RelDataType)
   extends TableScan(cluster, traitSet, table)
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def toString: String = {
-    s"Source(from: (${rowType.getFieldNames.asScala.toList.mkString(", ")}))"
+    s"Source(from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")}))"
   }

   override def computeSelfCost (planner: RelOptPlanner, metadata: RelMetadataQuery): RelOptCost = {
@@ -81,14 +81,14 @@ abstract class BatchScan(

     val mapFunc = getConversionMapper(
       config,
-      false,
+      nullableInput = false,
       inputType,
       determinedType,
       "DataSetSourceConversion",
       getRowType.getFieldNames,
       Some(flinkTable.fieldIndexes))

-    val opName = s"from: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+    val opName = s"from: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"

     input.map(mapFunc).name(opName)
   }
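
One recurring change above is purely for call-site readability: bare positional booleans like the false passed to getConversionMapper become Scala named arguments (nullableInput = false). A tiny sketch with a hypothetical two-parameter signature (the real getConversionMapper takes more parameters):

object NamedArgsDemo extends App {
  // Hypothetical, simplified stand-in for illustration only.
  def getConversionMapper(config: String, nullableInput: Boolean): String =
    s"mapper(config=$config, nullableInput=$nullableInput)"

  // The label documents the flag's meaning and guards against swapped booleans.
  println(getConversionMapper("cfg", nullableInput = false))
}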
BatchTableSourceScan.scala
@@ -35,15 +35,15 @@ class BatchTableSourceScan(
     rowType: RelDataType)
   extends BatchScan(cluster, traitSet, table, rowType) {

-  val tableSourceTable = table.unwrap(classOf[TableSourceTable])
+  val tableSourceTable = getTable.unwrap(classOf[TableSourceTable])
   val tableSource = tableSourceTable.tableSource.asInstanceOf[BatchTableSource[_]]

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new BatchTableSourceScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      getRowType
     )
   }

DataSetAggregate.scala
@@ -40,21 +40,21 @@ class DataSetAggregate(
     traitSet: RelTraitSet,
     input: RelNode,
     namedAggregates: Seq[CalcitePair[AggregateCall, String]],
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     inputType: RelDataType,
     grouping: Array[Int])
   extends SingleRel(cluster, traitSet, input)
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetAggregate(
       cluster,
       traitSet,
       inputs.get(0),
       namedAggregates,
-      rowType,
+      getRowType,
       inputType,
       grouping)
   }
@@ -91,15 +91,15 @@ class DataSetAggregate(
     val groupingKeys = grouping.indices.toArray
     // add grouping fields, position keys in the input, and input type
     val aggregateResult = AggregateUtil.createOperatorFunctionsForAggregates(namedAggregates,
-      inputType, rowType, grouping, config)
+      inputType, getRowType, grouping, config)

-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(
       tableEnv,
       // tell the input operator that this operator currently only supports Rows as input
       Some(TypeConverter.DEFAULT_ROW_TYPE))

     // get the output types
-    val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala
+    val fieldTypes: Array[TypeInformation[_]] = getRowType.getFieldList.asScala
       .map(field => FlinkTypeFactory.toTypeInfo(field.getType))
       .toArray

@@ -138,14 +138,14 @@ class DataSetAggregate(
     // if the expected type is not a Row, inject a mapper to convert to the expected type
     expectedType match {
       case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
-        val mapName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+        val mapName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"
         result.map(getConversionMapper(
-          config,
-          false,
-          rowTypeInfo.asInstanceOf[TypeInformation[Any]],
-          expectedType.get,
-          "DataSetAggregateConversion",
-          rowType.getFieldNames.asScala
+          config = config,
+          nullableInput = false,
+          inputType = rowTypeInfo.asInstanceOf[TypeInformation[Any]],
+          expectedType = expectedType.get,
+          conversionOperatorName = "DataSetAggregateConversion",
+          fieldNames = getRowType.getFieldNames.asScala
         ))
         .name(mapName)
       case _ => result
@@ -161,7 +161,7 @@ class DataSetAggregate(
   private def aggregationToString: String = {

     val inFields = inputType.getFieldNames.asScala
-    val outFields = rowType.getFieldNames.asScala
+    val outFields = getRowType.getFieldNames.asScala

     val groupStrings = grouping.map( inFields(_) )

DataSetCalc.scala
@@ -40,32 +40,32 @@ class DataSetCalc(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
-    rowType: RelDataType,
+    rowRelDataType: RelDataType,
     calcProgram: RexProgram,
     ruleDescription: String)
   extends SingleRel(cluster, traitSet, input)
   with FlinkCalc
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetCalc(
       cluster,
       traitSet,
       inputs.get(0),
-      rowType,
+      getRowType,
       calcProgram,
       ruleDescription)
   }

-  override def toString: String = calcToString(calcProgram, getExpressionString(_, _, _))
+  override def toString: String = calcToString(calcProgram, getExpressionString)

   override def explainTerms(pw: RelWriter): RelWriter = {
     super.explainTerms(pw)
-      .item("select", selectionToString(calcProgram, getExpressionString(_, _, _)))
+      .item("select", selectionToString(calcProgram, getExpressionString))
       .itemIf("where",
-        conditionToString(calcProgram, getExpressionString(_, _, _)),
+        conditionToString(calcProgram, getExpressionString),
         calcProgram.getCondition != null)
   }

@@ -95,7 +95,7 @@ class DataSetCalc(

     val config = tableEnv.getConfig

-    val inputDS = input.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
+    val inputDS = getInput.asInstanceOf[DataSetRel].translateToPlan(tableEnv)

     val returnType = determineReturnType(
       getRowType,
@@ -120,7 +120,7 @@
       returnType)

     val mapFunc = calcMapFunction(genFunction)
-    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString(_, _, _)))
+    inputDS.flatMap(mapFunc).name(calcOpName(calcProgram, getExpressionString))
   }

 }
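
Another mechanical cleanup in this file: getExpressionString(_, _, _) becomes a bare method value. Both forms hand the caller the same function; the underscore form is just an explicit eta-expansion, which is redundant where a function type is expected. A minimal sketch (add3 is an illustrative stand-in):

object EtaExpansionDemo extends App {
  def add3(a: Int, b: Int, c: Int): Int = a + b + c

  val f: (Int, Int, Int) => Int = add3(_, _, _) // explicit eta-expansion
  val g: (Int, Int, Int) => Int = add3          // method value, same function
  assert(f(1, 2, 3) == g(1, 2, 3))
}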
DataSetIntersect.scala
@@ -38,22 +38,22 @@ import scala.collection.JavaConverters._
 class DataSetIntersect(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     all: Boolean)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetIntersect(
       cluster,
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       all
     )
   }
@@ -115,7 +115,7 @@ class DataSetIntersect(
       "DataSetIntersectConversion",
       getRowType.getFieldNames)

-    val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+    val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"

     intersectDs.map(mapFunc).name(opName)
   }
@@ -127,7 +127,7 @@
   }

   private def intersectSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }

 }
DataSetJoin.scala
@@ -44,28 +44,28 @@ import scala.collection.mutable.ArrayBuffer
 class DataSetJoin(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     joinCondition: RexNode,
     joinRowType: RelDataType,
     joinInfo: JoinInfo,
     keyPairs: List[IntPair],
     joinType: JoinRelType,
     joinHint: JoinHint,
     ruleDescription: String)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetJoin(
       cluster,
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       joinCondition,
       joinRowType,
       joinInfo,
@@ -113,7 +113,7 @@ class DataSetJoin(
     val rightKeys = ArrayBuffer.empty[Int]
     if (keyPairs.isEmpty) {
       // if no equality keys => not supported
-      throw new TableException(
+      throw TableException(
         "Joins should have at least one equality condition.\n" +
           s"\tLeft: ${left.toString},\n" +
           s"\tRight: ${right.toString},\n" +
@@ -135,7 +135,7 @@ class DataSetJoin(
         leftKeys.add(pair.source)
         rightKeys.add(pair.target)
       } else {
-        throw new TableException(
+        throw TableException(
           "Equality join predicate on incompatible types.\n" +
           s"\tLeft: ${left.toString},\n" +
           s"\tRight: ${right.toString},\n" +
@@ -156,7 +156,7 @@
     }

     if (nullCheck && !config.getNullCheck) {
-      throw new TableException("Null check in TableConfig must be enabled for outer joins.")
+      throw TableException("Null check in TableConfig must be enabled for outer joins.")
     }

     val generator = new CodeGenerator(
@@ -205,7 +205,7 @@ class DataSetJoin(
   }

   private def joinSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }

   private def joinConditionToString: String = {
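
The switch from throw new TableException(...) to throw TableException(...) relies on a companion apply method, which Scala generates automatically for case classes. A sketch under that assumption (whether Flink's TableException was in fact a case class at this commit is not shown here):

// Assumed shape, for illustration: a case class gets a synthetic
// companion `apply` that forwards to the constructor.
case class TableException(msg: String) extends RuntimeException(msg)

// These two statements are therefore equivalent:
//   throw new TableException("Joins should have at least one equality condition.")
//   throw TableException("Joins should have at least one equality condition.")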
DataSetMinus.scala
@@ -38,22 +38,22 @@ import scala.collection.JavaConverters._
 class DataSetMinus(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
-    left: RelNode,
-    right: RelNode,
-    rowType: RelDataType,
+    leftNode: RelNode,
+    rightNode: RelNode,
+    rowRelDataType: RelDataType,
     all: Boolean)
-  extends BiRel(cluster, traitSet, left, right)
+  extends BiRel(cluster, traitSet, leftNode, rightNode)
   with DataSetRel {

-  override def deriveRowType() = rowType
+  override def deriveRowType() = rowRelDataType

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetMinus(
       cluster,
       traitSet,
       inputs.get(0),
       inputs.get(1),
-      rowType,
+      getRowType,
       all
     )
   }
@@ -75,6 +75,17 @@ class DataSetMinus(
     }
   }

+  override def estimateRowCount(mq: RelMetadataQuery): Double = {
+    // from org.apache.calcite.rel.metadata.RelMdUtil.getMinusRowCount
+    val children = this.getInputs
+    var rowCnt = mq.getRowCount(children.head)
+    getInputs.tail.foreach(rowCnt -= 0.5 * mq.getRowCount(_))
+    if (rowCnt < 0) {
+      rowCnt = 0.0
+    }
+    rowCnt
+  }
+
   override def translateToPlan(
     tableEnv: BatchTableEnvironment,
     expectedType: Option[TypeInformation[Any]]): DataSet[Any] = {
@@ -108,14 +119,14 @@
     // conversion
     if (determinedType != leftType) {
       val mapFunc = getConversionMapper(
-        config,
-        false,
-        leftType,
-        determinedType,
-        "DataSetMinusConversion",
-        getRowType.getFieldNames)
+        config = config,
+        nullableInput = false,
+        inputType = leftType,
+        expectedType = determinedType,
+        conversionOperatorName = "DataSetMinusConversion",
+        fieldNames = getRowType.getFieldNames)

-      val opName = s"convert: (${rowType.getFieldNames.asScala.toList.mkString(", ")})"
+      val opName = s"convert: (${getRowType.getFieldNames.asScala.toList.mkString(", ")})"

       minusDs.map(mapFunc).name(opName)
     }
@@ -127,7 +138,7 @@
   }

   private def minusSelectionToString: String = {
-    rowType.getFieldNames.asScala.toList.mkString(", ")
+    getRowType.getFieldNames.asScala.toList.mkString(", ")
   }

 }
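
The new estimateRowCount above mirrors Calcite's RelMdUtil.getMinusRowCount: take the first input's row count and subtract half of each remaining input's count, clamping at zero. A standalone rendering of that arithmetic with illustrative row counts:

object MinusRowCountDemo extends App {
  // Same formula, detached from the RelMetadataQuery plumbing.
  def minusRowCount(counts: Seq[Double]): Double =
    math.max(0.0, counts.tail.foldLeft(counts.head)(_ - 0.5 * _))

  assert(minusRowCount(Seq(1000.0, 300.0)) == 850.0) // 1000 - 0.5 * 300
  assert(minusRowCount(Seq(100.0, 3000.0)) == 0.0)   // estimate clamps at zero
}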
DataSetScan.scala
@@ -38,14 +38,14 @@ class DataSetScan(
     rowType: RelDataType)
   extends BatchScan(cluster, traitSet, table, rowType) {

-  val dataSetTable: DataSetTable[Any] = table.unwrap(classOf[DataSetTable[Any]])
+  val dataSetTable: DataSetTable[Any] = getTable.unwrap(classOf[DataSetTable[Any]])

   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataSetScan(
       cluster,
       traitSet,
-      table,
-      rowType
+      getTable,
+      getRowType
     )
   }

(The remaining 10 of the 18 changed files are not shown.)
