[FLINK-25391][connector-kafka] Forward catalog table options
slinkydeveloper authored and twalthr committed Jan 25, 2022
1 parent c926031 commit 0c34c99
Showing 2 changed files with 38 additions and 9 deletions.
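This commit widens the set of options that the Kafka table factory declares as forwardable. For context, here is a minimal sketch of the hook being implemented, modeled on org.apache.flink.table.factories.DynamicTableFactory (the interface name, default body, and comments below are assumptions for illustration, not an excerpt from this commit):

import java.util.Collections;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;

public interface DynamicTableFactorySketch {

    // Options listed here can be forwarded from the catalog table into the
    // runtime implementation as-is, because changing them does not affect the
    // execution topology of the compiled plan.
    default Set<ConfigOption<?>> forwardOptions() {
        // Nothing is forwarded by default; connectors override this to opt in.
        return Collections.emptySet();
    }
}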
24 changes: 23 additions & 1 deletion docs/content/docs/connectors/table/kafka.md
@@ -179,50 +179,57 @@ Connector Options
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
<th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>connector</h5></td>
<td>required</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what connector to use; for Kafka use <code>'kafka'</code>.</td>
</tr>
<tr>
<td><h5>topic</h5></td>
<td>required for sink</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Topic name(s) to read data from when the table is used as a source. It also supports a topic list for sources, with topics separated by semicolons, e.g. <code>'topic-1;topic-2'</code>. Note that only one of "topic-pattern" and "topic" can be specified for sources. When the table is used as a sink, the topic name is the topic to write data to. Note that a topic list is not supported for sinks.</td>
</tr>
<tr>
<td><h5>topic-pattern</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The regular expression for a pattern of topic names to read from. All topics whose names match the specified regular expression will be subscribed by the consumer when the job starts running. Note that only one of "topic-pattern" and "topic" can be specified for sources.</td>
</tr>
<tr>
<td><h5>properties.bootstrap.servers</h5></td>
<td>required</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Comma-separated list of Kafka brokers.</td>
</tr>
<tr>
<td><h5>properties.group.id</h5></td>
<td>optional for source, not applicable for sink</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The ID of the consumer group for the Kafka source. If no group ID is specified, an automatically generated ID "KafkaSource-{tableIdentifier}" will be used.</td>
</tr>
<tr>
<td><h5>properties.*</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>
@@ -232,6 +239,7 @@ Connector Options
<tr>
<td><h5>format</h5></td>
<td>required</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The format used to deserialize and serialize the value part of Kafka messages.
@@ -243,6 +251,7 @@ Connector Options
<tr>
<td><h5>key.format</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The format used to deserialize and serialize the key part of Kafka messages.
@@ -254,6 +263,7 @@ Connector Options
<tr>
<td><h5>key.fields</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">[]</td>
<td>List&lt;String&gt;</td>
<td>Defines an explicit list of physical columns from the table schema that configure the data
@@ -264,6 +274,7 @@ Connector Options
<tr>
<td><h5>key.fields-prefix</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines a custom prefix for all fields of the key format to avoid name clashes with fields
@@ -277,6 +288,7 @@ Connector Options
<tr>
<td><h5>value.format</h5></td>
<td>required</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The format used to deserialize and serialize the value part of Kafka messages.
@@ -288,6 +300,7 @@ Connector Options
<tr>
<td><h5>value.fields-include</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">ALL</td>
<td><p>Enum</p>Possible values: [ALL, EXCEPT_KEY]</td>
<td>Defines a strategy for how to deal with key columns in the data type of the value format. By
@@ -298,6 +311,7 @@ Connector Options
<tr>
<td><h5>scan.startup.mode</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">group-offsets</td>
<td>String</td>
<td>Startup mode for the Kafka consumer; valid values are <code>'earliest-offset'</code>, <code>'latest-offset'</code>, <code>'group-offsets'</code>, <code>'timestamp'</code> and <code>'specific-offsets'</code>.
@@ -306,6 +320,7 @@ Connector Options
<tr>
<td><h5>scan.startup.specific-offsets</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify offsets for each partition in case of <code>'specific-offsets'</code> startup mode, e.g. <code>'partition:0,offset:42;partition:1,offset:300'</code>.
@@ -314,20 +329,23 @@ Connector Options
<tr>
<td><h5>scan.startup.timestamp-millis</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>Start from the specified epoch timestamp (milliseconds), used in case of the <code>'timestamp'</code> startup mode.</td>
</tr>
<tr>
<td><h5>scan.topic-partition-discovery.interval</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Duration</td>
<td>The interval in which the consumer periodically discovers dynamically created Kafka topics and partitions.</td>
</tr>
<tr>
<td><h5>sink.partitioner</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">'default'</td>
<td>String</td>
<td>Output partitioning from Flink's partitions into Kafka's partitions. Valid values are
@@ -343,27 +361,31 @@ Connector Options
<tr>
<td><h5>sink.semantic</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
<td>Deprecated: Please use <code>sink.delivery-guarantee</code>.</td>
</tr>
<tr>
<td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">at-least-once</td>
<td>String</td>
<td>Defines the delivery semantics for the Kafka sink. Valid enumerations are <code>'at-least-once'</code>, <code>'exactly-once'</code> and <code>'none'</code>. See <a href='#consistency-guarantees'>Consistency guarantees</a> for more details.</td>
</tr>
<tr>
<td><h5>sink.transactional-id-prefix</h5></td>
<td>optional</td>
+<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>If the delivery guarantee is configured as <code>'exactly-once'</code>, this value must be set and is used as a prefix for the identifier of all opened Kafka transactions.</td>
</tr>
<tr>
<td><h5>sink.parallelism</h5></td>
<td>optional</td>
+<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework using the same parallelism as the upstream chained operator.</td>
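To connect the table above to concrete usage, here is a minimal, self-contained sketch of a Kafka table definition via the Table API. It assumes the Flink Table API, the Kafka connector, and the JSON format are on the classpath; the table name, schema, topic, and broker address are made up for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaTableSketch {
    public static void main(String[] args) {
        TableEnvironment env =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'topic', 'properties.bootstrap.servers' and 'scan.startup.mode' are
        // marked as forwarded ("yes") in the table above; 'connector' and
        // 'format' are not.
        env.executeSql(
                "CREATE TABLE orders (\n"
                        + "  order_id STRING,\n"
                        + "  amount DOUBLE\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'orders',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json'\n"
                        + ")");
    }
}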
23 changes: 15 additions & 8 deletions flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -60,6 +60,8 @@
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.DELIVERY_GUARANTEE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FIELDS;
@@ -146,14 +148,19 @@ public Set<ConfigOption<?>> optionalOptions() {

    @Override
    public Set<ConfigOption<?>> forwardOptions() {
-        final Set<ConfigOption<?>> options = new HashSet<>();
-        options.add(TOPIC);
-        options.add(TOPIC_PATTERN);
-        options.add(SCAN_STARTUP_MODE);
-        options.add(SCAN_STARTUP_SPECIFIC_OFFSETS);
-        options.add(SCAN_TOPIC_PARTITION_DISCOVERY);
-        options.add(SCAN_STARTUP_TIMESTAMP_MILLIS);
-        return options;
+        return Stream.of(
+                        PROPS_BOOTSTRAP_SERVERS,
+                        PROPS_GROUP_ID,
+                        TOPIC,
+                        TOPIC_PATTERN,
+                        SCAN_STARTUP_MODE,
+                        SCAN_STARTUP_SPECIFIC_OFFSETS,
+                        SCAN_TOPIC_PARTITION_DISCOVERY,
+                        SCAN_STARTUP_TIMESTAMP_MILLIS,
+                        SINK_PARTITIONER,
+                        SINK_PARALLELISM,
+                        TRANSACTIONAL_ID_PREFIX)
+                .collect(Collectors.toSet());
    }

    @Override
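The Stream-based declaration above generalizes to any connector. Here is a minimal sketch of the same pattern for a custom factory, assuming only the stable ConfigOptions builder API; the connector and its option names are hypothetical:

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public final class ExampleConnectorOptions {

    // Hypothetical options for an imaginary connector.
    public static final ConfigOption<String> ENDPOINT =
            ConfigOptions.key("endpoint").stringType().noDefaultValue();
    public static final ConfigOption<Integer> RETRY_COUNT =
            ConfigOptions.key("retry-count").intType().defaultValue(3);

    private ExampleConnectorOptions() {}

    // Mirrors the commit's pattern: collect the forwardable options into a Set.
    // Only options that do not influence the execution topology belong here.
    public static Set<ConfigOption<?>> forwardOptions() {
        return Stream.of(ENDPOINT, RETRY_COUNT).collect(Collectors.toSet());
    }
}

Collecting from a Stream keeps the declaration a single immutable expression, which is the same reason the commit replaces the mutable HashSet.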
