Skip to content

Commit

Permalink
[FLINK-25391][format-avro] Forward catalog table options
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored and twalthr committed Jan 25, 2022
1 parent 5175ed0 commit 8f77862
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 2 deletions.
15 changes: 14 additions & 1 deletion docs/content/docs/connectors/table/formats/avro-confluent.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,92 +176,105 @@ Format Options
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
<th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be <code>'avro-confluent'</code>.</td>
</tr>
<tr>
<td><h5>avro-confluent.basic-auth.credentials-source</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Basic auth credentials source for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-confluent.basic-auth.user-info</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Basic auth user info for schema registry</td>
</tr>
<tr>
<td><h5>avro-confluent.bearer-auth.credentials-source</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Bearer auth credentials source for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-confluent.bearer-auth.token</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Bearer auth token for Schema Registry</td>
</tr>
<tr>
<td><h5>avro-confluent.properties</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Map</td>
<td>Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</td>
</tr>
<tr>
<td><h5>avro-confluent.ssl.keystore.location</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location / File of SSL keystore</td>
</tr>
<tr>
<td><h5>avro-confluent.ssl.keystore.password</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password for SSL keystore</td>
</tr>
<tr>
<td><h5>avro-confluent.ssl.truststore.location</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Location / File of SSL truststore</td>
</tr>
<tr>
<td><h5>avro-confluent.ssl.truststore.password</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Password for SSL truststore</td>
</tr>
<tr>
<td><h5>avro-confluent.subject</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, 'kafka' and 'upsert-kafka' connectors use '&lt;topic_name&gt;-value' or '&lt;topic_name&gt;-key' as the default subject name if this format is used as the value or key format. But for other connectors (e.g. 'filesystem'), the subject option is required when used as sink.</td>
</tr>
<tr>
<td><h5>avro-confluent.url</h5></td>
<td>required</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The URL of the Confluent Schema Registry to fetch/register schemas.</td>
Expand Down
5 changes: 4 additions & 1 deletion docs/content/docs/connectors/table/formats/avro.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,25 @@ Format Options
<tr>
<th class="text-left" style="width: 25%">Option</th>
<th class="text-center" style="width: 8%">Required</th>
<th class="text-center" style="width: 8%">Forwarded</th>
<th class="text-center" style="width: 7%">Default</th>
<th class="text-center" style="width: 10%">Type</th>
<th class="text-center" style="width: 50%">Description</th>
<th class="text-center" style="width: 42%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>format</h5></td>
<td>required</td>
<td>no</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Specify what format to use, here should be <code>'avro'</code>.</td>
</tr>
<tr>
<td><h5>avro.codec</h5></td>
<td>optional</td>
<td>yes</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>For <a href="{{< ref "docs/connectors/table/filesystem" >}}">Filesystem</a> only, the compression codec for avro. Snappy compression as default. The valid enumerations are: null, deflate, snappy, bzip2, xz.</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_CREDENTIALS_SOURCE;
import static org.apache.flink.formats.avro.registry.confluent.AvroConfluentFormatOptions.BASIC_AUTH_USER_INFO;
Expand Down Expand Up @@ -175,6 +177,23 @@ public Set<ConfigOption<?>> optionalOptions() {
return options;
}

@Override
public Set<ConfigOption<?>> forwardOptions() {
return Stream.of(
URL,
SUBJECT,
PROPERTIES,
SSL_KEYSTORE_LOCATION,
SSL_KEYSTORE_PASSWORD,
SSL_TRUSTSTORE_LOCATION,
SSL_TRUSTSTORE_PASSWORD,
BASIC_AUTH_CREDENTIALS_SOURCE,
BASIC_AUTH_USER_INFO,
BEARER_AUTH_CREDENTIALS_SOURCE,
BEARER_AUTH_TOKEN)
.collect(Collectors.toSet());
}

public static @Nullable Map<String, String> buildOptionalPropertiesMap(
ReadableConfig formatOptions) {
final Map<String, String> properties = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public Set<ConfigOption<?>> optionalOptions() {
return options;
}

@Override
public Set<ConfigOption<?>> forwardOptions() {
return optionalOptions();
}

private static class AvroGenericRecordBulkFormat
extends AbstractAvroBulkFormat<GenericRecord, RowData, FileSourceSplit> {

Expand Down

0 comments on commit 8f77862

Please sign in to comment.