Skip to content

Commit

Permalink
[FLINK-24004][table-planner] Introduce TableFactoryHarness
Browse files Browse the repository at this point in the history
  • Loading branch information
Airblader authored and twalthr committed Aug 31, 2021
1 parent 8abf8c6 commit bc69c1c
Show file tree
Hide file tree
Showing 5 changed files with 445 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@
* TableEnvironment#createTemporaryTable(String, TableDescriptor)}.
*/
@PublicEvolving
public final class TableDescriptor {
public class TableDescriptor {

private final @Nullable Schema schema;
private final Map<String, String> options;
private final List<String> partitionKeys;
private final @Nullable String comment;

private TableDescriptor(
protected TableDescriptor(
@Nullable Schema schema,
Map<String, String> options,
List<String> partitionKeys,
Expand Down Expand Up @@ -101,6 +101,21 @@ public Optional<String> getComment() {

// ---------------------------------------------------------------------------------------------

/** Converts this descriptor into a {@link CatalogTable}. */
public CatalogTable toCatalogTable() {
final Schema schema =
getSchema()
.orElseThrow(
() ->
new ValidationException(
"Missing schema in TableDescriptor. "
+ "A schema is typically required. "
+ "It can only be omitted at certain "
+ "documented locations."));

return CatalogTable.of(schema, getComment().orElse(null), getPartitionKeys(), getOptions());
}

/** Converts this immutable instance into a mutable {@link Builder}. */
public Builder toBuilder() {
return new Builder(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
Expand Down Expand Up @@ -470,8 +469,7 @@ public void createTemporaryTable(String path, TableDescriptor descriptor) {
private void createTemporaryTableInternal(
UnresolvedIdentifier path, TableDescriptor descriptor) {
final ObjectIdentifier tableIdentifier = catalogManager.qualifyIdentifier(path);
final CatalogTable catalogTable = convertTableDescriptor(descriptor);
catalogManager.createTemporaryTable(catalogTable, tableIdentifier, false);
catalogManager.createTemporaryTable(descriptor.toCatalogTable(), tableIdentifier, false);
}

@Override
Expand All @@ -481,27 +479,7 @@ public void createTable(String path, TableDescriptor descriptor) {

final ObjectIdentifier tableIdentifier =
catalogManager.qualifyIdentifier(getParser().parseIdentifier(path));
final CatalogTable catalogTable = convertTableDescriptor(descriptor);
catalogManager.createTable(catalogTable, tableIdentifier, false);
}

private CatalogTable convertTableDescriptor(TableDescriptor descriptor) {
final Schema schema =
descriptor
.getSchema()
.orElseThrow(
() ->
new ValidationException(
"Missing schema in TableDescriptor. "
+ "A schema is typically required. "
+ "It can only be omitted at certain "
+ "documented locations."));

return CatalogTable.of(
schema,
descriptor.getComment().orElse(null),
descriptor.getPartitionKeys(),
descriptor.getOptions());
catalogManager.createTable(descriptor.toCatalogTable(), tableIdentifier, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@

/** Default implementation of a {@link CatalogTable}. */
@Internal
class DefaultCatalogTable implements CatalogTable {
public class DefaultCatalogTable implements CatalogTable {

private final Schema schema;
private final @Nullable String comment;
private final List<String> partitionKeys;
private final Map<String, String> options;

DefaultCatalogTable(
protected DefaultCatalogTable(
Schema schema,
@Nullable String comment,
List<String> partitionKeys,
Expand Down
Loading

0 comments on commit bc69c1c

Please sign in to comment.