[FLINK-26180][connectors/filesystem] Update docs to introduce the compaction for FileSink.

This closes apache#18797.
pltbkd authored and gaoyunhaii committed Feb 17, 2022
1 parent 54b21e8 commit 8b6ca8e
Showing 2 changed files with 131 additions and 0 deletions.
64 changes: 64 additions & 0 deletions docs/content.zh/docs/connectors/datastream/filesystem.md
@@ -959,6 +959,70 @@ val sink = FileSink
{{< /tab >}}
{{< /tabs >}}

### Compaction

Since version 1.15, `FileSink` supports compaction of the `pending` files, which allows the application to use a smaller checkpoint interval without generating a large number of small files.
This is especially useful when using the [bulk encoded formats]({{< ref "docs/connectors/datastream/filesystem#bulk-encoded-formats" >}}),
which have to roll over to a new file on every checkpoint.

Compaction can be enabled with the following code:

{{< tabs "enablecompaction" >}}
{{< tab "Java" >}}
```java

FileSink<Integer> fileSink =
    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
        .enableCompact(
            FileCompactStrategy.Builder.newBuilder()
                .setSizeThreshold(1024)
                .enableCompactionOnCheckpoint(5)
                .build(),
            new RecordWiseFileCompactor<>(
                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
        .build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val fileSink: FileSink[Integer] =
  FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
    .enableCompact(
      FileCompactStrategy.Builder.newBuilder()
        .setSizeThreshold(1024)
        .enableCompactionOnCheckpoint(5)
        .build(),
      new RecordWiseFileCompactor(
        new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
    .build()

```
{{< /tab >}}
{{< /tabs >}}

Once enabled, compaction takes place between the time a file becomes `pending` and the time it is finally committed. The `pending` files are first committed as temporary files whose names start with `.`.
These files are then compacted according to the user-specified strategy and compactor, generating new compacted `pending` files.
The compacted files are then emitted to the committer and committed as formal files, after which the original temporary files are deleted.

When enabling compaction, you need to specify the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} and
the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}}.

The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} specifies when and which files get compacted.
Currently, there are two parallel conditions: the target file size and the number of checkpoints that have passed. Once the total size of the cached files reaches the size threshold, or the number of checkpoints since the last compaction reaches the specified number,
`FileSink` creates an asynchronous task to compact the currently cached files.
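
As a minimal sketch (the threshold and checkpoint count below are illustrative values, not recommendations), a strategy combining both conditions could be built like this:

```java
// Sketch: compact once the cached pending files exceed roughly 16 MiB in total,
// or after every 5 checkpoints, whichever condition is satisfied first.
FileCompactStrategy strategy =
    FileCompactStrategy.Builder.newBuilder()
        .setSizeThreshold(16 * 1024 * 1024)
        .enableCompactionOnCheckpoint(5)
        .build();
```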

The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact the files for the given list of paths and write the result
to the {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. Depending on the type of the given `CompactingFileWriter`, it can be classified into two types:

- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: the given `CompactingFileWriter` can be converted into an output stream, and users write the compacted result directly into that stream. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}}, which simply concatenates the given files and writes the result to the output stream (see the sketch after this list).
- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: the given `CompactingFileWriter` allows users to write records into it one by one. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}, which reads records from the given files and writes them out through the `CompactingFileWriter`. Users need to specify how records are read from the source files.
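
As a sketch of the output-stream-based variant, the `ConcatFileCompactor` can be passed to `enableCompact` in place of the record-wise compactor used in the example above; `strategy` and `path` are assumed to be defined as in the earlier snippets:

```java
// Sketch: use the stream-based ConcatFileCompactor, which simply concatenates
// the pending files instead of re-reading their records one by one.
// `strategy` and `path` are assumed to be defined as in the examples above.
FileSink<Integer> sink =
    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
        .enableCompact(strategy, new ConcatFileCompactor())
        .build();
```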

{{< hint info >}}
**Important** Once compaction is enabled, the written files need to wait for a longer time before they become visible.
{{< /hint >}}

### Important Considerations

#### General
67 changes: 67 additions & 0 deletions docs/content/docs/connectors/datastream/filesystem.md
@@ -958,6 +958,73 @@ val sink = FileSink
{{< /tab >}}
{{< /tabs >}}

### Compaction

Since version 1.15, `FileSink` supports compaction of the `pending` files,
which allows the application to use a smaller checkpoint interval without generating a large number of small files,
especially when using the [bulk encoded formats]({{< ref "docs/connectors/datastream/filesystem#bulk-encoded-formats" >}})
which have to roll over to a new file on every checkpoint.

Compaction can be enabled with the following code:

{{< tabs "enablecompaction" >}}
{{< tab "Java" >}}
```java

FileSink<Integer> fileSink =
    FileSink.forRowFormat(new Path(path), new SimpleStringEncoder<Integer>())
        .enableCompact(
            FileCompactStrategy.Builder.newBuilder()
                .setSizeThreshold(1024)
                .enableCompactionOnCheckpoint(5)
                .build(),
            new RecordWiseFileCompactor<>(
                new DecoderBasedReader.Factory<>(SimpleStringDecoder::new)))
        .build();

```
{{< /tab >}}
{{< tab "Scala" >}}
```scala

val fileSink: FileSink[Integer] =
  FileSink.forRowFormat(new Path(path), new SimpleStringEncoder[Integer]())
    .enableCompact(
      FileCompactStrategy.Builder.newBuilder()
        .setSizeThreshold(1024)
        .enableCompactionOnCheckpoint(5)
        .build(),
      new RecordWiseFileCompactor(
        new DecoderBasedReader.Factory(() => new SimpleStringDecoder)))
    .build()

```
{{< /tab >}}
{{< /tabs >}}

Once enabled, compaction takes place between the time files become `pending` and the time they get committed. The pending files are
first committed as temporary files whose paths start with `.`. Then these files are compacted according to
the user-specified strategy and compactor, and new compacted pending files are generated.
These compacted pending files are then emitted to the committer to be committed as formal files, after which the source files are removed.

When enabling compaction, you need to specify the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}}
and the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}}.

The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactStrategy.html" name="FileCompactStrategy">}} specifies
when and which files get compacted. Currently, there are two parallel conditions: the target file size and the number of checkpoints passed since the last compaction.
Once the total size of the cached files has reached the size threshold, or the number of checkpoints since the last compaction has reached the specified number,
the cached files will be scheduled for compaction.
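
For illustration, a strategy that uses both triggers together with a larger compaction thread pool might look like the sketch below; the concrete numbers are placeholders rather than tuning advice:

```java
// Sketch only: trigger compaction by total pending-file size or by checkpoint
// count, and let several threads run the asynchronous compaction tasks.
FileCompactStrategy strategy =
    FileCompactStrategy.Builder.newBuilder()
        .setSizeThreshold(128 * 1024 * 1024)  // illustrative: 128 MiB
        .enableCompactionOnCheckpoint(10)     // illustrative: at least every 10 checkpoints
        .setNumCompactThreads(4)              // illustrative thread count
        .build();
```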

The {{< javadoc file="org/apache/flink/connector/file/sink/compactor/FileCompactor.html" name="FileCompactor">}} specifies how to compact
the given list of `Path`s and write the result to the {{< javadoc file="org/apache/flink/streaming/api/functions/sink/filesystem/CompactingFileWriter.html" name="CompactingFileWriter">}}. It can be classified into two types according to the type of the given `CompactingFileWriter` (both are sketched after the list):

- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/OutputStreamBasedFileCompactor.html" name="OutputStreamBasedFileCompactor">}}**: The given `CompactingFileWriter` can be converted into an output stream that users can write the compacted results into. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/ConcatFileCompactor.html" name="ConcatFileCompactor">}} that concatenates the list of files directly.
- **{{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}}**: The given `CompactingFileWriter` allows users to write records one by one into it. An example is the {{< javadoc file="org/apache/flink/connector/file/sink/compactor/RecordWiseFileCompactor.html" name="RecordWiseFileCompactor">}} that reads records from the source files and then writes them with the `CompactingFileWriter`. Users need to specify how to read records from the source files.
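
The following sketch contrasts the two built-in compactors mentioned above; either instance could be passed as the second argument of `enableCompact`, and `SimpleStringDecoder` assumes the files were written with `SimpleStringEncoder`:

```java
// Output-stream based: concatenates the pending files byte by byte.
FileCompactor concatCompactor = new ConcatFileCompactor();

// Record-wise: re-reads records with a decoder and writes them out one by one.
FileCompactor recordWiseCompactor =
    new RecordWiseFileCompactor<>(
        new DecoderBasedReader.Factory<>(SimpleStringDecoder::new));
```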

{{< hint info >}}
**Important** Once compaction is enabled, the written files need to wait for a longer time before they become visible.
{{< /hint >}}

### Important Considerations

#### General
