Skip to content

Commit

Permalink
[FLINK-14841][table] add create and drop function ddl in SQL parser
Browse files Browse the repository at this point in the history
this closes apache#10240.
  • Loading branch information
HuangZhenQiu authored and bowenli86 committed Nov 20, 2019
1 parent beba9dc commit 679398c
Show file tree
Hide file tree
Showing 5 changed files with 337 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
"org.apache.flink.sql.parser.ddl.SqlDropDatabase",
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase",
"org.apache.flink.sql.parser.ddl.SqlAlterFunction",
"org.apache.flink.sql.parser.ddl.SqlCreateFunction",
"org.apache.flink.sql.parser.ddl.SqlDropFunction",
"org.apache.flink.sql.parser.dml.RichSqlInsert",
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
"org.apache.flink.sql.parser.dql.SqlShowCatalogs",
Expand Down Expand Up @@ -440,7 +442,8 @@
createStatementParserMethods: [
"SqlCreateTable",
"SqlCreateView",
"SqlCreateDatabase"
"SqlCreateDatabase",
"SqlCreateFunction"
]

# List of methods for parsing extensions to "DROP" calls.
Expand All @@ -449,6 +452,7 @@
"SqlDropTable",
"SqlDropView",
"SqlDropDatabase"
"SqlDropFunction"
]

# Binary operators tokens
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,67 @@ SqlDescribeDatabase SqlDescribeDatabase() :

}

SqlCreate SqlCreateFunction(Span s, boolean replace) :
{
SqlIdentifier functionIdentifier = null;
SqlCharStringLiteral functionClassName = null;
String functionLanguage = null;
boolean ifNotExists = false;
boolean isTemporary = false;
boolean isSystemFunction = false;
}
{
[ <TEMPORARY> {isTemporary = true;}
[ <SYSTEM> { isSystemFunction = true; } ]
]

<FUNCTION>

[ <IF> <NOT> <EXISTS> { ifNotExists = true; } ]

functionIdentifier = CompoundIdentifier()

<AS> <QUOTED_STRING> {
String p = SqlParserUtil.parseString(token.image);
functionClassName = SqlLiteral.createCharString(p, getPos());
}
[<LANGUAGE>
(
<JAVA> { functionLanguage = "JAVA"; }
|
<SCALA> { functionLanguage = "SCALA"; }
|
<SQL> { functionLanguage = "SQL"; }
)
]
{
return new SqlCreateFunction(s.pos(), functionIdentifier, functionClassName, functionLanguage,
ifNotExists, isTemporary, isSystemFunction);
}
}

SqlDrop SqlDropFunction(Span s, boolean replace) :
{
SqlIdentifier functionIdentifier = null;
boolean ifExists = false;
boolean isTemporary = false;
boolean isSystemFunction = false;
}
{
[ <TEMPORARY> {isTemporary = true;}
[ <SYSTEM> { isSystemFunction = true; } ]
]
<FUNCTION>

[ <IF> <EXISTS> { ifExists = true; } ]

functionIdentifier = CompoundIdentifier()

{
return new SqlDropFunction(s.pos(), functionIdentifier, ifExists, isTemporary, isSystemFunction);
}
}

