From 3b6e8db11fe41432d1f56ba84364d8b1351322ac Mon Sep 17 00:00:00 2001 From: Feng Jin Date: Thu, 9 May 2024 16:24:57 +0800 Subject: [PATCH] [FLINK-35197][table] Support convert alter materialized table suspend/resume nodes to operations --- ...AlterMaterializedTableResumeOperation.java | 63 +++++++++++++++++++ ...lterMaterializedTableSuspendOperation.java | 44 +++++++++++++ ...AlterMaterializedTableResumeConverter.java | 55 ++++++++++++++++ ...lterMaterializedTableSuspendConverter.java | 37 +++++++++++ .../converters/SqlNodeConverters.java | 2 + ...izedTableNodeToOperationConverterTest.java | 40 +++++++++--- 6 files changed, 234 insertions(+), 7 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSuspendConverter.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java new file mode 100644 index 0000000000000..907d5b7cc3be9 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.operations.OperationUtils; + +import java.util.Map; + +/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */ +@Internal +public class AlterMaterializedTableResumeOperation extends AlterMaterializedTableOperation { + + private final Map dynamicOptions; + + public AlterMaterializedTableResumeOperation( + ObjectIdentifier tableIdentifier, Map options) { + super(tableIdentifier); + this.dynamicOptions = options; + } + + public Map getDynamicOptions() { + return dynamicOptions; + } + + @Override + public TableResultInternal execute(Context ctx) { + throw new UnsupportedOperationException( + "AlterMaterializedTableResumeOperation doesn't support ExecutableOperation yet."); + } + + @Override + public String asSummaryString() { + StringBuilder builder = + new StringBuilder( + String.format( + "ALTER MATERIALIZED TABLE %s RESUME", + tableIdentifier.asSummaryString())); + if (!dynamicOptions.isEmpty()) { + builder.append( + String.format(" WITH (%s)", OperationUtils.formatProperties(dynamicOptions))); + } + return builder.toString(); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java new file mode 100644 index 0000000000000..36ef249924028 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.operations.materializedtable; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.internal.TableResultInternal; +import org.apache.flink.table.catalog.ObjectIdentifier; + +/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */ +@Internal +public class AlterMaterializedTableSuspendOperation extends AlterMaterializedTableOperation { + + public AlterMaterializedTableSuspendOperation(ObjectIdentifier tableIdentifier) { + super(tableIdentifier); + } + + @Override + public TableResultInternal execute(Context ctx) { + throw new UnsupportedOperationException( + "AlterMaterializedTableResumeOperation doesn't support ExecutableOperation yet."); + } + + @Override + public String asSummaryString() { + return String.format( + "ALTER MATERIALIZED TABLE %s SUSPEND", tableIdentifier.asSummaryString()); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java new file mode 100644 index 0000000000000..afb7f272e7bc4 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableResumeConverter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume; +import org.apache.flink.sql.parser.ddl.SqlTableOption; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; + +import java.util.HashMap; +import java.util.Map; + +/** A converter for {@link SqlAlterMaterializedTableResume}. */ +public class SqlAlterMaterializedTableResumeConverter + implements SqlNodeConverter { + @Override + public Operation convertSqlNode( + SqlAlterMaterializedTableResume sqlAlterMaterializedTableResume, + ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = + UnresolvedIdentifier.of(sqlAlterMaterializedTableResume.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + + // get table options + Map options = new HashMap<>(); + sqlAlterMaterializedTableResume + .getPropertyList() + .getList() + .forEach( + p -> + options.put( + ((SqlTableOption) p).getKeyString(), + ((SqlTableOption) p).getValueString())); + return new AlterMaterializedTableResumeOperation(identifier, options); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSuspendConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSuspendConverter.java new file mode 100644 index 0000000000000..37a2af735e3d7 --- /dev/null +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlAlterMaterializedTableSuspendConverter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.operations.converters; + +import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.UnresolvedIdentifier; +import org.apache.flink.table.operations.Operation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; + +/** A converter for {@link SqlAlterMaterializedTableSuspend}. */ +public class SqlAlterMaterializedTableSuspendConverter + implements SqlNodeConverter { + @Override + public Operation convertSqlNode(SqlAlterMaterializedTableSuspend node, ConvertContext context) { + UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName()); + ObjectIdentifier identifier = + context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier); + return new AlterMaterializedTableSuspendOperation(identifier); + } +} diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java index 3211cd250a6f3..948790dfbe635 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java @@ -59,6 +59,8 @@ public class SqlNodeConverters { register(new SqlDescribeJobConverter()); register(new SqlCreateMaterializedTableConverter()); register(new SqlAlterMaterializedTableRefreshConverter()); + register(new SqlAlterMaterializedTableSuspendConverter()); + register(new SqlAlterMaterializedTableResumeConverter()); } /** diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java index e044c48b0a3ac..5200cc3e151bb 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java @@ -25,6 +25,8 @@ import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation; +import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation; import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation; import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap; @@ -45,7 +47,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase { @Test - public void testCreateMaterializedTable() { + void testCreateMaterializedTable() { final String sql = "CREATE MATERIALIZED TABLE mtbl1 (\n" + " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED" @@ -96,7 +98,7 @@ public void testCreateMaterializedTable() { } @Test - public void testContinuousRefreshMode() { + void testContinuousRefreshMode() { // test continuous mode derived by specify freshness automatically final String sql = "CREATE MATERIALIZED TABLE mtbl1\n" @@ -134,7 +136,7 @@ public void testContinuousRefreshMode() { } @Test - public void testFullRefreshMode() { + void testFullRefreshMode() { // test full mode derived by specify freshness automatically final String sql = "CREATE MATERIALIZED TABLE mtbl1\n" @@ -172,7 +174,7 @@ public void testFullRefreshMode() { } @Test - public void testCreateMaterializedTableWithInvalidPrimaryKey() { + void testCreateMaterializedTableWithInvalidPrimaryKey() { // test unsupported constraint final String sql = "CREATE MATERIALIZED TABLE mtbl1 (\n" @@ -213,7 +215,7 @@ public void testCreateMaterializedTableWithInvalidPrimaryKey() { } @Test - public void testCreateMaterializedTableWithInvalidPartitionKey() { + void testCreateMaterializedTableWithInvalidPartitionKey() { final String sql = "CREATE MATERIALIZED TABLE mtbl1\n" + "PARTITIONED BY (a, e)\n" @@ -227,7 +229,7 @@ public void testCreateMaterializedTableWithInvalidPartitionKey() { } @Test - public void testCreateMaterializedTableWithInvalidFreshnessType() { + void testCreateMaterializedTableWithInvalidFreshnessType() { // test negative freshness value final String sql = "CREATE MATERIALIZED TABLE mtbl1\n" @@ -261,7 +263,7 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() { } @Test - public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() { + void testAlterMaterializedTableRefreshOperationWithPartitionSpec() { final String sql = "ALTER MATERIALIZED TABLE mtbl1 REFRESH PARTITION (ds1 = '1', ds2 = '2')"; @@ -286,4 +288,28 @@ public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() { assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`"); assertThat(op.getPartitionSpec()).isEmpty(); } + + @Test + void testAlterMaterializedTableSuspend() { + final String sql = "ALTER MATERIALIZED TABLE mtbl1 SUSPEND"; + Operation operation = parse(sql); + assertThat(operation).isInstanceOf(AlterMaterializedTableSuspendOperation.class); + } + + @Test + void testAlterMaterializedTableResume() { + final String sql1 = "ALTER MATERIALIZED TABLE mtbl1 RESUME"; + Operation operation = parse(sql1); + assertThat(operation).isInstanceOf(AlterMaterializedTableResumeOperation.class); + assertThat(operation.asSummaryString()) + .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME"); + + final String sql2 = "ALTER MATERIALIZED TABLE mtbl1 RESUME WITH ('k1' = 'v1')"; + Operation operation2 = parse(sql2); + assertThat(operation2).isInstanceOf(AlterMaterializedTableResumeOperation.class); + assertThat(((AlterMaterializedTableResumeOperation) operation2).getDynamicOptions()) + .containsEntry("k1", "v1"); + assertThat(operation2.asSummaryString()) + .isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])"); + } }