Skip to content

Commit

Permalink
[FLINK-33495][FLINK-33496] Add DISTRIBUTED BY clause for CREATE TABLE
Browse files Browse the repository at this point in the history
- Adds distribution support to CatalogTable
- Adds connector ability SupportsBucketing
- Adds distribution to TableDescriptor.

This closes apache#24155.
  • Loading branch information
jnh5y authored and twalthr committed Feb 19, 2024
1 parent 64463d4 commit 6c9ac5c
Show file tree
Hide file tree
Showing 58 changed files with 2,431 additions and 105 deletions.
5 changes: 5 additions & 0 deletions flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"org.apache.flink.sql.parser.ddl.SqlCreateTableAs"
"org.apache.flink.sql.parser.ddl.SqlCreateTableLike"
"org.apache.flink.sql.parser.ddl.SqlCreateView"
"org.apache.flink.sql.parser.ddl.SqlDistribution"
"org.apache.flink.sql.parser.ddl.SqlDropCatalog"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
"org.apache.flink.sql.parser.ddl.SqlDropFunction"
Expand Down Expand Up @@ -139,6 +140,7 @@
# keyword, please also add it to 'nonReservedKeywords' section.
# Please keep the keyword in alphabetical order if new keyword is added.
keywords: [
"BUCKETS"
"BYTES"
"CATALOGS"
"CHANGELOG_MODE"
Expand All @@ -147,11 +149,14 @@
"COMPILE"
"COLUMNS"
"DATABASES"
"DISTRIBUTED"
"DISTRIBUTION"
"DRAIN"
"ENFORCED"
"ESTIMATED_COST"
"EXTENDED"
"FUNCTIONS"
"HASH"
"IF"
"JSON_EXECUTION_PLAN"
"PLAN_ADVICE"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1349,6 +1349,48 @@ SqlNodeList TableProperties():
{ return new SqlNodeList(proList, span.end(this)); }
}

SqlNumericLiteral IntoBuckets(SqlParserPos startPos) :
{
SqlNumericLiteral bucketCount;
}
{
<INTO> { bucketCount = UnsignedNumericLiteral();
if (!bucketCount.isInteger()) {
throw SqlUtil.newContextException(getPos(),
ParserResource.RESOURCE.bucketCountMustBePositiveInteger());
}
} <BUCKETS>
{
return bucketCount;
}
}

SqlDistribution SqlDistribution(SqlParserPos startPos) :
{
String distributionKind = null;
SqlNumericLiteral bucketCount = null;
SqlNodeList bucketColumns = SqlNodeList.EMPTY;
SqlDistribution distribution = null;
}
{
(
bucketCount = IntoBuckets(getPos())
|
(
<BY> (
<HASH> { distributionKind = "HASH"; }
| <RANGE> { distributionKind = "RANGE"; }
| { distributionKind = null; }
)
{ bucketColumns = ParenthesizedSimpleIdentifierList(); }
[ bucketCount = IntoBuckets(getPos()) ]
)
)
{
return new SqlDistribution(startPos, distributionKind, bucketColumns, bucketCount);
}
}

SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
{
final SqlParserPos startPos = s.pos();
Expand All @@ -1362,6 +1404,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
SqlNode asQuery = null;

SqlNodeList propertyList = SqlNodeList.EMPTY;
String distributionKind = null;
SqlNumericLiteral bucketCount = null;
SqlNodeList bucketColumns = SqlNodeList.EMPTY;
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
SqlParserPos pos = startPos;
}
Expand Down Expand Up @@ -1389,6 +1435,11 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
[
<DISTRIBUTED>
distribution = SqlDistribution(getPos())
]

