Skip to content

Commit

Permalink
[FLINK-24977][connectors/kafka] Replace ConfigOption lookup in Map<St…
Browse files Browse the repository at this point in the history
…ring, String> object with proper lookup in Configuration object
  • Loading branch information
alpreu authored and fapaul committed Dec 8, 2021
1 parent d2da7b0 commit 551dfe4
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ private static void validatePKConstraints(
&& format.getChangelogMode().containsOnly(RowKind.INSERT)) {
Configuration configuration = Configuration.fromMap(options);
String formatName =
configuration.getOptional(FactoryUtil.FORMAT).orElse(options.get(VALUE_FORMAT));
configuration
.getOptional(FactoryUtil.FORMAT)
.orElse(configuration.get(VALUE_FORMAT));
throw new ValidationException(
String.format(
"The Kafka table '%s' with '%s' format doesn't support defining PRIMARY KEY constraint"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,12 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.DEBEZIUM_AVRO_CONFLUENT;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

/** Abstract test base for {@link KafkaDynamicTableFactory}. */
public class KafkaDynamicTableFactoryTest extends TestLogger {
Expand Down Expand Up @@ -840,16 +840,21 @@ public void testPrimaryKeyValidation() {
// pk can be defined on cdc table, should pass
createTableSink(pkSchema, sinkOptions);

try {
createTableSink(pkSchema, getBasicSinkOptions());
fail();
} catch (Throwable t) {
String error =
"The Kafka table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.";
assertEquals(error, t.getCause().getMessage());
}
assertThatExceptionOfType(ValidationException.class)
.isThrownBy(() -> createTableSink(pkSchema, getBasicSinkOptions()))
.havingRootCause()
.withMessage(
"The Kafka table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");

assertThatExceptionOfType(ValidationException.class)
.isThrownBy(() -> createTableSink(pkSchema, getKeyValueOptions()))
.havingRootCause()
.withMessage(
"The Kafka table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");

Map<String, String> sourceOptions =
getModifiedOptions(
Expand All @@ -864,16 +869,13 @@ public void testPrimaryKeyValidation() {
// pk can be defined on cdc table, should pass
createTableSource(pkSchema, sourceOptions);

try {
createTableSource(pkSchema, getBasicSourceOptions());
fail();
} catch (Throwable t) {
String error =
"The Kafka table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.";
assertEquals(error, t.getCause().getMessage());
}
assertThatExceptionOfType(ValidationException.class)
.isThrownBy(() -> createTableSource(pkSchema, getBasicSourceOptions()))
.havingRootCause()
.withMessage(
"The Kafka table 'default.default.t1' with 'test-format' format"
+ " doesn't support defining PRIMARY KEY constraint on the table, because it can't"
+ " guarantee the semantic of primary key.");
}

// --------------------------------------------------------------------------------------------
Expand Down

0 comments on commit 551dfe4

Please sign in to comment.