Skip to content

Commit

Permalink
[FLINK-27196][table] Remove SerializationSchemaFactory, Deserializati…
Browse files Browse the repository at this point in the history
…onSchemaFactory and TableFormatFactory

Signed-off-by: slinkydeveloper <[email protected]>
  • Loading branch information
slinkydeveloper authored and MartijnVisser committed Apr 13, 2022
1 parent f8d2bdb commit 8212230
Show file tree
Hide file tree
Showing 7 changed files with 2 additions and 511 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.factories.TableFormatFactory;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.tsextractors.TimestampExtractor;
import org.apache.flink.table.sources.wmstrategies.WatermarkStrategy;
Expand All @@ -41,20 +40,10 @@

import static java.lang.String.format;
import static org.apache.flink.table.descriptors.DescriptorProperties.EXPR;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_ROWTIME;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_DATA_TYPE;
import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_CLASS;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_FROM;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_SERIALIZED;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_CLASS;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_DELAY;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_SERIALIZED;
import static org.apache.flink.table.descriptors.Rowtime.ROWTIME_WATERMARKS_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_FROM;
Expand Down Expand Up @@ -139,44 +128,6 @@ else if (proctimeFound) {
}
}

/**
* Returns keys for a {@link TableFormatFactory#supportedProperties()} method that are accepted
* for schema derivation using {@code deriveFormatFields(DescriptorProperties)}.
*/
public static List<String> getSchemaDerivationKeys() {
List<String> keys = new ArrayList<>();

// schema
keys.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
keys.add(SCHEMA + ".#." + SCHEMA_TYPE);
keys.add(SCHEMA + ".#." + SCHEMA_NAME);
keys.add(SCHEMA + ".#." + SCHEMA_FROM);
// computed column
keys.add(SCHEMA + ".#." + EXPR);

// time attributes
keys.add(SCHEMA + ".#." + SCHEMA_PROCTIME);
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_TYPE);
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_FROM);
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_CLASS);
keys.add(SCHEMA + ".#." + ROWTIME_TIMESTAMPS_SERIALIZED);
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_TYPE);
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_CLASS);
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_SERIALIZED);
keys.add(SCHEMA + ".#." + ROWTIME_WATERMARKS_DELAY);

// watermark
keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_ROWTIME);
keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_EXPR);
keys.add(SCHEMA + "." + WATERMARK + ".#." + WATERMARK_STRATEGY_DATA_TYPE);

// table constraint
keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
keys.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_COLUMNS);

return keys;
}

/** Finds the proctime attribute if defined. */
public static Optional<String> deriveProctimeAttribute(DescriptorProperties properties) {
Map<String, String> names = properties.getIndexedProperty(SCHEMA, SCHEMA_NAME);
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.descriptors.Descriptor;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
Expand Down Expand Up @@ -389,12 +388,10 @@ private static <T extends TableFactory> List<T> filterBySupportedProperties(
plainGivenKeys.stream()
.filter(p -> !requiredContextKeys.contains(p))
.collect(Collectors.toList());
List<String> givenFilteredKeys =
filterSupportedPropertiesFactorySpecific(factory, givenContextFreeKeys);

boolean allTrue = true;
List<String> unsupportedKeys = new ArrayList<>();
for (String k : givenFilteredKeys) {
for (String k : givenContextFreeKeys) {
if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(k::startsWith))) {
allTrue = false;
unsupportedKeys.add(k);
Expand Down Expand Up @@ -462,22 +459,6 @@ private static List<String> extractWildcardPrefixes(List<String> propertyKeys) {
*/
private static List<String> filterSupportedPropertiesFactorySpecific(
TableFactory factory, List<String> keys) {

if (factory instanceof TableFormatFactory) {
boolean includeSchema = ((TableFormatFactory) factory).supportsSchemaDerivation();
return keys.stream()
.filter(
k -> {
if (includeSchema) {
return k.startsWith(Schema.SCHEMA + ".")
|| k.startsWith(FORMAT + ".");
} else {
return k.startsWith(FORMAT + ".");
}
})
.collect(Collectors.toList());
} else {
return keys;
}
return keys;
}
}

This file was deleted.

Loading

0 comments on commit 8212230

Please sign in to comment.