Skip to content

Commit

Permalink
[Flink-3226] Translate logical plan FlinkRels into physical plan Data…
Browse files Browse the repository at this point in the history
…SetRels.
  • Loading branch information
chengxiang li authored and vasia committed Mar 18, 2016
1 parent ba27832 commit b8028db
Show file tree
Hide file tree
Showing 16 changed files with 1,025 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.plan

/**
 * Unchecked exception carrying a message and an optional underlying cause.
 *
 * @param message   description of the failure
 * @param exception underlying cause; may be null when there is none
 */
class PlanGenException(message: String, exception: Exception)
  extends RuntimeException(message, exception) {

  /**
   * Creates an exception that only carries a message (the cause is left as null).
   *
   * @param message description of the failure
   */
  def this(message: String) = this(message, null)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
package org.apache.flink.api.table.plan

import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.rel.core.JoinRelType._
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
Expand All @@ -29,8 +29,11 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo
import org.apache.flink.api.java.typeutils.ValueTypeInfo._
import org.apache.flink.api.table.typeinfo.RowTypeInfo
import org.apache.flink.api.table.{Row, TableException}

import scala.collection.JavaConversions._
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.calcite.rel.core.JoinRelType
import org.apache.calcite.sql.`type`.SqlTypeName

object TypeConverter {

Expand Down Expand Up @@ -139,4 +142,10 @@ object TypeConverter {
returnType.asInstanceOf[TypeInformation[Any]]
}

/**
 * Maps a Calcite join kind onto the matching Flink join type.
 *
 * Only INNER, LEFT, RIGHT and FULL are handled; any other value makes the
 * match fail with a scala.MatchError.
 *
 * @param sqlJoinType the Calcite join kind to translate
 * @return the corresponding Flink join type
 */
def sqlJoinTypeToFlinkJoinType(sqlJoinType: JoinRelType): JoinType = {
  sqlJoinType match {
    case JoinRelType.INNER => JoinType.INNER
    case JoinRelType.LEFT => JoinType.LEFT_OUTER
    case JoinRelType.RIGHT => JoinType.RIGHT_OUTER
    case JoinRelType.FULL => JoinType.FULL_OUTER
  }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.plan.functions

import java.lang.Iterable

import com.google.common.base.Preconditions
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.table.plan.functions.aggregate.Aggregate
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector

import scala.collection.JavaConversions._

/**
 * A wrapper Flink GroupReduceOperator UDF of aggregates: it takes the grouped data as input,
 * feeds the selected field of every record to each aggregate, and collects a single record
 * that carries the aggregated values.
 *
 * @param aggregates Sql aggregate functions.
 * @param fields     For each aggregate (same position), the index of the record field it
 *                   reads its input from and into which its result is written back.
 */
class AggregateFunction(
    private val aggregates: Array[Aggregate[_ <: Any]],
    private val fields: Array[Int]) extends RichGroupReduceFunction[Any, Any] {

  /**
   * Validates the constructor arguments and initiates every aggregate.
   *
   * @param config the configuration handed over by the runtime (not read here)
   */
  override def open(config: Configuration): Unit = {
    Preconditions.checkNotNull(aggregates)
    Preconditions.checkNotNull(fields)
    Preconditions.checkArgument(aggregates.size == fields.size)

    aggregates.foreach(_.initiateAggregate)
  }

  /**
   * Aggregates one group of records and emits exactly one result record.
   *
   * The last record of the group is reused as the output record: for every aggregate, the
   * field it read from is overwritten with the aggregated value.
   *
   * @param records the records of the current group
   * @param out     collector receiving the single aggregated record
   */
  override def reduce(records: Iterable[Any], out: Collector[Any]): Unit = {
    // reduce() is invoked once per group while open() only runs once, so the aggregates
    // must be re-initiated here; otherwise state accumulated for a previous group would
    // leak into the result of the current one.
    aggregates.foreach(_.initiateAggregate)

    val aggregateAndField = aggregates.zip(fields)
    var currentValue: Any = null

    // iterate all input records, feed to each aggregate.
    val iterator = records.iterator()
    while (iterator.hasNext) {
      val value = iterator.next()
      currentValue = value
      aggregateAndField.foreach {
        case (aggregate, field) =>
          aggregate.aggregate(FunctionUtils.getFieldValue(value, field))
      }
    }

    // reuse the latest record, and set all the aggregated values.
    // NOTE(review): assumes the group is never empty; with no records currentValue stays
    // null and FunctionUtils.putFieldValue rejects it - confirm against the runtime contract.
    aggregateAndField.foreach {
      case (aggregate, field) =>
        FunctionUtils.putFieldValue(currentValue, field, aggregate.getAggregated())
    }

    out.collect(currentValue)
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.plan.functions

import org.apache.flink.api.table.Row

/**
 * Helpers for reading and writing a single field of a record by position.
 * Only records of type Row are supported; anything else is rejected.
 */
object FunctionUtils {

  /**
   * Narrows a record to Row, failing for every other kind of value (including null).
   */
  private def requireRow(record: Any): Row = record match {
    case row: Row => row
    case _ => throw new UnsupportedOperationException("Do not support types other than Row now.")
  }

  /**
   * Reads the field at the given position of a record.
   *
   * @param record     the record to read from; must be a Row
   * @param fieldIndex position of the field
   * @return the value stored at that position
   * @throws UnsupportedOperationException if the record is not a Row
   */
  def getFieldValue(record: Any, fieldIndex: Int): Any =
    requireRow(record).productElement(fieldIndex)

  /**
   * Writes a value into the field at the given position of a record.
   *
   * @param record     the record to modify; must be a Row
   * @param fieldIndex position of the field
   * @param fieldValue the value to store
   * @throws UnsupportedOperationException if the record is not a Row
   */
  def putFieldValue(record: Any, fieldIndex: Int, fieldValue: Any): Unit =
    requireRow(record).setField(fieldIndex, fieldValue)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.plan.functions.aggregate

/**
 * Contract of a single Sql aggregate function. The intended usage is: initiate the
 * aggregate first, then feed it the grouped aggregate field values one by one, and finally
 * fetch the aggregated value.
 *
 * @tparam T type of the aggregated value returned by getAggregated()
 */
trait Aggregate[T] {

  /**
   * Initiates the state of this aggregate; meant to be called before any value is fed.
   */
  def initiateAggregate: Unit

  /**
   * Feeds one aggregate field value into this aggregate.
   *
   * @param value the field value of the current record
   */
  def aggregate(value: Any): Unit

  /**
   * Returns the final aggregated value.
   *
   * @return the aggregated value over everything fed so far
   */
  def getAggregated(): T
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.table.plan.functions.aggregate

import java.util

import org.apache.calcite.rel.core.AggregateCall
import org.apache.calcite.sql.SqlAggFunction
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.`type`.SqlTypeName._
import org.apache.calcite.sql.fun._
import org.apache.flink.api.common.functions.RichGroupReduceFunction
import org.apache.flink.api.table.plan.PlanGenException
import org.apache.flink.api.table.plan.functions.AggregateFunction

/**
 * Builds the group-reduce function that evaluates a list of Calcite aggregate calls.
 */
object AggregateFactory {

  /**
   * Creates an AggregateFunction holding one Aggregate and one input field index per
   * aggregate call, in the order of the given calls.
   *
   * Supported functions are SUM, AVG, MIN, MAX (on TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT
   * and DOUBLE result types) and COUNT.
   *
   * @param aggregateCalls the aggregate calls to translate
   * @return the group-reduce function evaluating all the calls
   * @throws PlanGenException if a call has no argument (and is not COUNT), uses an
   *                          unsupported function, or has an unsupported type
   */
  def createAggregateInstance(aggregateCalls: Seq[AggregateCall]):
      RichGroupReduceFunction[Any, Any] = {

    val fieldIndexes = new Array[Int](aggregateCalls.size)
    val aggregates = new Array[Aggregate[_ <: Any]](aggregateCalls.size)

    // foreach (not map): the loop exists only for its side effects, and map on a lazy Seq
    // (e.g. a Stream) would not visit every element, leaving null entries behind.
    aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
      fieldIndexes(index) = fieldIndexOf(aggregateCall)
      aggregates(index) = aggregateFor(aggregateCall)
    }

    new AggregateFunction(aggregates, fieldIndexes)
  }

  /**
   * Picks the index of the field an aggregate call operates on.
   * Currently only the first argument is considered; an argument-less COUNT uses field 0.
   */
  private def fieldIndexOf(aggregateCall: AggregateCall): Int = {
    val argList = aggregateCall.getArgList
    // currently assume only aggregate on singleton field.
    if (argList.isEmpty) {
      aggregateCall.getAggregation match {
        case _: SqlCountAggFunction => 0
        case _ => throw new PlanGenException("Aggregate fields should not be empty.")
      }
    } else {
      argList.get(0).intValue()
    }
  }

  /** Instantiates the Aggregate matching the function and result type of a call. */
  private def aggregateFor(aggregateCall: AggregateCall): Aggregate[_ <: Any] = {
    aggregateCall.getAggregation match {
      case _: SqlSumAggFunction | _: SqlSumEmptyIsZeroAggFunction =>
        sumAggregateFor(aggregateCall.getType.getSqlTypeName)
      case _: SqlAvgAggFunction =>
        avgAggregateFor(aggregateCall.getType.getSqlTypeName)
      case sqlMinMaxFunction: SqlMinMaxAggFunction =>
        if (sqlMinMaxFunction.isMin) {
          minAggregateFor(aggregateCall.getType.getSqlTypeName)
        } else {
          maxAggregateFor(aggregateCall.getType.getSqlTypeName)
        }
      case _: SqlCountAggFunction =>
        new CountAggregate
      case unSupported: SqlAggFunction =>
        throw new PlanGenException("unsupported Function: " + unSupported.getName)
    }
  }

  /** SUM implementation for the given Sql type. */
  private def sumAggregateFor(typeName: SqlTypeName): Aggregate[_ <: Any] = typeName match {
    case TINYINT => new TinyIntSumAggregate
    case SMALLINT => new SmallIntSumAggregate
    case INTEGER => new IntSumAggregate
    case BIGINT => new LongSumAggregate
    case FLOAT => new FloatSumAggregate
    case DOUBLE => new DoubleSumAggregate
    case unsupportedType: SqlTypeName =>
      throw new PlanGenException("Sum aggregate does no support type:" + unsupportedType)
  }

  /** AVG implementation for the given Sql type. */
  private def avgAggregateFor(typeName: SqlTypeName): Aggregate[_ <: Any] = typeName match {
    case TINYINT => new TinyIntAvgAggregate
    case SMALLINT => new SmallIntAvgAggregate
    case INTEGER => new IntAvgAggregate
    case BIGINT => new LongAvgAggregate
    case FLOAT => new FloatAvgAggregate
    case DOUBLE => new DoubleAvgAggregate
    case unsupportedType: SqlTypeName =>
      throw new PlanGenException("Avg aggregate does no support type:" + unsupportedType)
  }

  /** MIN implementation for the given Sql type. */
  private def minAggregateFor(typeName: SqlTypeName): Aggregate[_ <: Any] = typeName match {
    case TINYINT => new TinyIntMinAggregate
    case SMALLINT => new SmallIntMinAggregate
    case INTEGER => new IntMinAggregate
    case BIGINT => new LongMinAggregate
    case FLOAT => new FloatMinAggregate
    case DOUBLE => new DoubleMinAggregate
    case unsupportedType: SqlTypeName =>
      throw new PlanGenException("Min aggregate does no support type:" + unsupportedType)
  }

  /** MAX implementation for the given Sql type. */
  private def maxAggregateFor(typeName: SqlTypeName): Aggregate[_ <: Any] = typeName match {
    case TINYINT => new TinyIntMaxAggregate
    case SMALLINT => new SmallIntMaxAggregate
    case INTEGER => new IntMaxAggregate
    case BIGINT => new LongMaxAggregate
    case FLOAT => new FloatMaxAggregate
    case DOUBLE => new DoubleMaxAggregate
    case unsupportedType: SqlTypeName =>
      throw new PlanGenException("Max aggregate does no support type:" + unsupportedType)
  }

}
Loading

0 comments on commit b8028db

Please sign in to comment.