diff --git a/docs/_includes/generated/execution_config_configuration.html b/docs/_includes/generated/execution_config_configuration.html
new file mode 100644
index 0000000000000..d050e592424fd
--- /dev/null
+++ b/docs/_includes/generated/execution_config_configuration.html
@@ -0,0 +1,110 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>table.exec.async-lookup.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">100</td>
+            <td>The maximum number of async I/O operations that the async lookup join can trigger.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"3 min"</td>
+            <td>The timeout for an asynchronous lookup operation to complete.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.disabled-operators</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Mainly for testing. A comma-separated list of operator names, each of which represents a kind of operator to disable.<br>Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg".<br>By default no operator is disabled.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.mini-batch.allow-latency</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"-1 ms"</td>
+            <td>The maximum latency that can be used for MiniBatch to buffer input records. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered when the allowed latency interval elapses or when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, its value must be greater than zero.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.mini-batch.enabled</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Specifies whether to enable the MiniBatch optimization. MiniBatch is an optimization to buffer input records to reduce state access. It is disabled by default; to enable it, set this config to true. NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must be set.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.mini-batch.size</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization to buffer input records to reduce state access. MiniBatch is triggered when the allowed latency interval elapses or when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, its value must be positive.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.resource.default-parallelism</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Sets the default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than the parallelism of the StreamExecutionEnvironment (actually, this config overrides it). A value of -1 indicates that no default parallelism is set, in which case it will fall back to the parallelism of the StreamExecutionEnvironment.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.resource.external-buffer-memory</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"10 mb"</td>
+            <td>Sets the external buffer memory size that is used in sort merge join, nested loop join and over window.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.resource.hash-agg.memory</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"128 mb"</td>
+            <td>Sets the managed memory size of the hash aggregate operator.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.resource.hash-join.memory</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"128 mb"</td>
+            <td>Sets the managed memory for the hash join operator. It defines the lower limit.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.resource.sort.memory</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"128 mb"</td>
+            <td>Sets the managed buffer memory size for the sort operator.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.shuffle-mode</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"batch"</td>
+            <td>Sets the execution shuffle mode. Only "batch" or "pipeline" can be set.<br>batch: the job will run stage by stage.<br>pipeline: the job will run in streaming mode, but this may cause a resource deadlock: a receiver waits for resources to start up while the sender holds resources, waiting to send data to the receiver.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Whether to asynchronously merge sorted spill files.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sort.default-limit</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">-1</td>
+            <td>Default limit when the user does not set a limit after an ORDER BY. -1 indicates that this configuration is ignored.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.sort.max-num-file-handles</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">128</td>
+            <td>The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, it may cause intermediate merging. But if it is too large, too many files will be open at the same time, consuming memory and leading to random reads.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.source.idle-timeout</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"-1 ms"</td>
+            <td>When a source does not receive any elements within the given timeout, it will be marked as temporarily idle. This allows downstream tasks to advance their watermarks without the need to wait for watermarks from this source while it is idle.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.spill-compression.block-size</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">"64 kb"</td>
+            <td>The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but more memory resources will be consumed by the job.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.spill-compression.enabled</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Whether to compress spilled data. Currently, compression is only supported for the sort, hash-agg and hash-join operators.</td>
+        </tr>
+        <tr>
+            <td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">100000</td>
+            <td>Sets the window elements buffer size limit used in the group window aggregate operator.</td>
+        </tr>
+    </tbody>
+</table>
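The keys above are plain strings on the `TableConfig`'s underlying key-value map. As a minimal sketch of how a batch program might set two of them (assuming the Blink planner and the `EnvironmentSettings`/`TableEnvironment.create` API; the chosen values are illustrative only):

{% highlight java %}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// create a batch table environment on the Blink planner
EnvironmentSettings settings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inBatchMode()
    .build();
TableEnvironment tEnv = TableEnvironment.create(settings);

// all execution options are set through the low-level key-value map
Configuration conf = tEnv.getConfig().getConfiguration();
// keep the planner from choosing a nested-loop join (mainly for testing)
conf.setString("table.exec.disabled-operators", "NestedLoopJoin");
// halve the default fan-in of the external merge sort
conf.setString("table.exec.sort.max-num-file-handles", "64");
{% endhighlight %}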
diff --git a/docs/_includes/generated/optimizer_config_configuration.html b/docs/_includes/generated/optimizer_config_configuration.html
new file mode 100644
index 0000000000000..05c4b64ea4373
--- /dev/null
+++ b/docs/_includes/generated/optimizer_config_configuration.html
@@ -0,0 +1,54 @@
+<table class="table table-bordered">
+    <thead>
+        <tr>
+            <th class="text-left" style="width: 20%">Key</th>
+            <th class="text-left" style="width: 15%">Default</th>
+            <th class="text-left" style="width: 65%">Description</th>
+        </tr>
+    </thead>
+    <tbody>
+        <tr>
+            <td><h5>table.optimizer.agg-phase-strategy</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">"AUTO"</td>
+            <td>Strategy for the aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set.<br>AUTO: No special enforcement on the aggregate stage. Whether a two-stage or one-stage aggregate is chosen depends on cost.<br>TWO_PHASE: Enforce a two-stage aggregate which has a localAggregate and a globalAggregate. Note that if an aggregate call does not support being optimized into two phases, a one-stage aggregate is still used.<br>ONE_PHASE: Enforce a one-stage aggregate which only has a CompleteGlobalAggregate.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.distinct-agg.split.bucket-num</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">1024</td>
+            <td>Configures the number of buckets when splitting a distinct aggregation. The number is used in the first-level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM' which is used as an additional group key after splitting.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.distinct-agg.split.enabled</h5><br> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Tells the optimizer whether to split a distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key which is calculated using the hash code of the distinct key and the number of buckets. This optimization is very useful when there is data skew in a distinct aggregation and gives the ability to scale up the job. Default is false.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.join-reorder-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Enables join reordering in the optimizer. Disabled by default.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.join.broadcast-threshold</h5><br> <span class="label label-primary">Batch</span></td>
+            <td style="word-wrap: break-word;">1048576</td>
+            <td>Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Set this value to -1 to disable broadcasting.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.reuse-source-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>When it is true, the optimizer will try to find duplicate table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.reuse-sub-plan-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>When it is true, the optimizer will try to find duplicate sub-plans and reuse them.</td>
+        </tr>
+        <tr>
+            <td><h5>table.optimizer.source.predicate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>When it is true, the optimizer will push down predicates into the FilterableTableSource. Default value is true.</td>
+        </tr>
+    </tbody>
+</table>
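The optimizer options are set through the same low-level key-value map. A short sketch (illustrative values; `tEnv` is a `TableEnvironment` created as in the previous example):

{% highlight java %}
Configuration conf = tEnv.getConfig().getConfiguration();
// split skewed distinct aggregates into a two-level aggregation
conf.setString("table.optimizer.distinct-agg.split.enabled", "true");
// opt in to cost-based join reordering, which is disabled by default
conf.setString("table.optimizer.join-reorder-enabled", "true");
{% endhighlight %}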
diff --git a/docs/dev/table/config.md b/docs/dev/table/config.md
new file mode 100644
index 0000000000000..2b46f49b597bf
--- /dev/null
+++ b/docs/dev/table/config.md
@@ -0,0 +1,103 @@
+---
+title: "Configuration"
+nav-parent_id: tableapi
+nav-pos: 150
+---
+
+By default, the Table & SQL API is preconfigured for producing accurate results with acceptable
+performance.
+
+Depending on the requirements of a table program, it might be necessary to adjust
+certain parameters for optimization. For example, unbounded streaming programs may need to ensure
+that the required state size is capped (see [streaming concepts](./streaming/query_configuration.html)).
+
+* This will be replaced by the TOC
+{:toc}
+
+### Overview
+
+In every table environment, the `TableConfig` offers options for configuring the current session.
+
+For common or important configuration options, the `TableConfig` provides getter and setter methods
+with detailed inline documentation.
+
+For more advanced configuration, users can directly access the underlying key-value map. The following
+sections list all available options that can be used to adjust Flink Table & SQL API programs.
+
+<span class="label label-danger">Attention</span> Because options are read at different points in time
+when performing operations, it is recommended to set configuration options early, right after instantiating a
+table environment.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+// instantiate table environment
+TableEnvironment tEnv = ...
+
+// access the high-level configuration and its low-level key-value map
+Configuration configuration = tEnv.getConfig().getConfiguration();
+// set low-level key-value options
+configuration.setString("table.exec.mini-batch.enabled", "true");
+configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
+configuration.setString("table.exec.mini-batch.size", "5000");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+// instantiate table environment
+val tEnv: TableEnvironment = ...
+
+// access the high-level configuration and its low-level key-value map
+val configuration = tEnv.getConfig.getConfiguration
+// set low-level key-value options
+configuration.setString("table.exec.mini-batch.enabled", "true")
+configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
+configuration.setString("table.exec.mini-batch.size", "5000")
+{% endhighlight %}
+</div>
+
+<div data-lang="python" markdown="1">
+{% highlight python %}
+# instantiate table environment
+t_env = ...
+
+# access the high-level configuration and its low-level key-value map
+configuration = t_env.get_config().get_configuration()
+# set low-level key-value options
+configuration.set_string("table.exec.mini-batch.enabled", "true")
+configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
+configuration.set_string("table.exec.mini-batch.size", "5000")
+{% endhighlight %}
+</div>
+</div>
+
+<span class="label label-danger">Attention</span> Currently, key-value options are only supported
+for the Blink planner.
+
+### Execution Options
+
+The following options can be used to tune the performance of the query execution.
+
+{% include generated/execution_config_configuration.html %}
+
+### Optimizer Options
+
+The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.
+
+{% include generated/optimizer_config_configuration.html %}
diff --git a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
index aed0d2975f490..9884049311287 100644
--- a/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
+++ b/flink-annotations/src/main/java/org/apache/flink/annotation/docs/Documentation.java
@@ -59,6 +59,38 @@ public final class Documentation {
 		int position() default Integer.MAX_VALUE;
 	}
 
+	/**
+	 * Annotation used on table config options for adding meta data labels.
+	 *
+	 * <p>The {@link TableOption#execMode()} argument indicates the execution mode the config works for
+	 * (batch, streaming or both).
+	 */
+	@Target(ElementType.FIELD)
+	@Retention(RetentionPolicy.RUNTIME)
+	@Internal
+	public @interface TableOption {
+		ExecMode execMode();
+	}
+
+	/**
+	 * The execution mode the config works for.
+	 */
+	public enum ExecMode {
+
+		BATCH("Batch"), STREAMING("Streaming"), BATCH_STREAMING("Batch and Streaming");
+
+		private final String name;
+
+		ExecMode(String name) {
+			this.name = name;
+		}
+
+		@Override
+		public String toString() {
+			return name;
+		}
+	}
+
 	/**
 	 * Annotation used on config option fields to exclude the config option from documentation.
 	 */
diff --git a/flink-docs/pom.xml b/flink-docs/pom.xml
index ab0b8b83f17f2..384b09110807d 100644
--- a/flink-docs/pom.xml
+++ b/flink-docs/pom.xml
@@ -59,6 +59,11 @@ under the License.
 			<version>${project.version}</version>
 			<type>test-jar</type>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-metrics-prometheus_${scala.binary.version}</artifactId>
diff --git a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
index 0dffb60a3a826..0ee34429a257f 100644
--- a/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
+++ b/flink-docs/src/main/java/org/apache/flink/docs/configuration/ConfigOptionsDocGenerator.java
@@ -62,7 +62,8 @@ public class ConfigOptionsDocGenerator {
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
 		new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
 		new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
-		new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
+		new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
+		new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config")
 	};
 
 	static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
@@ -260,10 +261,26 @@ private static String toHtmlTable(final List<OptionWithMetaInfo> options) {
 
 	private static String toHtmlString(final OptionWithMetaInfo optionWithMetaInfo) {
 		ConfigOption<?> option = optionWithMetaInfo.option;
 		String defaultValue = stringifyDefault(optionWithMetaInfo);
+		Documentation.TableOption tableOption = optionWithMetaInfo.field.getAnnotation(Documentation.TableOption.class);
+		StringBuilder execModeStringBuilder = new StringBuilder();
+		if (tableOption != null) {
+			Documentation.ExecMode execMode = tableOption.execMode();
+			if (Documentation.ExecMode.BATCH_STREAMING.equals(execMode)) {
+				execModeStringBuilder.append("<br> <span class=\"label label-primary\">")
+					.append(Documentation.ExecMode.BATCH.toString())
+					.append("</span> <span class=\"label label-primary\">")
+					.append(Documentation.ExecMode.STREAMING.toString())
+					.append("</span>");
+			} else {
+				execModeStringBuilder.append("<br> <span class=\"label label-primary\">")
+					.append(execMode.toString())
+					.append("</span>");
+			}
+		}
 		return "" +
 			"        <tr>\n" +
-			"            <td><h5>" + escapeCharacters(option.key()) + "</h5></td>\n" +
+			"            <td><h5>" + escapeCharacters(option.key()) + "</h5>" + execModeStringBuilder.toString() + "</td>\n" +
 			"            <td style=\"word-wrap: break-word;\">" + escapeCharacters(addWordBreakOpportunities(defaultValue)) + "</td>\n" +
 			"            <td>" + formatter.format(option.description()) + "</td>\n" +
 			"        </tr>\n";
diff --git a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
index bfc55c1ebbdd1..9ae2595357558 100644
--- a/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
+++ b/flink-docs/src/test/java/org/apache/flink/docs/configuration/ConfigOptionsDocsCompletenessITCase.java
@@ -177,7 +177,8 @@ private static Collection<DocumentedOption> parseDocumentedOptionsFromFile(Path
 			.map(element -> element.getElementsByTag("tbody").get(0))
 			.flatMap(element -> element.getElementsByTag("tr").stream())
 			.map(tableRow -> {
-				String key = tableRow.child(0).text();
+				// Use split to exclude the exec-mode label appended to the key cell.
+				String key = tableRow.child(0).text().split(" ")[0];
 				String defaultValue = tableRow.child(1).text();
 				String description = tableRow.child(2)
 					.childNodes()
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 1c6c8ab9d3a80..81b464d0b138d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api.config;
 
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -32,6 +33,7 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Source Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<String> TABLE_EXEC_SOURCE_IDLE_TIMEOUT =
 		key("table.exec.source.idle-timeout")
 			.defaultValue("-1 ms")
@@ -43,11 +45,13 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Sort Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Integer> TABLE_EXEC_SORT_DEFAULT_LIMIT =
 		key("table.exec.sort.default-limit")
 			.defaultValue(-1)
 			.withDescription("Default limit when the user does not set a limit after an ORDER BY. " +
 				"-1 indicates that this configuration is ignored.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Integer> TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES =
 		key("table.exec.sort.max-num-file-handles")
 			.defaultValue(128)
@@ -55,6 +59,7 @@ public class ExecutionConfigOptions {
 			.withDescription("The maximal fan-in for external merge sort. It limits the number of file handles per operator. " +
 				"If it is too small, it may cause intermediate merging. But if it is too large, " +
 				"too many files will be open at the same time, consuming memory and leading to random reads.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Boolean> TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED =
 		key("table.exec.sort.async-merge-enabled")
 			.defaultValue(true)
@@ -63,12 +68,14 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Spill Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Boolean> TABLE_EXEC_SPILL_COMPRESSION_ENABLED =
 		key("table.exec.spill-compression.enabled")
 			.defaultValue(true)
 			.withDescription("Whether to compress spilled data. " +
 				"Currently, compression is only supported for the sort, hash-agg and hash-join operators.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE =
 		key("table.exec.spill-compression.block-size")
 			.defaultValue("64 kb")
@@ -79,7 +86,7 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Resource Options
 	// ------------------------------------------------------------------------
-
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Integer> TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM =
 		key("table.exec.resource.default-parallelism")
 			.defaultValue(-1)
@@ -91,21 +98,25 @@ public class ExecutionConfigOptions {
 				"default parallelism is set, in which case it will fall back to the parallelism " +
 				"of the StreamExecutionEnvironment.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_RESOURCE_EXTERNAL_BUFFER_MEMORY =
 		key("table.exec.resource.external-buffer-memory")
 			.defaultValue("10 mb")
 			.withDescription("Sets the external buffer memory size that is used in sort merge join, nested loop join and over window.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY =
 		key("table.exec.resource.hash-agg.memory")
 			.defaultValue("128 mb")
 			.withDescription("Sets the managed memory size of the hash aggregate operator.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_RESOURCE_HASH_JOIN_MEMORY =
 		key("table.exec.resource.hash-join.memory")
 			.defaultValue("128 mb")
 			.withDescription("Sets the managed memory for the hash join operator. It defines the lower limit.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_RESOURCE_SORT_MEMORY =
 		key("table.exec.resource.sort.memory")
 			.defaultValue("128 mb")
@@ -118,6 +129,7 @@ public class ExecutionConfigOptions {
 	/**
 	 * See {@code org.apache.flink.table.runtime.operators.window.grouping.HeapWindowsGrouping}.
 	 */
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Integer> TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT =
 		key("table.exec.window-agg.buffer-size-limit")
 			.defaultValue(100 * 1000)
@@ -126,11 +138,13 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Async Lookup Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Integer> TABLE_EXEC_ASYNC_LOOKUP_BUFFER_CAPACITY =
 		key("table.exec.async-lookup.buffer-capacity")
 			.defaultValue(100)
 			.withDescription("The maximum number of async I/O operations that the async lookup join can trigger.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<String> TABLE_EXEC_ASYNC_LOOKUP_TIMEOUT =
 		key("table.exec.async-lookup.timeout")
 			.defaultValue("3 min")
@@ -139,7 +153,7 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  MiniBatch Options
 	// ------------------------------------------------------------------------
-
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<Boolean> TABLE_EXEC_MINIBATCH_ENABLED =
 		key("table.exec.mini-batch.enabled")
 			.defaultValue(false)
@@ -149,6 +163,7 @@ public class ExecutionConfigOptions {
 				"NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and " +
 				"'table.exec.mini-batch.size' must be set.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<String> TABLE_EXEC_MINIBATCH_ALLOW_LATENCY =
 		key("table.exec.mini-batch.allow-latency")
 			.defaultValue("-1 ms")
@@ -157,6 +172,7 @@ public class ExecutionConfigOptions {
 				"MiniBatch is triggered when the allowed latency interval elapses or when the maximum number of buffered records is reached. " +
 				"NOTE: If " + TABLE_EXEC_MINIBATCH_ENABLED.key() + " is set to true, its value must be greater than zero.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<Long> TABLE_EXEC_MINIBATCH_SIZE =
 		key("table.exec.mini-batch.size")
 			.defaultValue(-1L)
@@ -169,6 +185,7 @@ public class ExecutionConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Other Exec Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_DISABLED_OPERATORS =
 		key("table.exec.disabled-operators")
 			.noDefaultValue()
@@ -178,6 +195,7 @@ public class ExecutionConfigOptions {
 				"\"SortMergeJoin\", \"HashAgg\", \"SortAgg\".\n" +
 				"By default no operator is disabled.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<String> TABLE_EXEC_SHUFFLE_MODE =
 		key("table.exec.shuffle-mode")
 			.defaultValue("batch")
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index 8d6c1fdadeb37..c9ad6efe18346 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api.config;
 
+import org.apache.flink.annotation.docs.Documentation;
 import org.apache.flink.configuration.ConfigOption;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
@@ -32,6 +33,7 @@ public class OptimizerConfigOptions {
 	// ------------------------------------------------------------------------
 	//  Optimizer Options
 	// ------------------------------------------------------------------------
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<String> TABLE_OPTIMIZER_AGG_PHASE_STRATEGY =
 		key("table.optimizer.agg-phase-strategy")
 			.defaultValue("AUTO")
@@ -42,12 +44,14 @@ public class OptimizerConfigOptions {
 				"Note that if an aggregate call does not support being optimized into two phases, a one-stage aggregate is still used.\n" +
 				"ONE_PHASE: Enforce a one-stage aggregate which only has a CompleteGlobalAggregate.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH)
 	public static final ConfigOption<Long> TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD =
 		key("table.optimizer.join.broadcast-threshold")
 			.defaultValue(1024 * 1024L)
 			.withDescription("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
 				"nodes when performing a join. Set this value to -1 to disable broadcasting.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED =
 		key("table.optimizer.distinct-agg.split.enabled")
 			.defaultValue(false)
@@ -58,6 +62,7 @@ public class OptimizerConfigOptions {
 				"when there is data skew in a distinct aggregation and gives the ability to scale up the job. " +
 				"Default is false.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
 	public static final ConfigOption<Integer> TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_BUCKET_NUM =
 		key("table.optimizer.distinct-agg.split.bucket-num")
 			.defaultValue(1024)
@@ -65,23 +70,27 @@ public class OptimizerConfigOptions {
 				"The number is used in the first-level aggregation to calculate a bucket key " +
 				"'hash_code(distinct_key) % BUCKET_NUM' which is used as an additional group key after splitting.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED =
 		key("table.optimizer.reuse-sub-plan-enabled")
 			.defaultValue(true)
 			.withDescription("When it is true, the optimizer will try to find duplicate sub-plans and reuse them.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_REUSE_SOURCE_ENABLED =
 		key("table.optimizer.reuse-source-enabled")
 			.defaultValue(true)
 			.withDescription("When it is true, the optimizer will try to find duplicate table sources and " +
 				"reuse them. This works only when " + TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED.key() + " is true.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED =
 		key("table.optimizer.source.predicate-pushdown-enabled")
 			.defaultValue(true)
 			.withDescription("When it is true, the optimizer will push down predicates into the FilterableTableSource. " +
 				"Default value is true.");
 
+	@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
 	public static final ConfigOption<Boolean> TABLE_OPTIMIZER_JOIN_REORDER_ENABLED =
 		key("table.optimizer.join-reorder-enabled")
 			.defaultValue(false)