Skip to content

Commit

Permalink
[FLINK-35197][table] Support convert alter materialized table suspend…
Browse files Browse the repository at this point in the history
…/resume nodes to operations
  • Loading branch information
hackergin committed May 11, 2024
1 parent e80c286 commit 3b6e8db
Show file tree
Hide file tree
Showing 6 changed files with 234 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.operations.OperationUtils;

import java.util.Map;

/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
@Internal
public class AlterMaterializedTableResumeOperation extends AlterMaterializedTableOperation {

private final Map<String, String> dynamicOptions;

public AlterMaterializedTableResumeOperation(
ObjectIdentifier tableIdentifier, Map<String, String> options) {
super(tableIdentifier);
this.dynamicOptions = options;
}

public Map<String, String> getDynamicOptions() {
return dynamicOptions;
}

@Override
public TableResultInternal execute(Context ctx) {
throw new UnsupportedOperationException(
"AlterMaterializedTableResumeOperation doesn't support ExecutableOperation yet.");
}

@Override
public String asSummaryString() {
StringBuilder builder =
new StringBuilder(
String.format(
"ALTER MATERIALIZED TABLE %s RESUME",
tableIdentifier.asSummaryString()));
if (!dynamicOptions.isEmpty()) {
builder.append(
String.format(" WITH (%s)", OperationUtils.formatProperties(dynamicOptions)));
}
return builder.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations.materializedtable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;

/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
@Internal
public class AlterMaterializedTableSuspendOperation extends AlterMaterializedTableOperation {

public AlterMaterializedTableSuspendOperation(ObjectIdentifier tableIdentifier) {
super(tableIdentifier);
}

@Override
public TableResultInternal execute(Context ctx) {
throw new UnsupportedOperationException(
"AlterMaterializedTableResumeOperation doesn't support ExecutableOperation yet.");
}

@Override
public String asSummaryString() {
return String.format(
"ALTER MATERIALIZED TABLE %s SUSPEND", tableIdentifier.asSummaryString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableResume;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;

import java.util.HashMap;
import java.util.Map;

/** A converter for {@link SqlAlterMaterializedTableResume}. */
public class SqlAlterMaterializedTableResumeConverter
implements SqlNodeConverter<SqlAlterMaterializedTableResume> {
@Override
public Operation convertSqlNode(
SqlAlterMaterializedTableResume sqlAlterMaterializedTableResume,
ConvertContext context) {
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlAlterMaterializedTableResume.fullTableName());
ObjectIdentifier identifier =
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);

// get table options
Map<String, String> options = new HashMap<>();
sqlAlterMaterializedTableResume
.getPropertyList()
.getList()
.forEach(
p ->
options.put(
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));
return new AlterMaterializedTableResumeOperation(identifier, options);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTableSuspend;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;

/** A converter for {@link SqlAlterMaterializedTableSuspend}. */
public class SqlAlterMaterializedTableSuspendConverter
implements SqlNodeConverter<SqlAlterMaterializedTableSuspend> {
@Override
public Operation convertSqlNode(SqlAlterMaterializedTableSuspend node, ConvertContext context) {
UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(node.fullTableName());
ObjectIdentifier identifier =
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
return new AlterMaterializedTableSuspendOperation(identifier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class SqlNodeConverters {
register(new SqlDescribeJobConverter());
register(new SqlCreateMaterializedTableConverter());
register(new SqlAlterMaterializedTableRefreshConverter());
register(new SqlAlterMaterializedTableSuspendConverter());
register(new SqlAlterMaterializedTableResumeConverter());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableRefreshOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableResumeOperation;
import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableSuspendOperation;
import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
Expand All @@ -45,7 +47,7 @@ public class SqlMaterializedTableNodeToOperationConverterTest
extends SqlNodeToOperationConversionTestBase {

@Test
public void testCreateMaterializedTable() {
void testCreateMaterializedTable() {
final String sql =
"CREATE MATERIALIZED TABLE mtbl1 (\n"
+ " CONSTRAINT ct1 PRIMARY KEY(a) NOT ENFORCED"
Expand Down Expand Up @@ -96,7 +98,7 @@ public void testCreateMaterializedTable() {
}

@Test
public void testContinuousRefreshMode() {
void testContinuousRefreshMode() {
// test continuous mode derived by specify freshness automatically
final String sql =
"CREATE MATERIALIZED TABLE mtbl1\n"
Expand Down Expand Up @@ -134,7 +136,7 @@ public void testContinuousRefreshMode() {
}

@Test
public void testFullRefreshMode() {
void testFullRefreshMode() {
// test full mode derived by specify freshness automatically
final String sql =
"CREATE MATERIALIZED TABLE mtbl1\n"
Expand Down Expand Up @@ -172,7 +174,7 @@ public void testFullRefreshMode() {
}

@Test
public void testCreateMaterializedTableWithInvalidPrimaryKey() {
void testCreateMaterializedTableWithInvalidPrimaryKey() {
// test unsupported constraint
final String sql =
"CREATE MATERIALIZED TABLE mtbl1 (\n"
Expand Down Expand Up @@ -213,7 +215,7 @@ public void testCreateMaterializedTableWithInvalidPrimaryKey() {
}

@Test
public void testCreateMaterializedTableWithInvalidPartitionKey() {
void testCreateMaterializedTableWithInvalidPartitionKey() {
final String sql =
"CREATE MATERIALIZED TABLE mtbl1\n"
+ "PARTITIONED BY (a, e)\n"
Expand All @@ -227,7 +229,7 @@ public void testCreateMaterializedTableWithInvalidPartitionKey() {
}

@Test
public void testCreateMaterializedTableWithInvalidFreshnessType() {
void testCreateMaterializedTableWithInvalidFreshnessType() {
// test negative freshness value
final String sql =
"CREATE MATERIALIZED TABLE mtbl1\n"
Expand Down Expand Up @@ -261,7 +263,7 @@ public void testCreateMaterializedTableWithInvalidFreshnessType() {
}

@Test
public void testAlterMaterializedTableRefreshOperationWithPartitionSpec() {
void testAlterMaterializedTableRefreshOperationWithPartitionSpec() {
final String sql =
"ALTER MATERIALIZED TABLE mtbl1 REFRESH PARTITION (ds1 = '1', ds2 = '2')";

Expand All @@ -286,4 +288,28 @@ public void testAlterMaterializedTableRefreshOperationWithoutPartitionSpec() {
assertThat(op.getTableIdentifier().toString()).isEqualTo("`builtin`.`default`.`mtbl1`");
assertThat(op.getPartitionSpec()).isEmpty();
}

@Test
void testAlterMaterializedTableSuspend() {
final String sql = "ALTER MATERIALIZED TABLE mtbl1 SUSPEND";
Operation operation = parse(sql);
assertThat(operation).isInstanceOf(AlterMaterializedTableSuspendOperation.class);
}

@Test
void testAlterMaterializedTableResume() {
final String sql1 = "ALTER MATERIALIZED TABLE mtbl1 RESUME";
Operation operation = parse(sql1);
assertThat(operation).isInstanceOf(AlterMaterializedTableResumeOperation.class);
assertThat(operation.asSummaryString())
.isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME");

final String sql2 = "ALTER MATERIALIZED TABLE mtbl1 RESUME WITH ('k1' = 'v1')";
Operation operation2 = parse(sql2);
assertThat(operation2).isInstanceOf(AlterMaterializedTableResumeOperation.class);
assertThat(((AlterMaterializedTableResumeOperation) operation2).getDynamicOptions())
.containsEntry("k1", "v1");
assertThat(operation2.asSummaryString())
.isEqualTo("ALTER MATERIALIZED TABLE builtin.default.mtbl1 RESUME WITH (k1: [v1])");
}
}

0 comments on commit 3b6e8db

Please sign in to comment.