Skip to content

Commit

Permalink
[FLINK-35188][table-api] Introduce CatalogMaterializedTable interface…
Browse files Browse the repository at this point in the history
… to support materialized table
  • Loading branch information
lsyldliu committed Apr 24, 2024
1 parent 9cc6498 commit 2a9876d
Show file tree
Hide file tree
Showing 8 changed files with 885 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@
/** Factory to create the operation executor. */
public class OperationExecutorFactory {

// Hive dialect doesn't support materialized table currently, so only regular tables and views
// are requested when listing tables. The set is shared by every call in this factory, hence it
// is wrapped as unmodifiable to rule out accidental mutation by a callee.
private static final Set<TableKind> TABLE_KINDS =
        java.util.Collections.unmodifiableSet(
                java.util.EnumSet.of(TableKind.TABLE, TableKind.VIEW));

public static Callable<ResultSet> createGetCatalogsExecutor(
SqlGatewayService service, SessionHandle sessionHandle) {
return () -> executeGetCatalogs(service, sessionHandle);
Expand Down Expand Up @@ -291,14 +295,13 @@ private static ResultSet executeGetColumns(
Set<String> schemaNames =
filterAndSort(
service.listDatabases(sessionHandle, specifiedCatalogName), schemaName);
Set<TableKind> tableKinds = new HashSet<>(Arrays.asList(TableKind.values()));

List<RowData> results = new ArrayList<>();
for (String schema : schemaNames) {
Set<TableInfo> tableInfos =
filterAndSort(
service.listTables(
sessionHandle, specifiedCatalogName, schema, tableKinds),
sessionHandle, specifiedCatalogName, schema, TABLE_KINDS),
candidates -> candidates.getIdentifier().getObjectName(),
tableName);

Expand Down Expand Up @@ -369,10 +372,7 @@ private static ResultSet executeGetPrimaryKeys(
Set<TableInfo> tableInfos =
filterAndSort(
service.listTables(
sessionHandle,
specifiedCatalogName,
schema,
new HashSet<>(Arrays.asList(TableKind.values()))),
sessionHandle, specifiedCatalogName, schema, TABLE_KINDS),
candidate -> candidate.getIdentifier().getObjectName(),
tableName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -963,7 +963,8 @@ public void createTable(
ignoreIfExists);

catalog.createTable(path, resolvedListenedTable, ignoreIfExists);
if (resolvedListenedTable instanceof CatalogTable) {
if (resolvedListenedTable instanceof CatalogTable
|| resolvedListenedTable instanceof CatalogMaterializedTable) {
catalogModificationListeners.forEach(
listener ->
listener.onEvent(
Expand Down Expand Up @@ -1318,6 +1319,8 @@ public ResolvedCatalogBaseTable<?> resolveCatalogBaseTable(CatalogBaseTable base
Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized.");
if (baseTable instanceof CatalogTable) {
return resolveCatalogTable((CatalogTable) baseTable);
} else if (baseTable instanceof CatalogMaterializedTable) {
return resolveCatalogMaterializedTable((CatalogMaterializedTable) baseTable);
} else if (baseTable instanceof CatalogView) {
return resolveCatalogView((CatalogView) baseTable);
}
Expand Down Expand Up @@ -1389,6 +1392,42 @@ public ResolvedCatalogTable resolveCatalogTable(CatalogTable table) {
return new ResolvedCatalogTable(table, resolvedSchema);
}

/**
 * Turns a {@link CatalogMaterializedTable} into a validated {@link
 * ResolvedCatalogMaterializedTable}.
 *
 * <p>An instance that is already a {@link ResolvedCatalogMaterializedTable} is returned as is.
 * Otherwise the unresolved schema is resolved with the configured schema resolver and every
 * partition key is checked against the names of the physical columns.
 *
 * @param table the materialized table to resolve
 * @return the resolved materialized table
 * @throws ValidationException if a partition key does not name a physical column
 */
public ResolvedCatalogMaterializedTable resolveCatalogMaterializedTable(
        CatalogMaterializedTable table) {
    Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized.");

    // Nothing to do for tables that went through resolution before.
    if (table instanceof ResolvedCatalogMaterializedTable) {
        return (ResolvedCatalogMaterializedTable) table;
    }

    final ResolvedSchema schema = table.getUnresolvedSchema().resolve(schemaResolver);

    // Only physical columns are valid targets for a partition key.
    final List<String> physicalColumnNames =
            schema.getColumns().stream()
                    .filter(Column::isPhysical)
                    .map(Column::getName)
                    .collect(Collectors.toList());

    for (String key : table.getPartitionKeys()) {
        if (physicalColumnNames.contains(key)) {
            continue;
        }
        throw new ValidationException(
                String.format(
                        "Invalid partition key '%s'. A partition key must "
                                + "reference a physical column in the schema. "
                                + "Available columns are: %s",
                        key, physicalColumnNames));
    }

    return new ResolvedCatalogMaterializedTable(table, schema);
}

/** Resolves a {@link CatalogView} to a validated {@link ResolvedCatalogView}. */
public ResolvedCatalogView resolveCatalogView(CatalogView view) {
Preconditions.checkNotNull(schemaResolver, "Schema resolver is not initialized.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import javax.annotation.Nullable;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -49,7 +50,8 @@
import static org.assertj.core.api.HamcrestCondition.matching;

/**
* Tests for {@link CatalogTable} to {@link ResolvedCatalogTable} and {@link CatalogView} to {@link
* Tests for {@link CatalogTable} to {@link ResolvedCatalogTable}, {@link CatalogMaterializedTable}
* to {@link ResolvedCatalogMaterializedTable} and {@link CatalogView} to {@link
* ResolvedCatalogView} including {@link CatalogPropertiesUtil}.
*/
class CatalogBaseTableResolutionTest {
Expand Down Expand Up @@ -82,6 +84,17 @@ class CatalogBaseTableResolutionTest {
.primaryKeyNamed("primary_constraint", "id")
.build();

/**
 * Unresolved schema used to build the materialized table fixture: four columns ({@code id},
 * {@code region}, {@code county}, {@code topic}) and a primary key named {@code
 * primary_constraint} on {@code id}. {@code region} carries a regular comment while {@code
 * topic} carries an empty one.
 */
private static final Schema MATERIALIZED_TABLE_SCHEMA =
Schema.newBuilder()
.column("id", DataTypes.INT().notNull())
.column("region", DataTypes.VARCHAR(200))
.withComment("This is a region column.")
.column("county", DataTypes.VARCHAR(200))
.column("topic", DataTypes.VARCHAR(200))
.withComment("") // empty column comment
.primaryKeyNamed("primary_constraint", "id")
.build();

private static final TableSchema LEGACY_TABLE_SCHEMA =
TableSchema.builder()
.add(TableColumn.physical("id", DataTypes.INT().notNull()))
Expand Down Expand Up @@ -117,6 +130,18 @@ class CatalogBaseTableResolutionTest {
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")));

/**
 * Resolved schema that the materialized table test compares the resolution result against:
 * the columns {@code id}, {@code region}, {@code county} and {@code topic} as physical columns
 * (with the same comments as in {@code MATERIALIZED_TABLE_SCHEMA}), an empty list as second
 * constructor argument (presumably the watermark specs; confirm against {@link
 * ResolvedSchema}), and the primary key {@code primary_constraint} on {@code id}.
 */
private static final ResolvedSchema RESOLVED_MATERIALIZED_TABLE_SCHEMA =
new ResolvedSchema(
Arrays.asList(
Column.physical("id", DataTypes.INT().notNull()),
Column.physical("region", DataTypes.VARCHAR(200))
.withComment("This is a region column."),
Column.physical("county", DataTypes.VARCHAR(200)),
Column.physical("topic", DataTypes.VARCHAR(200)).withComment("")),
Collections.emptyList(),
UniqueConstraint.primaryKey(
"primary_constraint", Collections.singletonList("id")));

private static final ResolvedSchema RESOLVED_VIEW_SCHEMA =
new ResolvedSchema(
Arrays.asList(
Expand All @@ -140,6 +165,30 @@ void testCatalogTableResolution() {
assertThat(resolvedTable.getSchema()).isEqualTo(LEGACY_TABLE_SCHEMA);
}

@Test
void testCatalogMaterializedTableResolution() {
    final CatalogMaterializedTable unresolved = catalogMaterializedTable();
    assertThat(unresolved.getUnresolvedSchema()).isNotNull();

    final ResolvedCatalogMaterializedTable resolved =
            resolveCatalogBaseTable(ResolvedCatalogMaterializedTable.class, unresolved);

    // The schema has to resolve to the expected columns and primary key.
    assertThat(resolved.getResolvedSchema()).isEqualTo(RESOLVED_MATERIALIZED_TABLE_SCHEMA);

    // The materialized-table attributes must be the same before and after resolution.
    assertThat(resolved.getDefinitionQuery()).isEqualTo(unresolved.getDefinitionQuery());
    assertThat(resolved.getFreshness()).isEqualTo(unresolved.getFreshness());
    assertThat(resolved.getLogicalRefreshMode())
            .isEqualTo(unresolved.getLogicalRefreshMode());
    assertThat(resolved.getRefreshMode()).isEqualTo(unresolved.getRefreshMode());
    assertThat(resolved.getRefreshStatus()).isEqualTo(unresolved.getRefreshStatus());
}

@Test
void testCatalogViewResolution() {
final CatalogView view = catalogView();
Expand Down Expand Up @@ -319,6 +368,28 @@ private static Map<String, String> catalogTableAsProperties() {
return properties;
}

/**
 * Builds the unresolved materialized table fixture: {@code MATERIALIZED_TABLE_SCHEMA},
 * partitioned by {@code region} and {@code county}, without options, with a 30 second
 * freshness, automatic logical refresh mode, continuous refresh mode and initializing status.
 */
private static CatalogMaterializedTable catalogMaterializedTable() {
    final String query =
            String.format(
                    "SELECT id, region, county FROM %s.%s.T",
                    DEFAULT_CATALOG, DEFAULT_DATABASE);

    return CatalogMaterializedTable.newBuilder()
            .schema(MATERIALIZED_TABLE_SCHEMA)
            .comment("This is an example materialized table.")
            .partitionKeys(Arrays.asList("region", "county"))
            .options(Collections.emptyMap())
            .definitionQuery(query)
            .freshness(Duration.ofSeconds(30))
            .logicalRefreshMode(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
            .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
            .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
            .build();
}

private static CatalogView catalogView() {
final String comment = "This is an example table.";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.Factory;
Expand Down Expand Up @@ -66,6 +68,13 @@ public interface Catalog {
* internal catalog-specific objects to their own factory. For example, a custom {@link
* CatalogTable} can be processed by a custom {@link DynamicTableFactory}.
*
* <p>If this catalog supports creating materialized tables, you should also override this method
* to provide a {@link DynamicTableFactory} that helps the planner find the {@link
* DynamicTableSource} and {@link DynamicTableSink} correctly during the compile and optimization
* phase. If you don't override this method, you must specify the identifier of the physical
* connector that backs this catalog's storage when creating a materialized table. Otherwise, the
* planner can't find the {@link DynamicTableFactory}.
*
* <p>Because all factories are interfaces, the returned {@link Factory} instance can implement
* multiple supported extension points. An {@code instanceof} check is performed by the caller
* that checks whether a required factory is implemented; otherwise the discovery process is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public interface CatalogBaseTable {
@PublicEvolving
enum TableKind {
TABLE,
MATERIALIZED_TABLE,
VIEW
}

Expand Down
Loading

0 comments on commit 2a9876d

Please sign in to comment.