Skip to content

Commit

Permalink
[FLINK-35193][table] Support drop materialized table syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
hackergin authored and lsyldliu committed May 14, 2024
1 parent 65d31e2 commit 8551ef3
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
"org.apache.flink.sql.parser.ddl.SqlDropCatalog"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
"org.apache.flink.sql.parser.ddl.SqlDropFunction"
"org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable"
"org.apache.flink.sql.parser.ddl.SqlDropPartitions"
"org.apache.flink.sql.parser.ddl.SqlDropPartitions.AlterTableDropPartitionsContext"
"org.apache.flink.sql.parser.ddl.SqlDropTable"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,34 @@ SqlCreate SqlCreateMaterializedTable(Span s, boolean replace, boolean isTemporar
}
}

/**
* Parses a DROP MATERIALIZED TABLE statement.
*/
SqlDrop SqlDropMaterializedTable(Span s, boolean replace, boolean isTemporary) :
{
SqlIdentifier tableName = null;
boolean ifExists = false;
}
{
<MATERIALIZED>
{
if (isTemporary) {
throw SqlUtil.newContextException(
getPos(),
ParserResource.RESOURCE.dropTemporaryMaterializedTableUnsupported());
}
}
<TABLE>

ifExists = IfExistsOpt()

tableName = CompoundIdentifier()

{
return new SqlDropMaterializedTable(s.pos(), tableName, ifExists);
}
}

/**
* Parses alter materialized table.
*/
Expand Down Expand Up @@ -2427,6 +2455,8 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
(
drop = SqlDropCatalog(s, replace)
|
drop = SqlDropMaterializedTable(s, replace, isTemporary)
|
drop = SqlDropTable(s, replace, isTemporary)
|
drop = SqlDropView(s, replace, isTemporary)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.calcite.sql.SqlDrop;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import java.util.List;

/** DROP MATERIALIZED TABLE DDL sql call. */
public class SqlDropMaterializedTable extends SqlDrop {

private static final SqlOperator OPERATOR =
new SqlSpecialOperator("DROP MATERIALIZED TABLE", SqlKind.DROP_TABLE);

private final SqlIdentifier tableIdentifier;

public SqlDropMaterializedTable(
SqlParserPos pos, SqlIdentifier tableIdentifier, boolean ifExists) {
super(OPERATOR, pos, ifExists);
this.tableIdentifier = tableIdentifier;
}

public String[] fullTableName() {
return tableIdentifier.names.toArray(new String[0]);
}

public boolean getIfExists() {
return this.ifExists;
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(tableIdentifier);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP MATERIALIZED TABLE");
if (ifExists) {
writer.keyword("IF EXISTS");
}
tableIdentifier.unparse(writer, leftPrec, rightPrec);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,7 @@ public interface ParserResource {

@Resources.BaseMessage("REPLACE MATERIALIZED TABLE is not supported.")
Resources.ExInst<ParseException> replaceMaterializedTableUnsupported();

@Resources.BaseMessage("DROP TEMPORARY MATERIALIZED TABLE is not supported.")
Resources.ExInst<ParseException> dropTemporaryMaterializedTableUnsupported();
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,31 @@ void testAlterMaterializedTableReset() {
+ " ");
}

@Test
void testDropMaterializedTable() {
final String sql = "DROP MATERIALIZED TABLE tbl1";
final String expected = "DROP MATERIALIZED TABLE `TBL1`";
sql(sql).ok(expected);

final String sql2 = "DROP MATERIALIZED TABLE IF EXISTS tbl1";
sql(sql2).ok("DROP MATERIALIZED TABLE IF EXISTS `TBL1`");

final String sql3 = "DROP MATERIALIZED TABLE tb1 ^IF^ EXISTS";
sql(sql3)
.fails(
"Encountered \"IF\" at line 1, column 29.\n"
+ "Was expecting one of:\n"
+ " <EOF> \n"
+ " \".\" ...\n"
+ " ");
}

@Test
void testDropTemporaryMaterializedTable() {
final String sql = "DROP TEMPORARY ^MATERIALIZED^ TABLE tbl1";
sql(sql).fails("DROP TEMPORARY MATERIALIZED TABLE is not supported.");
}

public SqlParserFixture fixture() {
return SqlParserFixture.DEFAULT.withConfig(
c -> c.withParserFactory(FlinkSqlParserImpl.FACTORY));
Expand Down

0 comments on commit 8551ef3

Please sign in to comment.