Skip to content

Commit

Permalink
[hotfix] [table] Deprecate SchemaValidator#deriveTableSinkSchema
Browse files Browse the repository at this point in the history
The method combines two separate concepts of table schema and field
mapping. This should be split into two methods once we have support
for the corresponding interfaces (see FLINK-9870).
  • Loading branch information
twalthr committed Jul 23, 2018
1 parent 690ab2c commit 48791c1
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public StreamTableSource<Row> createStreamTableSource(Map<String, String> proper
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
SchemaValidator.deriveTableSourceSchema(params),
params.getTableSchema(SCHEMA()),
properties.get(CONNECTOR_TEST_PROPERTY),
proctime.orElse(null),
rowtime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,18 +174,15 @@ object SchemaValidator {
attributes.asJava
}

/**
* Derives the table schema for a table source. A table source can directly use "name" and
* "type" and needs no special handling for time attributes or aliasing.
*/
def deriveTableSourceSchema(properties: DescriptorProperties): TableSchema = {
properties.getTableSchema(SCHEMA)
}

/**
* Derives the table schema for a table sink. A sink ignores a proctime attribute and
* needs to track the origin of a rowtime field.
*
* @deprecated This method combines two separate concepts of table schema and field mapping.
* This should be split into two methods once we have support for
* the corresponding interfaces (see FLINK-9870).
*/
@deprecated
def deriveTableSinkSchema(properties: DescriptorProperties): TableSchema = {
val builder = TableSchema.builder()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ abstract class CsvTableSinkFactoryBase extends TableFactory {

// build
val formatSchema = params.getTableSchema(FORMAT_FIELDS)
val tableSchema = SchemaValidator.deriveTableSinkSchema(params)
val tableSchema = params.getTableSchema(SCHEMA)

if (!formatSchema.equals(tableSchema)) {
throw new TableException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class InMemoryTableFactory(terminationCount: Int)
supportsSourceTimestamps = true,
supportsSourceWatermarks = true).validate(params)

val tableSchema = SchemaValidator.deriveTableSourceSchema(params)
val tableSchema = params.getTableSchema(SCHEMA)

// proctime
val proctimeAttributeOpt = SchemaValidator.deriveProctimeAttribute(params)
Expand Down

0 comments on commit 48791c1

Please sign in to comment.