SqlAlterFunction SqlAlterFunction() :
{
SqlIdentifier functionIdentifier = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.flink.sql.parser.ExtendedSqlNode;
import org.apache.flink.sql.parser.error.SqlValidateException;

import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlCreate;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nonnull;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
* CREATE FUNCTION DDL sql call.
*/
public class SqlCreateFunction extends SqlCreate implements ExtendedSqlNode {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION);

private final SqlIdentifier functionIdentifier;

private final SqlCharStringLiteral functionClassName;

private final String functionLanguage;

private final boolean isTemporary;

private final boolean isSystemFunction;

public SqlCreateFunction(
SqlParserPos pos,
SqlIdentifier functionIdentifier,
SqlCharStringLiteral functionClassName,
String functionLanguage,
boolean ifNotExists,
boolean isTemporary,
boolean isSystemFunction) {
super(OPERATOR, pos, false, ifNotExists);
this.functionIdentifier = requireNonNull(functionIdentifier);
this.functionClassName = requireNonNull(functionClassName);
this.isSystemFunction = requireNonNull(isSystemFunction);
this.isTemporary = isTemporary;
this.functionLanguage = functionLanguage;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Nonnull
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(functionIdentifier, functionClassName);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("CREATE");
if (isTemporary) {
writer.keyword("TEMPORARY");
}
if (isSystemFunction) {
writer.keyword("SYSTEM");
}
writer.keyword("FUNCTION");
if (ifNotExists) {
writer.keyword("IF NOT EXISTS");
}
functionIdentifier.unparse(writer, leftPrec, rightPrec);
writer.keyword("AS");
functionClassName.unparse(writer, leftPrec, rightPrec);
if (functionLanguage != null) {
writer.keyword("LANGUAGE");
writer.keyword(functionLanguage);
}
}

@Override
public void validate() throws SqlValidateException {
// no-op
}

public boolean isIfNotExists() {
return ifNotExists;
}

public boolean isSystemFunction() {
return isSystemFunction;
}

public SqlCharStringLiteral getFunctionClassName() {
return this.functionClassName;
}

public String getFunctionLanguage() {
return this.functionLanguage;
}

public String[] getFunctionIdentifier() {
return functionIdentifier.names.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.flink.sql.parser.ExtendedSqlNode;
import org.apache.flink.sql.parser.error.SqlValidateException;

import org.apache.calcite.sql.SqlDrop;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nonnull;

import java.util.List;

import static java.util.Objects.requireNonNull;

/**
* DROP FUNCTION DDL sql call.
*/
public class SqlDropFunction extends SqlDrop implements ExtendedSqlNode {

public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("DROP FUNCTION", SqlKind.DROP_FUNCTION);

private final SqlIdentifier functionIdentifier;

private final boolean isTemporary;

private final boolean isSystemFunction;

public SqlDropFunction(
SqlParserPos pos,
SqlIdentifier functionIdentifier,
boolean ifExists,
boolean isTemporary,
boolean isSystemFunction) {
super(OPERATOR, pos, ifExists);
this.functionIdentifier = requireNonNull(functionIdentifier);
this.isSystemFunction = requireNonNull(isSystemFunction);
this.isTemporary = isTemporary;
}

@Nonnull
@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(functionIdentifier);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP");
if (isTemporary) {
writer.keyword("TEMPORARY");
}
if (isSystemFunction) {
writer.keyword("SYSTEM");
}
writer.keyword("FUNCTION");
if (ifExists) {
writer.keyword("IF EXISTS");
}
functionIdentifier.unparse(writer, leftPrec, rightPrec);
}

@Override
public void validate() throws SqlValidateException {
// no-op
}

public String[] getFunctionIdentifier() {
return functionIdentifier.names.toArray(new String[0]);
}

public boolean getIfExists() {
return this.ifExists;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,48 @@ public void testCreateViewWithEmptyFields() {
sql(sql).node(new ValidationMatcher());
}

@Test
public void testCreateFunction() {
check("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
"CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
"CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'",
"CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary function db1.function1 as 'org.apache.fink.function.function1'",
"CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary function function1 as 'org.apache.fink.function.function1'",
"CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'",
"CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'");

check("create temporary function function1 as 'org.apache.fink.function.function1' language java",
"CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA");

check("create temporary system function function1 as 'org.apache.fink.function.function1' language scala",
"CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA");
}

@Test
public void testDropTemporaryFunction() {
check("drop temporary function catalog1.db1.function1",
"DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");

check("drop temporary system function catalog1.db1.function1",
"DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`");

check("drop temporary function if exists catalog1.db1.function1",
"DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");

check("drop temporary system function if exists catalog1.db1.function1",
"DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`");
}

/** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. **/
private static class ValidationMatcher extends BaseMatcher<SqlNode> {
private String expectedColumnSql;
Expand Down

0 comments on commit 679398c

Please sign in to comment.