Skip to content

Commit

Permalink
[FLINK-24388][table-planner] Introduce Module#getTableSinkFactory
Browse files Browse the repository at this point in the history
This closes apache#17384.
  • Loading branch information
Airblader authored and twalthr committed Oct 1, 2021
1 parent 2ab2475 commit aa51c8f
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public DynamicTableSink createDynamicTableSink(Context context) {

// we don't support temporary hive tables yet
if (!isHiveTable || context.isTemporary()) {
return FactoryUtil.createTableSink(
null, // we already in the factory of catalog
return FactoryUtil.createDynamicTableSink(
null,
context.getObjectIdentifier(),
context.getCatalogTable(),
context.getConfiguration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.filesystem.FileSystemConnectorOptions.PartitionOrder;
Expand Down Expand Up @@ -299,8 +300,9 @@ private DynamicTableSink getTableSink(String tableName) throws Exception {
ObjectIdentifier.of(hiveCatalog.getName(), "default", tableName);
CatalogTable catalogTable =
(CatalogTable) hiveCatalog.getTable(tableIdentifier.toObjectPath());
return FactoryUtil.createTableSink(
hiveCatalog,
return FactoryUtil.createDynamicTableSink(
(DynamicTableSinkFactory)
hiveCatalog.getFactory().orElseThrow(IllegalStateException::new),
tableIdentifier,
tableEnvInternal.getCatalogManager().resolveCatalogTable(catalogTable),
tableEnv.getConfig().getConfiguration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactory;
Expand Down Expand Up @@ -137,8 +138,9 @@ public void testHiveTable() throws Exception {
assertTrue(tableSource instanceof HiveTableSource);

final DynamicTableSink tableSink =
FactoryUtil.createTableSink(
catalog,
FactoryUtil.createDynamicTableSink(
(DynamicTableSinkFactory)
catalog.getFactory().orElseThrow(IllegalStateException::new),
ObjectIdentifier.of("mycatalog", "mydb", "mytable"),
new ResolvedCatalogTable(table, schema),
new Configuration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,12 @@ public static DynamicTableSource createTableSource(
/**
* Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
* <p>If {@param preferredFactory} is passed, the table sink is created from that factory.
* Otherwise, an attempt is made to discover a matching factory using Java SPI (see {@link
* Factory} for details).
*/
public static DynamicTableSink createTableSink(
@Nullable Catalog catalog,
public static DynamicTableSink createDynamicTableSink(
@Nullable DynamicTableSinkFactory preferredFactory,
ObjectIdentifier objectIdentifier,
ResolvedCatalogTable catalogTable,
ReadableConfig configuration,
Expand All @@ -206,9 +208,13 @@ public static DynamicTableSink createTableSink(
final DefaultDynamicTableContext context =
new DefaultDynamicTableContext(
objectIdentifier, catalogTable, configuration, classLoader, isTemporary);

try {
final DynamicTableSinkFactory factory =
getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context);
preferredFactory != null
? preferredFactory
: discoverTableFactory(DynamicTableSinkFactory.class, context);

return factory.createDynamicTableSink(context);
} catch (Throwable t) {
throw new ValidationException(
Expand All @@ -225,6 +231,35 @@ public static DynamicTableSink createTableSink(
}
}

/**
* Creates a {@link DynamicTableSink} from a {@link CatalogTable}.
*
* <p>It considers {@link Catalog#getFactory()} if provided.
*
* @deprecated Use {@link #createDynamicTableSink(DynamicTableSinkFactory, ObjectIdentifier,
* ResolvedCatalogTable, ReadableConfig, ClassLoader, boolean)} instead.
*/
@Deprecated
public static DynamicTableSink createTableSink(
@Nullable Catalog catalog,
ObjectIdentifier objectIdentifier,
ResolvedCatalogTable catalogTable,
ReadableConfig configuration,
ClassLoader classLoader,
boolean isTemporary) {
final DefaultDynamicTableContext context =
new DefaultDynamicTableContext(
objectIdentifier, catalogTable, configuration, classLoader, isTemporary);

return createDynamicTableSink(
getDynamicTableFactory(DynamicTableSinkFactory.class, catalog, context),
objectIdentifier,
catalogTable,
configuration,
classLoader,
isTemporary);
}

/**
* Creates a utility that helps validating options for a {@link CatalogFactory}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.table.module;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.functions.FunctionDefinition;

Expand Down Expand Up @@ -74,5 +75,26 @@ default Optional<DynamicTableSourceFactory> getTableSourceFactory() {
return Optional.empty();
}

/**
* Returns a {@link DynamicTableSinkFactory} for creating sink tables.
*
* <p>A factory is determined with the following precedence rule:
*
* <ul>
* <li>1. Factory provided by the corresponding catalog of a persisted table.
* <li>2. Factory provided by a module.
* <li>3. Factory discovered using Java SPI.
* </ul>
*
* <p>This will be called on loaded modules in the order in which they have been loaded. The
* first factory returned will be used.
*
* <p>This method can be useful to disable Java SPI completely or influence how temporary table
* sinks should be created without a corresponding catalog.
*/
default Optional<DynamicTableSinkFactory> getTableSinkFactory() {
return Optional.empty();
}

// user defined types, operators, rules, etc
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static DynamicTableSink createTableSink(

public static DynamicTableSink createTableSink(
ResolvedSchema schema, List<String> partitionKeys, Map<String, String> options) {
return FactoryUtil.createTableSink(
return FactoryUtil.createDynamicTableSink(
null,
IDENTIFIER,
new ResolvedCatalogTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public BatchExecSink(
String description) {
super(
tableSinkSpec,
tableSinkSpec.getTableSink().getChangelogMode(ChangelogMode.insertOnly()),
ChangelogMode.insertOnly(),
true, // isBounded
getNewNodeId(),
Collections.singletonList(inputProperty),
Expand All @@ -56,7 +56,6 @@ public BatchExecSink(
protected Transformation<Object> translateToPlanInternal(PlannerBase planner) {
final Transformation<RowData> inputTransform =
(Transformation<RowData>) getInputEdges().get(0).translateToPlan(planner);
return createSinkTransformation(
planner.getExecEnv(), planner.getTableConfig(), inputTransform, -1, false);
return createSinkTransformation(planner, inputTransform, -1, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
Expand Down Expand Up @@ -86,20 +87,20 @@ public abstract class CommonExecSink extends ExecNodeBase<Object>
@JsonProperty(FIELD_NAME_DYNAMIC_TABLE_SINK)
protected final DynamicTableSinkSpec tableSinkSpec;

@JsonIgnore private final ChangelogMode changelogMode;
@JsonIgnore private final ChangelogMode inputChangelogMode;
@JsonIgnore private final boolean isBounded;

protected CommonExecSink(
DynamicTableSinkSpec tableSinkSpec,
ChangelogMode changelogMode,
ChangelogMode inputChangelogMode,
boolean isBounded,
int id,
List<InputProperty> inputProperties,
LogicalType outputType,
String description) {
super(id, inputProperties, outputType, description);
this.tableSinkSpec = tableSinkSpec;
this.changelogMode = changelogMode;
this.inputChangelogMode = inputChangelogMode;
this.isBounded = isBounded;
}

Expand All @@ -109,12 +110,12 @@ public DynamicTableSinkSpec getTableSinkSpec() {

@SuppressWarnings("unchecked")
protected Transformation<Object> createSinkTransformation(
StreamExecutionEnvironment env,
TableConfig tableConfig,
PlannerBase planner,
Transformation<RowData> inputTransform,
int rowtimeFieldIndex,
boolean upsertMaterialize) {
final DynamicTableSink tableSink = tableSinkSpec.getTableSink();
final DynamicTableSink tableSink = tableSinkSpec.getTableSink(planner);
final ChangelogMode changelogMode = tableSink.getChangelogMode(inputChangelogMode);
final ResolvedSchema schema = tableSinkSpec.getCatalogTable().getResolvedSchema();

final SinkRuntimeProvider runtimeProvider =
Expand All @@ -127,23 +128,33 @@ protected Transformation<Object> createSinkTransformation(
final int sinkParallelism = deriveSinkParallelism(inputTransform, runtimeProvider);

Transformation<RowData> sinkTransform =
applyNotNullEnforcer(inputTransform, tableConfig, physicalRowType);
applyNotNullEnforcer(inputTransform, planner.getTableConfig(), physicalRowType);

sinkTransform = applyKeyBy(sinkTransform, primaryKeys, sinkParallelism, upsertMaterialize);
sinkTransform =
applyKeyBy(
changelogMode,
sinkTransform,
primaryKeys,
sinkParallelism,
upsertMaterialize);

if (upsertMaterialize) {
sinkTransform =
applyUpsertMaterialize(
sinkTransform,
primaryKeys,
sinkParallelism,
tableConfig,
planner.getTableConfig(),
physicalRowType);
}

return (Transformation<Object>)
applySinkProvider(
sinkTransform, env, runtimeProvider, rowtimeFieldIndex, sinkParallelism);
sinkTransform,
planner.getExecEnv(),
runtimeProvider,
rowtimeFieldIndex,
sinkParallelism);
}

/**
Expand Down Expand Up @@ -220,6 +231,7 @@ private int deriveSinkParallelism(
* messages.
*/
private Transformation<RowData> applyKeyBy(
ChangelogMode changelogMode,
Transformation<RowData> inputTransform,
int[] primaryKeys,
int sinkParallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -59,11 +62,17 @@ public DynamicTableSinkSpec(
this.sinkAbilitySpecs = sinkAbilitySpecs;
}

public DynamicTableSink getTableSink() {
public DynamicTableSink getTableSink(PlannerBase planner) {
if (tableSink == null) {
final DynamicTableSinkFactory factory =
planner.getFlinkContext()
.getModuleManager()
.getFactory(Module::getTableSinkFactory)
.orElse(null);

tableSink =
FactoryUtil.createTableSink(
null, // catalog, TODO support create Factory from catalog
FactoryUtil.createDynamicTableSink(
factory,
objectIdentifier,
catalogTable,
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public StreamExecSink(
String description) {
super(
tableSinkSpec,
tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode),
inputChangelogMode,
false, // isBounded
getNewNodeId(),
Collections.singletonList(inputProperty),
Expand All @@ -96,7 +96,7 @@ public StreamExecSink(
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(
tableSinkSpec,
tableSinkSpec.getTableSink().getChangelogMode(inputChangelogMode),
inputChangelogMode,
false, // isBounded
id,
inputProperties,
Expand Down Expand Up @@ -138,10 +138,6 @@ protected Transformation<Object> translateToPlanInternal(PlannerBase planner) {
}

return createSinkTransformation(
planner.getExecEnv(),
planner.getTableConfig(),
inputTransform,
rowtimeFieldIndex,
upsertMaterialize);
planner, inputTransform, rowtimeFieldIndex, upsertMaterialize);
}
}
Loading

0 comments on commit aa51c8f

Please sign in to comment.