[FLINK-13119][docs] Add Blink table config to documentation
This closes apache#9024.
Xupingyong authored and twalthr committed Aug 5, 2019
1 parent 949b7e1 commit 00e0baa
Showing 9 changed files with 354 additions and 5 deletions.
110 changes: 110 additions & 0 deletions docs/_includes/generated/execution_config_configuration.html
@@ -0,0 +1,110 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.exec.async-lookup.buffer-capacity</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">100</td>
<td>The maximum number of async I/O operations that the async lookup join can trigger.</td>
</tr>
<tr>
<td><h5>table.exec.async-lookup.timeout</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"3 min"</td>
<td>The async timeout for the asynchronous operation to complete.</td>
</tr>
<tr>
<td><h5>table.exec.disabled-operators</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Mainly for testing. A comma-separated list of operator names, each of which represents a kind of operator to disable.
Operators that can be disabled include "NestedLoopJoin", "ShuffleHashJoin", "BroadcastHashJoin", "SortMergeJoin", "HashAgg", "SortAgg".
By default no operator is disabled.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.allow-latency</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"-1 ms"</td>
<td>The maximum latency that MiniBatch may use to buffer input records. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and also when the maximum number of buffered records is reached. NOTE: If table.exec.mini-batch.enabled is set to true, this value must be greater than zero.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Specifies whether to enable MiniBatch optimization. MiniBatch is an optimization to buffer input records to reduce state access. This is disabled by default. To enable this, users should set this config to true. NOTE: If mini-batch is enabled, 'table.exec.mini-batch.allow-latency' and 'table.exec.mini-batch.size' must be set.</td>
</tr>
<tr>
<td><h5>table.exec.mini-batch.size</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>The maximum number of input records that can be buffered for MiniBatch. MiniBatch is an optimization that buffers input records to reduce state access. MiniBatch is triggered at the allowed latency interval and also when the maximum number of buffered records is reached. NOTE: MiniBatch currently only works for non-windowed aggregations. If table.exec.mini-batch.enabled is set to true, this value must be positive.</td>
</tr>
<tr>
<td><h5>table.exec.resource.default-parallelism</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>Sets the default parallelism for all operators (such as aggregate, join, filter) to run with parallel instances. This config has a higher priority than the parallelism of StreamExecutionEnvironment (that is, it overrides the parallelism of StreamExecutionEnvironment). A value of -1 indicates that no default parallelism is set, in which case the parallelism of StreamExecutionEnvironment is used.</td>
</tr>
<tr>
<td><h5>table.exec.resource.external-buffer-memory</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"10 mb"</td>
<td>Sets the external buffer memory size that is used in sort merge join, nested loop join, and over window.</td>
</tr>
<tr>
<td><h5>table.exec.resource.hash-agg.memory</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"128 mb"</td>
<td>Sets the managed memory size of hash aggregate operator.</td>
</tr>
<tr>
<td><h5>table.exec.resource.hash-join.memory</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"128 mb"</td>
<td>Sets the managed memory for hash join operator. It defines the lower limit.</td>
</tr>
<tr>
<td><h5>table.exec.resource.sort.memory</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"128 mb"</td>
<td>Sets the managed buffer memory size for sort operator.</td>
</tr>
<tr>
<td><h5>table.exec.shuffle-mode</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"batch"</td>
<td>Sets exec shuffle mode. Only batch or pipeline can be set.
batch: the job will run stage by stage.
pipeline: the job will run in streaming mode, but it may cause a resource deadlock in which the receiver waits for resources to start while the sender holds resources and waits to send data to the receiver.</td>
</tr>
<tr>
<td><h5>table.exec.sort.async-merge-enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Whether to asynchronously merge sorted spill files.</td>
</tr>
<tr>
<td><h5>table.exec.sort.default-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">-1</td>
<td>Default limit when the user does not set a limit after ORDER BY. -1 indicates that this configuration is ignored.</td>
</tr>
<tr>
<td><h5>table.exec.sort.max-num-file-handles</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">128</td>
<td>The maximal fan-in for external merge sort. It limits the number of file handles per operator. If it is too small, intermediate merging may be required. If it is too large, too many files are open at the same time, which consumes memory and leads to random reads.</td>
</tr>
<tr>
<td><h5>table.exec.source.idle-timeout</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"-1 ms"</td>
<td>When a source does not receive any elements for the timeout period, it will be marked as temporarily idle. This allows downstream tasks to advance their watermarks without the need to wait for watermarks from this source while it is idle.</td>
</tr>
<tr>
<td><h5>table.exec.spill-compression.block-size</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">"64 kb"</td>
<td>The memory size used for compression when spilling data. The larger the memory, the higher the compression ratio, but the more memory the job consumes.</td>
</tr>
<tr>
<td><h5>table.exec.spill-compression.enabled</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">true</td>
<td>Whether to compress spilled data. Currently, compression of spilled data is only supported for the sort, hash-agg, and hash-join operators.</td>
</tr>
<tr>
<td><h5>table.exec.window-agg.buffer-size-limit</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">100000</td>
<td>Sets the window elements buffer size limit used in group window agg operator.</td>
</tr>
</tbody>
</table>
54 changes: 54 additions & 0 deletions docs/_includes/generated/optimizer_config_configuration.html
@@ -0,0 +1,54 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>table.optimizer.agg-phase-strategy</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"AUTO"</td>
<td>Strategy for aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set.
AUTO: No special enforcer for aggregate stage. Whether to choose a two-stage or a one-stage aggregate depends on cost.
TWO_PHASE: Enforces a two-stage aggregate, which has localAggregate and globalAggregate. Note that if an aggregate call does not support being split into two phases, a one-stage aggregate is still used.
ONE_PHASE: Enforces a one-stage aggregate, which only has CompleteGlobalAggregate.</td>
</tr>
<tr>
<td><h5>table.optimizer.distinct-agg.split.bucket-num</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">1024</td>
<td>Configure the number of buckets when splitting distinct aggregation. The number is used in the first level aggregation to calculate a bucket key 'hash_code(distinct_key) % BUCKET_NUM' which is used as an additional group key after splitting.</td>
</tr>
<tr>
<td><h5>table.optimizer.distinct-agg.split.enabled</h5><br> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Tells the optimizer whether to split distinct aggregation (e.g. COUNT(DISTINCT col), SUM(DISTINCT col)) into two levels. The first aggregation is shuffled by an additional key which is calculated using the hashcode of distinct_key and the number of buckets. This optimization is very useful when there is data skew in distinct aggregation and gives the ability to scale up the job. Default is false.</td>
</tr>
<tr>
<td><h5>table.optimizer.join-reorder-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">false</td>
<td>Enables join reorder in optimizer. Default is disabled.</td>
</tr>
<tr>
<td><h5>table.optimizer.join.broadcast-threshold</h5><br> <span class="label label-primary">Batch</span></td>
<td style="word-wrap: break-word;">1048576</td>
<td>Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. Setting this value to -1 disables broadcasting.</td>
</tr>
<tr>
<td><h5>table.optimizer.reuse-source-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>When it is true, the optimizer will try to find duplicated table sources and reuse them. This works only when table.optimizer.reuse-sub-plan-enabled is true.</td>
</tr>
<tr>
<td><h5>table.optimizer.reuse-sub-plan-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>When it is true, the optimizer will try to find duplicated sub-plans and reuse them.</td>
</tr>
<tr>
<td><h5>table.optimizer.source.predicate-pushdown-enabled</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">true</td>
<td>When it is true, the optimizer will push down predicates into the FilterableTableSource. Default value is true.</td>
</tr>
</tbody>
</table>
103 changes: 103 additions & 0 deletions docs/dev/table/config.md
@@ -0,0 +1,103 @@
---
title: "Configuration"
nav-parent_id: tableapi
nav-pos: 150
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

By default, the Table & SQL API is preconfigured for producing accurate results with acceptable
performance.

Depending on the requirements of a table program, it might be necessary to adjust
certain parameters for optimization. For example, unbounded streaming programs may need to ensure
that the required state size is capped (see [streaming concepts](./streaming/query_configuration.html)).

* This will be replaced by the TOC
{:toc}

### Overview

In every table environment, the `TableConfig` offers options for configuring the current session.

For common or important configuration options, the `TableConfig` provides getter and setter methods
with detailed inline documentation.

For more advanced configuration, users can directly access the underlying key-value map. The following
sections list all available options that can be used to adjust Flink Table & SQL API programs.

<span class="label label-danger">Attention</span> Because options are read at different points in time
when performing operations, it is recommended to set configuration options early after instantiating a
table environment.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// instantiate table environment
TableEnvironment tEnv = ...

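// access the underlying Flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options (setString does not return the configuration, so calls are not chained)
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");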
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
// instantiate table environment
val tEnv: TableEnvironment = ...

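// access the underlying Flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options (setString does not return the configuration, so calls are not chained)
configuration.setString("table.exec.mini-batch.enabled", "true")
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")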
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">
{% highlight python %}
# instantiate table environment
t_env = ...

# access the underlying Flink configuration
configuration = t_env.get_config().get_configuration()
# set low-level key-value options
configuration.set_string("table.exec.mini-batch.enabled", "true")
configuration.set_string("table.exec.mini-batch.allow-latency", "5 s")
configuration.set_string("table.exec.mini-batch.size", "5000")
{% endhighlight %}
</div>
</div>
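
The three mini-batch keys used in the tabs above are interdependent: according to the generated option descriptions, enabling mini-batch requires a latency greater than zero and a positive size. The following is a minimal sketch of a pre-flight check for that rule. `MiniBatchOptionsCheck` and its simplified duration parser are hypothetical helpers written for this illustration; they are not part of Flink and do not reflect how Flink itself validates the options.

```java
import java.util.Map;

// Hypothetical helper (not part of Flink): checks the documented mini-batch
// constraints on a plain key-value map before the values are applied.
final class MiniBatchOptionsCheck {

    static final String ENABLED = "table.exec.mini-batch.enabled";
    static final String ALLOW_LATENCY = "table.exec.mini-batch.allow-latency";
    static final String SIZE = "table.exec.mini-batch.size";

    /** Returns null if the options are consistent, otherwise a short error message. */
    static String validate(Map<String, String> options) {
        boolean enabled = Boolean.parseBoolean(options.getOrDefault(ENABLED, "false"));
        if (!enabled) {
            return null; // nothing to check while mini-batch is disabled
        }
        // Defaults below mirror the documented defaults ("-1 ms" and -1).
        long latencyMs = parseMillis(options.getOrDefault(ALLOW_LATENCY, "-1 ms"));
        if (latencyMs <= 0) {
            return ALLOW_LATENCY + " must be greater than zero";
        }
        long size = Long.parseLong(options.getOrDefault(SIZE, "-1"));
        if (size <= 0) {
            return SIZE + " must be positive";
        }
        return null;
    }

    /** Simplified parser (assumption): only understands "<number> ms" and "<number> s". */
    static long parseMillis(String text) {
        String[] parts = text.trim().split("\\s+");
        long value = Long.parseLong(parts[0]);
        String unit = parts.length > 1 ? parts[1] : "ms";
        switch (unit) {
            case "ms":
                return value;
            case "s":
                return value * 1000L;
            default:
                throw new IllegalArgumentException("Unsupported unit: " + unit);
        }
    }
}
```

Running such a check before the options are written makes a half-configured mini-batch setup (enabled, but with the default `-1` values still in place) fail early with a readable message.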

<span class="label label-danger">Attention</span> Currently, key-value options are only supported
for the Blink planner.

### Execution Options

The following options can be used to tune the performance of the query execution.

{% include generated/execution_config_configuration.html %}
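
The mini-batch options describe two triggers: the allowed latency interval and the maximum number of buffered records. As a rough illustration of that idea only, the toy buffer below flushes when either limit is hit. `MiniBatchBuffer` is a hypothetical class for this sketch; it takes timestamps as arguments instead of using timers and is not how Flink implements mini-batching.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a toy buffer that flushes on size or on elapsed time.
final class MiniBatchBuffer<T> {

    private final int maxSize;          // plays the role of table.exec.mini-batch.size
    private final long allowLatencyMs;  // plays the role of table.exec.mini-batch.allow-latency
    private final List<T> buffer = new ArrayList<>();
    private long firstRecordTimeMs = -1L;

    MiniBatchBuffer(int maxSize, long allowLatencyMs) {
        if (maxSize <= 0 || allowLatencyMs <= 0) {
            throw new IllegalArgumentException("size and latency must be positive");
        }
        this.maxSize = maxSize;
        this.allowLatencyMs = allowLatencyMs;
    }

    /** Adds a record; returns the flushed batch if a trigger fired, otherwise an empty list. */
    List<T> add(T record, long nowMs) {
        if (buffer.isEmpty()) {
            firstRecordTimeMs = nowMs; // latency is measured from the first buffered record
        }
        buffer.add(record);
        boolean sizeReached = buffer.size() >= maxSize;
        boolean latencyReached = nowMs - firstRecordTimeMs >= allowLatencyMs;
        return (sizeReached || latencyReached) ? flush() : new ArrayList<>();
    }

    /** Hands out everything buffered so far and resets the buffer. */
    List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        firstRecordTimeMs = -1L;
        return batch;
    }
}
```

A larger size or latency means fewer, bigger batches (less state access per record) at the cost of results being delayed by up to the configured latency.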

### Optimizer Options

The following options can be used to adjust the behavior of the query optimizer to get a better execution plan.

{% include generated/optimizer_config_configuration.html %}
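
The description of `table.optimizer.distinct-agg.split.bucket-num` gives the bucket key as `hash_code(distinct_key) % BUCKET_NUM`. The sketch below shows that arithmetic in plain Java. `DistinctBucketKey` is a hypothetical helper for illustration, and the use of `Math.floorMod` to keep the result non-negative is an assumption made here, not a statement about how the planner handles negative hash codes.

```java
import java.util.Objects;

// Illustrative only: mirrors the documented formula hash_code(distinct_key) % BUCKET_NUM.
final class DistinctBucketKey {

    /** Maps a distinct key to a bucket in the range [0, bucketNum). */
    static int bucketKey(Object distinctKey, int bucketNum) {
        if (bucketNum <= 0) {
            throw new IllegalArgumentException("bucketNum must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes (assumption).
        return Math.floorMod(Objects.hashCode(distinctKey), bucketNum);
    }
}
```

Because equal keys always land in the same bucket, the first-level aggregation can deduplicate within a bucket, and the second level only has to combine the per-bucket results. This is what spreads a skewed distinct key set across more parallel instances.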
@@ -59,6 +59,38 @@ public final class Documentation {
         int position() default Integer.MAX_VALUE;
     }
 
+    /**
+     * Annotation used on table config options for adding meta data labels.
+     *
+     * <p>The {@link TableOption#execMode()} argument indicates the execution mode the config works for
+     * (batch, streaming or both).
+     */
+    @Target(ElementType.FIELD)
+    @Retention(RetentionPolicy.RUNTIME)
+    @Internal
+    public @interface TableOption {
+        ExecMode execMode();
+    }
+
+    /**
+     * The execution mode the config works for.
+     */
+    public enum ExecMode {
+
+        BATCH("Batch"), STREAMING("Streaming"), BATCH_STREAMING("Batch and Streaming");
+
+        private final String name;
+
+        ExecMode(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return name;
+        }
+    }
+
     /**
      * Annotation used on config option fields to exclude the config option from documentation.
      */
5 changes: 5 additions & 0 deletions flink-docs/pom.xml
@@ -59,6 +59,11 @@ under the License.
             <version>${project.version}</version>
             <type>test-jar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-metrics-prometheus_${scala.binary.version}</artifactId>
@@ -62,7 +62,8 @@ public class ConfigOptionsDocGenerator {
         new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
         new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
         new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
-        new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
+        new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state"),
+        new OptionsClassLocation("flink-table/flink-table-api-java", "org.apache.flink.table.api.config")
     };
 
     static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
@@ -260,10 +261,26 @@ private static String toHtmlTable(final List<OptionWithMetaInfo> options) {
     private static String toHtmlString(final OptionWithMetaInfo optionWithMetaInfo) {
         ConfigOption<?> option = optionWithMetaInfo.option;
         String defaultValue = stringifyDefault(optionWithMetaInfo);
+        Documentation.TableOption tableOption = optionWithMetaInfo.field.getAnnotation(Documentation.TableOption.class);
+        StringBuilder execModeStringBuilder = new StringBuilder();
+        if (tableOption != null) {
+            Documentation.ExecMode execMode = tableOption.execMode();
+            if (Documentation.ExecMode.BATCH_STREAMING.equals(execMode)) {
+                execModeStringBuilder.append("<br> <span class=\"label label-primary\">")
+                    .append(Documentation.ExecMode.BATCH.toString())
+                    .append("</span> <span class=\"label label-primary\">")
+                    .append(Documentation.ExecMode.STREAMING.toString())
+                    .append("</span>");
+            } else {
+                execModeStringBuilder.append("<br> <span class=\"label label-primary\">")
+                    .append(execMode.toString())
+                    .append("</span>");
+            }
+        }
 
         return "" +
             " <tr>\n" +
-            " <td><h5>" + escapeCharacters(option.key()) + "</h5></td>\n" +
+            " <td><h5>" + escapeCharacters(option.key()) + "</h5>" + execModeStringBuilder.toString() + "</td>\n" +
             " <td style=\"word-wrap: break-word;\">" + escapeCharacters(addWordBreakOpportunities(defaultValue)) + "</td>\n" +
             " <td>" + formatter.format(option.description()) + "</td>\n" +
             " </tr>\n";
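
To see how the annotation and the label rendering fit together without building Flink, the following stand-alone sketch reproduces the logic with local stand-ins. The nested `ExecMode` and `TableOption` types mirror the ones added to `Documentation` in this commit; `ExecModeLabelDemo`, `DemoOptions`, and the key `some.other.option` are hypothetical and exist only for this illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

// Stand-alone sketch: local stand-ins for Documentation.ExecMode / Documentation.TableOption.
final class ExecModeLabelDemo {

    enum ExecMode {
        BATCH("Batch"), STREAMING("Streaming"), BATCH_STREAMING("Batch and Streaming");

        private final String name;

        ExecMode(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME) // must be RUNTIME so reflection can see it
    @interface TableOption {
        ExecMode execMode();
    }

    // Hypothetical options holder; real options are ConfigOption fields.
    static final class DemoOptions {
        @TableOption(execMode = ExecMode.BATCH_STREAMING)
        static final String ASYNC_LOOKUP_TIMEOUT = "table.exec.async-lookup.timeout";

        @TableOption(execMode = ExecMode.STREAMING)
        static final String MINI_BATCH_ENABLED = "table.exec.mini-batch.enabled";

        static final String UNLABELED = "some.other.option";
    }

    /** Renders the label markup for one field; empty if the field is not annotated. */
    static String labels(Field field) {
        TableOption tableOption = field.getAnnotation(TableOption.class);
        if (tableOption == null) {
            return "";
        }
        ExecMode mode = tableOption.execMode();
        if (mode == ExecMode.BATCH_STREAMING) {
            // "both" is rendered as two separate labels, as in toHtmlString
            return "<br> " + span(ExecMode.BATCH) + " " + span(ExecMode.STREAMING);
        }
        return "<br> " + span(mode);
    }

    private static String span(ExecMode mode) {
        return "<span class=\"label label-primary\">" + mode + "</span>";
    }

    /** Convenience lookup by field name on the demo holder. */
    static String labelsFor(String fieldName) {
        try {
            return labels(DemoOptions.class.getDeclaredField(fieldName));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("Unknown field: " + fieldName, e);
        }
    }
}
```

The sketch produces the same markup that appears in the first column of the generated tables, which is why options without the annotation keep their previous, label-free rendering.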