[FLINK-24388][table-planner] Introduce Module#getTableSourceFactory
Airblader authored and twalthr committed Oct 1, 2021
1 parent f115934 commit 2ab2475
Showing 12 changed files with 330 additions and 108 deletions.
@@ -20,6 +20,7 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connectors.hive.util.JobConfUtils;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -65,21 +66,10 @@ public Set<ConfigOption<?>> optionalOptions() {

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());

// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
Integer configuredParallelism =
Configuration.fromMap(context.getCatalogTable().getOptions())
.get(FileSystemConnectorOptions.SINK_PARALLELISM);
JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
return new HiveTableSink(
context.getConfiguration(),
jobConf,
context.getObjectIdentifier(),
context.getCatalogTable(),
configuredParallelism);
} else {
if (!isHiveTable || context.isTemporary()) {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
context.getObjectIdentifier(),
@@ -88,59 +78,60 @@ public DynamicTableSink createDynamicTableSink(Context context) {
context.getClassLoader(),
context.isTemporary());
}

final Integer configuredParallelism =
Configuration.fromMap(context.getCatalogTable().getOptions())
.get(FileSystemConnectorOptions.SINK_PARALLELISM);
final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
return new HiveTableSink(
context.getConfiguration(),
jobConf,
context.getObjectIdentifier(),
context.getCatalogTable(),
configuredParallelism);
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());
final ReadableConfig configuration =
Configuration.fromMap(context.getCatalogTable().getOptions());

// we don't support temporary hive tables yet
if (isHiveTable && !context.isTemporary()) {
CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());

boolean isStreamingSource =
Boolean.parseBoolean(
catalogTable
.getOptions()
.getOrDefault(
STREAMING_SOURCE_ENABLE.key(),
STREAMING_SOURCE_ENABLE.defaultValue().toString()));

boolean includeAllPartition =
STREAMING_SOURCE_PARTITION_INCLUDE
.defaultValue()
.equals(
catalogTable
.getOptions()
.getOrDefault(
STREAMING_SOURCE_PARTITION_INCLUDE.key(),
STREAMING_SOURCE_PARTITION_INCLUDE
.defaultValue()));
JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);
// hive table source that has not lookup ability
if (isStreamingSource && includeAllPartition) {
return new HiveTableSource(
jobConf,
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
catalogTable);
} else {
// hive table source that has scan and lookup ability
return new HiveLookupTableSource(
jobConf,
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
catalogTable);
}
final boolean isHiveTable = HiveCatalog.isHiveTable(context.getCatalogTable().getOptions());

} else {
return FactoryUtil.createTableSource(
null, // we already in the factory of catalog
// we don't support temporary hive tables yet
if (!isHiveTable || context.isTemporary()) {
return FactoryUtil.createDynamicTableSource(
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
}

final CatalogTable catalogTable = Preconditions.checkNotNull(context.getCatalogTable());

final boolean isStreamingSource = configuration.get(STREAMING_SOURCE_ENABLE);
final boolean includeAllPartition =
STREAMING_SOURCE_PARTITION_INCLUDE
.defaultValue()
.equals(configuration.get(STREAMING_SOURCE_PARTITION_INCLUDE));
final JobConf jobConf = JobConfUtils.createJobConfWithCredentials(hiveConf);

// hive table source that has not lookup ability
if (isStreamingSource && includeAllPartition) {
return new HiveTableSource(
jobConf,
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
catalogTable);
} else {
// hive table source that has scan and lookup ability
return new HiveLookupTableSource(
jobConf,
context.getConfiguration(),
context.getObjectIdentifier().toObjectPath(),
catalogTable);
}
}
}
@@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
@@ -282,8 +283,9 @@ private DynamicTableSource getTableSource(String tableName) throws Exception {
ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName);
CatalogTable catalogTable =
(CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
return FactoryUtil.createTableSource(
hiveCatalog,
return FactoryUtil.createDynamicTableSource(
(DynamicTableSourceFactory)
hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
tableIdentifier,
tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
tableEnv.getConfig().getConfiguration(),
@@ -30,6 +30,7 @@
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions;
import org.apache.flink.table.filesystem.FileSystemLookupFunction;
@@ -360,8 +361,11 @@ private FileSystemLookupFunction<HiveTablePartition> getLookupFunction(String ta
(CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
HiveLookupTableSource hiveTableSource =
(HiveLookupTableSource)
FactoryUtil.createTableSource(
hiveCatalog,
FactoryUtil.createDynamicTableSource(
(DynamicTableSourceFactory)
hiveCatalog
.getFactory()
.orElseThrow(IllegalStateException::new),
tableIdentifier,
tableEnvInternal
.getCatalogManager()
@@ -34,6 +34,7 @@
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactory;
import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
@@ -47,6 +48,7 @@
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -70,31 +72,33 @@ public static void close() {

@Test
public void testGenericTable() throws Exception {
TableSchema schema =
final TableSchema schema =
TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();

Map<String, String> properties = new HashMap<>();
properties.put(FactoryUtil.CONNECTOR.key(), "COLLECTION");

catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
CatalogTable table = new CatalogTableImpl(schema, properties, "csv table");
catalog.createTable(path, table, true);
Optional<TableFactory> opt = catalog.getTableFactory();
assertTrue(opt.isPresent());
HiveTableFactory tableFactory = (HiveTableFactory) opt.get();
TableSource tableSource =

final Map<String, String> options =
Collections.singletonMap(FactoryUtil.CONNECTOR.key(), "COLLECTION");
final CatalogTable table = new CatalogTableImpl(schema, options, "csv table");
catalog.createTable(new ObjectPath("mydb", "mytable"), table, true);

final Optional<TableFactory> tableFactoryOpt = catalog.getTableFactory();
assertTrue(tableFactoryOpt.isPresent());
final HiveTableFactory tableFactory = (HiveTableFactory) tableFactoryOpt.get();

final TableSource tableSource =
tableFactory.createTableSource(
new TableSourceFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
table,
new Configuration(),
false));
assertTrue(tableSource instanceof StreamTableSource);
TableSink tableSink =

final TableSink tableSink =
tableFactory.createTableSink(
new TableSinkFactoryContextImpl(
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
@@ -107,32 +111,32 @@ public void testGenericTable() throws Exception {

@Test
public void testHiveTable() throws Exception {
ResolvedSchema schema =
final ResolvedSchema schema =
ResolvedSchema.of(
Column.physical("name", DataTypes.STRING()),
Column.physical("age", DataTypes.INT()));

Map<String, String> properties = new HashMap<>();
properties.put(FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER);

catalog.createDatabase("mydb", new CatalogDatabaseImpl(new HashMap<>(), ""), true);
ObjectPath path = new ObjectPath("mydb", "mytable");
CatalogTable table =
new CatalogTableImpl(
TableSchema.fromResolvedSchema(schema), properties, "hive table");
catalog.createTable(path, table, true);

DynamicTableSource tableSource =
FactoryUtil.createTableSource(
catalog,

final Map<String, String> options =
Collections.singletonMap(
FactoryUtil.CONNECTOR.key(), SqlCreateHiveTable.IDENTIFIER);
final CatalogTable table =
new CatalogTableImpl(TableSchema.fromResolvedSchema(schema), options, "hive table");
catalog.createTable(new ObjectPath("mydb", "mytable"), table, true);

final DynamicTableSource tableSource =
FactoryUtil.createDynamicTableSource(
(DynamicTableSourceFactory)
catalog.getFactory().orElseThrow(IllegalStateException::new),
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
new ResolvedCatalogTable(table, schema),
new Configuration(),
Thread.currentThread().getContextClassLoader(),
false);
assertTrue(tableSource instanceof HiveTableSource);

DynamicTableSink tableSink =
final DynamicTableSink tableSink =
FactoryUtil.createTableSink(
catalog,
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
@@ -144,7 +144,7 @@ public void testErrorMessage() throws Exception {
String[] errorStack =
new String[] {
"at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
"at org.apache.flink.table.factories.FactoryUtil.createTableSource"
"at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource"
};
for (String stack : errorStack) {
assertThat(output, not(containsString(stack)));
@@ -165,7 +165,7 @@ public void testVerboseErrorMessage() throws Exception {
new String[] {
"org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'invalid'",
"at org.apache.flink.table.factories.FactoryUtil.discoverFactory",
"at org.apache.flink.table.factories.FactoryUtil.createTableSource"
"at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource"
};
for (String error : errors) {
assertThat(output, containsString(error));
@@ -21,6 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.util.StringUtils;

@@ -36,6 +37,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;
@@ -186,6 +188,25 @@ public Optional<FunctionDefinition> getFunctionDefinition(String name) {
return Optional.empty();
}

/**
* Returns the first factory found in the loaded modules given a selector.
*
* <p>Modules are checked in the order in which they have been loaded. The first factory
* returned by a module will be used. If no loaded module provides a factory, {@link
* Optional#empty()} is returned.
*/
@SuppressWarnings("unchecked")
public <T extends Factory> Optional<T> getFactory(Function<Module, Optional<T>> selector) {
for (final String moduleName : usedModules) {
final Optional<T> factory = selector.apply(loadedModules.get(moduleName));
if (factory.isPresent()) {
return factory;
}
}

return Optional.empty();
}

@VisibleForTesting
List<String> getUsedModules() {
return usedModules;
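
For illustration only, not part of the commit: a minimal sketch of how a caller might combine the new ModuleManager#getFactory with the Module#getTableSourceFactory hook named in the commit title. The exact signature of Module#getTableSourceFactory is assumed here to be Optional<DynamicTableSourceFactory> getTableSourceFactory(); treat this as a sketch under that assumption.

import java.util.Optional;

import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;

public final class ModuleFactoryLookupSketch {

    /**
     * Resolves a table source factory from the loaded modules, checking them
     * in load order and returning the first factory that any module provides.
     */
    public static Optional<DynamicTableSourceFactory> resolveSourceFactory(
            ModuleManager moduleManager) {
        // Module::getTableSourceFactory is the hook named in the commit title
        // (signature assumed); getFactory walks the used modules in order and
        // returns the first non-empty result, or Optional.empty() otherwise.
        return moduleManager.getFactory(Module::getTableSourceFactory);
    }
}

The commit itself takes the complementary path in the Hive connector tests above: the catalog's own factory (hiveCatalog.getFactory()) is resolved explicitly and passed to FactoryUtil.createDynamicTableSource instead of the previous FactoryUtil.createTableSource(catalog, ...) overload.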