[
<PARTITIONED> <BY>
partitionColumns = ParenthesizedSimpleIdentifierList()
Expand All @@ -1406,6 +1457,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
columnList,
constraints,
propertyList,
distribution,
partitionColumns,
watermark,
comment,
Expand All @@ -1423,6 +1475,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
columnList,
constraints,
propertyList,
distribution,
partitionColumns,
watermark,
comment,
Expand All @@ -1436,6 +1489,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
columnList,
constraints,
propertyList,
distribution,
partitionColumns,
watermark,
comment,
Expand All @@ -1451,6 +1505,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean isTemporary) :
columnList,
constraints,
propertyList,
distribution,
partitionColumns,
watermark,
comment,
Expand Down Expand Up @@ -1503,6 +1558,8 @@ SqlTableLikeOption SqlTableLikeOption():
<ALL> { featureOption = FeatureOption.ALL;}
|
<CONSTRAINTS> { featureOption = FeatureOption.CONSTRAINTS;}
|
<DISTRIBUTION> { featureOption = FeatureOption.DISTRIBUTION;}
|
<GENERATED> { featureOption = FeatureOption.GENERATED;}
|
Expand Down Expand Up @@ -1551,6 +1608,10 @@ SqlNode SqlReplaceTable() :
List<SqlTableConstraint> constraints = new ArrayList<SqlTableConstraint>();
SqlWatermark watermark = null;
SqlNodeList columnList = SqlNodeList.EMPTY;
String distributionKind = null;
SqlNumericLiteral bucketCount = null;
SqlNodeList bucketColumns = SqlNodeList.EMPTY;
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
boolean ifNotExists = false;
}
Expand Down Expand Up @@ -1582,6 +1643,10 @@ SqlNode SqlReplaceTable() :
String p = SqlParserUtil.parseString(token.image);
comment = SqlLiteral.createCharString(p, getPos());
}]
[
<DISTRIBUTED>
distribution = SqlDistribution(getPos())
]
[
<PARTITIONED> <BY>
partitionColumns = ParenthesizedSimpleIdentifierList()
Expand All @@ -1598,6 +1663,7 @@ SqlNode SqlReplaceTable() :
columnList,
constraints,
propertyList,
distribution,
partitionColumns,
watermark,
comment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {

private final List<SqlTableConstraint> tableConstraints;

public SqlDistribution getDistribution() {
return distribution;
}

private final SqlDistribution distribution;

private final SqlNodeList partitionKeyList;

private final SqlWatermark watermark;
Expand All @@ -77,6 +83,7 @@ public SqlCreateTable(
SqlNodeList columnList,
List<SqlTableConstraint> tableConstraints,
SqlNodeList propertyList,
SqlDistribution distribution,
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
Expand All @@ -89,6 +96,7 @@ public SqlCreateTable(
columnList,
tableConstraints,
propertyList,
distribution,
partitionKeyList,
watermark,
comment,
Expand All @@ -103,6 +111,7 @@ protected SqlCreateTable(
SqlNodeList columnList,
List<SqlTableConstraint> tableConstraints,
SqlNodeList propertyList,
@Nullable SqlDistribution distribution,
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
Expand All @@ -114,6 +123,7 @@ protected SqlCreateTable(
this.tableConstraints =
requireNonNull(tableConstraints, "table constraints should not be null");
this.propertyList = requireNonNull(propertyList, "propertyList should not be null");
this.distribution = distribution;
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not be null");
this.watermark = watermark;
Expand Down Expand Up @@ -256,6 +266,10 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
comment.unparse(writer, leftPrec, rightPrec);
}

if (this.distribution != null) {
distribution.unparse(writer, leftPrec, rightPrec);
}

if (this.partitionKeyList.size() > 0) {
writer.newlineAndIndent();
writer.keyword("PARTITIONED BY");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public SqlCreateTableAs(
SqlNodeList columnList,
List<SqlTableConstraint> tableConstraints,
SqlNodeList propertyList,
SqlDistribution distribution,
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
Expand All @@ -92,6 +93,7 @@ public SqlCreateTableAs(
columnList,
tableConstraints,
propertyList,
distribution,
partitionKeyList,
watermark,
comment,
Expand Down Expand Up @@ -134,6 +136,11 @@ public void validate() throws SqlValidateException {
getParserPosition(),
"CREATE TABLE AS SELECT syntax does not support to specify explicit watermark yet.");
}
if (getDistribution() != null) {
throw new SqlValidateException(
getParserPosition(),
"CREATE TABLE AS SELECT syntax does not support creating distributed tables yet.");
}
// TODO flink dialect supports dynamic partition
if (getPartitionKeyList().size() > 0) {
throw new SqlValidateException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public SqlCreateTableLike(
SqlNodeList columnList,
List<SqlTableConstraint> tableConstraints,
SqlNodeList propertyList,
SqlDistribution distribution,
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
Expand All @@ -94,6 +95,7 @@ public SqlCreateTableLike(
columnList,
tableConstraints,
propertyList,
distribution,
partitionKeyList,
watermark,
comment,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.sql.parser.ddl;

import org.apache.calcite.sql.SqlCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.util.ImmutableNullableList;

import javax.annotation.Nullable;

import java.util.List;
import java.util.Optional;

/**
* Distribution statement in CREATE TABLE DDL, e.g. {@code DISTRIBUTED BY HASH(column1, column2)
* INTO BUCKETS 10}.
*/
public class SqlDistribution extends SqlCall {

private static final SqlSpecialOperator OPERATOR =
new SqlSpecialOperator("DISTRIBUTED BY", SqlKind.OTHER);

private final String distributionKind;
private final SqlNodeList bucketColumns;
private final SqlNumericLiteral bucketCount;

public SqlDistribution(
SqlParserPos pos,
@Nullable String distributionKind,
@Nullable SqlNodeList bucketColumns,
@Nullable SqlNumericLiteral bucketCount) {
super(pos);
this.distributionKind = distributionKind;
this.bucketColumns = bucketColumns;
this.bucketCount = bucketCount;
}

@Override
public SqlOperator getOperator() {
return OPERATOR;
}

@Override
public List<SqlNode> getOperandList() {
return ImmutableNullableList.of(bucketCount, bucketColumns);
}

@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.newlineAndIndent();

if (bucketColumns.size() == 0 && bucketCount != null) {
writer.keyword("DISTRIBUTED INTO");
bucketCount.unparse(writer, leftPrec, rightPrec);
writer.keyword("BUCKETS");
writer.newlineAndIndent();
return;
}

writer.keyword("DISTRIBUTED BY");
if (distributionKind != null) {
writer.print(distributionKind);
}
SqlWriter.Frame bucketFrame = writer.startList("(", ")");
bucketColumns.unparse(writer, leftPrec, rightPrec);
writer.endList(bucketFrame);

if (bucketCount != null) {
writer.keyword("INTO");
bucketCount.unparse(writer, leftPrec, rightPrec);
writer.keyword("BUCKETS");
}
writer.newlineAndIndent();
}

public Optional<String> getDistributionKind() {
return Optional.ofNullable(distributionKind);
}

public SqlNumericLiteral getBucketCount() {
return bucketCount;
}

public SqlNodeList getBucketColumns() {
return bucketColumns;
}
}
Loading

0 comments on commit 6c9ac5c

Please sign in to comment.