Skip to content

Commit

Permalink
[FLINK-7014] [table] Expose isDeterministic interface to UserDefinedF…
Browse files Browse the repository at this point in the history
…unction

This closes apache#4200
  • Loading branch information
Xpray authored and wuchong committed Jun 29, 2017
1 parent 958d376 commit b59148c
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ abstract class UserDefinedFunction extends Serializable {
@throws(classOf[Exception])
def close(): Unit = {}

/**
* @return true iff a call to this function is guaranteed to always return
* the same result given the same parameters; true is assumed by default
* if user's function is not pure functional, like random(), date(), now()...
* isDeterministic must return false
*/
def isDeterministic: Boolean = true

final def functionIdentifier: String = {
val md5 = DigestUtils.md5Hex(serialize(this))
getClass.getCanonicalName.replace('.', '$').concat("$").concat(md5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ class AggSqlFunction(
) {

def getFunction: AggregateFunction[_, _] = aggregateFunction

override def isDeterministic: Boolean = aggregateFunction.isDeterministic
}

object AggSqlFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class ScalarSqlFunction(

def getScalarFunction = scalarFunction

override def isDeterministic: Boolean = scalarFunction.isDeterministic
}

object ScalarSqlFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TableSqlFunction(
*/
def getPojoFieldMapping = functionImpl.fieldIndexes

override def isDeterministic: Boolean = udtf.isDeterministic
}

object TableSqlFunction {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package org.apache.flink.table
import org.apache.flink.api.scala._
import org.apache.flink.table.api.Types
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test
import org.junit.{Ignore, Test}

class ExpressionReductionTest extends TableTestBase {

Expand Down Expand Up @@ -458,4 +459,51 @@ class ExpressionReductionTest extends TableTestBase {
util.verifySql(sqlQuery, expected)
}

// todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
@Ignore
def testReduceDeterministicUDF(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

// if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
val result = table
.select('a, 'b, 'c, DeterministicNullFunc() as 'd)
.where("d.isNull")
.select('a, 'b, 'c)

val expected: String = streamTableNode(0)

util.verifyTable(result, expected)
}

@Test
def testReduceNonDeterministicUDF(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

val result = table
.select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
.where("d.isNull")
.select('a, 'b, 'c)

val expected = unaryNode(
"DataStreamCalc",
streamTableNode(0),
term("select", "a", "b", "c"),
term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
)

util.verifyTable(result, expected)
}

}

object NonDeterministicNullFunc extends ScalarFunction {
def eval(): String = null
override def isDeterministic = false
}

object DeterministicNullFunc extends ScalarFunction {
def eval(): String = null
override def isDeterministic = true
}

0 comments on commit b59148c

Please sign in to comment.