Commit

[FLINK-26281][connectors/elasticsearch] setting default delivery guarantee to AT_LEAST_ONCE
alpreu authored and fapaul committed Mar 16, 2022
1 parent 63b920b commit 7c4c470
Showing 8 changed files with 24 additions and 49 deletions.
52 changes: 8 additions & 44 deletions docs/content/docs/connectors/datastream/elasticsearch.md
@@ -214,10 +214,7 @@ flushes of the buffered actions in progress.

### Elasticsearch Sinks and Fault Tolerance

By default, the Flink Elasticsearch Sink will not provide any strong delivery guarantees.
Users have the option to enable at-least-once semantics for the Elasticsearch sink.

With Flink’s checkpointing enabled, the Flink Elasticsearch Sink can guarantee
With Flink’s checkpointing enabled, the Flink Elasticsearch Sink guarantees
at-least-once delivery of action requests to Elasticsearch clusters. It does
so by waiting for all pending action requests in the `BulkProcessor` at the
time of checkpoints. This effectively assures that all requests before the
@@ -226,63 +223,30 @@ proceeding to process more records sent to the sink.

More details on checkpoints and fault tolerance are in the [fault tolerance docs]({{< ref "docs/learn-flink/fault_tolerance" >}}).

To use fault tolerant Elasticsearch Sinks, at-least-once delivery has to be configured and checkpointing of the topology needs to be enabled at the execution environment:
To use fault tolerant Elasticsearch Sinks, checkpointing of the topology needs to be enabled at the execution environment:

{{< tabs "d00d1e93-4844-40d7-b0ec-9ec37e73145e" >}}
{{< tab "Java" >}}
Elasticsearch 6:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Elasticsearch6SinkBuilder sinkBuilder = new Elasticsearch6SinkBuilder<String>()
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)));
```

Elasticsearch 7:
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs

Elasticsearch7SinkBuilder sinkBuilder = new Elasticsearch7SinkBuilder<String>()
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter(
(element, context, indexer) ->
indexer.add(createIndexRequest(element)));
```
{{< /tab >}}
{{< tab "Scala" >}}
Elasticsearch 6:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

val sinkBuilder = new Elasticsearch6SinkBuilder[String]
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
indexer.add(createIndexRequest(element)))
```

Elasticsearch 7:
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs

val sinkBuilder = new Elasticsearch7SinkBuilder[String]
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setHosts(new HttpHost("127.0.0.1", 9200, "http"))
.setEmitter((element: String, context: SinkWriter.Context, indexer: RequestIndexer) =>
indexer.add(createIndexRequest(element)))
```
{{< /tab >}}
{{< /tabs >}}

<p style="border-radius: 5px; padding: 5px" class="bg-info">
<b>IMPORTANT</b>: Checkpointing is not enabled by default but the default delivery guarantee is AT_LEAST_ONCE.
This causes the sink to buffer requests until it either finishes or the BulkProcessor flushes automatically.
By default, the BulkProcessor will flush after 1000 added Actions. To configure the processor to flush more frequently, please refer to the <a href="#configuring-the-internal-bulk-processor">BulkProcessor configuration section</a>.
</p>

### Handling Failing Elasticsearch Requests

Elasticsearch action requests may fail due to a variety of reasons, including
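The fault-tolerance text in this file says the sink reaches at-least-once by draining all pending action requests at checkpoint time, and that without checkpointing the requests stay buffered until the sink finishes or the size-based flush (1000 actions by default) fires. The interaction can be sketched with a self-contained toy model. `Guarantee` and `BufferingWriter` are hypothetical stand-ins invented for this illustration, not the connector's classes, and the in-memory `delivered` list stands in for requests acknowledged by Elasticsearch.

```java
import java.util.ArrayList;
import java.util.List;

public class FlushOnCheckpointSketch {

    /** Stand-in for the two delivery guarantees the sink accepts. */
    enum Guarantee { NONE, AT_LEAST_ONCE }

    /** Hypothetical buffering writer; NOT the connector's real writer. */
    static final class BufferingWriter {
        private final Guarantee guarantee;
        private final int maxActions; // size-based flush threshold
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered = new ArrayList<>();

        BufferingWriter(Guarantee guarantee, int maxActions) {
            this.guarantee = guarantee;
            this.maxActions = maxActions;
        }

        /** Buffers one action and flushes once the threshold is reached. */
        void write(String action) {
            pending.add(action);
            if (pending.size() >= maxActions) {
                flush();
            }
        }

        /** At a checkpoint, AT_LEAST_ONCE drains the buffer; NONE does not. */
        void onCheckpoint() {
            if (guarantee == Guarantee.AT_LEAST_ONCE) {
                flush();
            }
        }

        /** Moves every buffered action to the delivered list. */
        void flush() {
            delivered.addAll(pending);
            pending.clear();
        }

        int pendingCount() { return pending.size(); }

        int deliveredCount() { return delivered.size(); }
    }

    public static void main(String[] args) {
        BufferingWriter writer = new BufferingWriter(Guarantee.AT_LEAST_ONCE, 1000);
        writer.write("a");
        writer.write("b");
        writer.write("c");
        // Far below the threshold and no checkpoint yet: nothing has been sent.
        System.out.println("pending before checkpoint: " + writer.pendingCount());
        writer.onCheckpoint();
        System.out.println("delivered after checkpoint: " + writer.deliveredCount());
    }
}
```

In this model, a job that never checkpoints only delivers through the size threshold, which is the situation the IMPORTANT note added in this file warns about.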
2 changes: 1 addition & 1 deletion docs/content/docs/connectors/table/elasticsearch.md
@@ -143,7 +143,7 @@ Connector Options
<td><h5>sink.delivery-guarantee</h5></td>
<td>optional</td>
<td>no</td>
<td style="word-wrap: break-word;">NONE</td>
<td style="word-wrap: break-word;">AT_LEAST_ONCE</td>
<td>String</td>
<td>Optional delivery guarantee when committing. Valid values are <code>NONE</code> or <code>AT_LEAST_ONCE</code>.</td>
</tr>
@@ -19,6 +19,7 @@
package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -88,4 +89,9 @@ public SinkWriter<IN> createWriter(InitContext context) throws IOException {
context.metricGroup(),
context.getMailboxExecutor());
}

@VisibleForTesting
DeliveryGuarantee getDeliveryGuarantee() {
return deliveryGuarantee;
}
}
@@ -48,7 +48,7 @@ public abstract class ElasticsearchSinkBuilderBase<
private FlushBackoffType bulkFlushBackoffType = FlushBackoffType.NONE;
private int bulkFlushBackoffRetries = -1;
private long bulkFlushBackOffDelay = -1;
private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.NONE;
private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;
private List<HttpHost> hosts;
protected ElasticsearchEmitter<? super IN> emitter;
private String username;
@@ -143,6 +143,6 @@ public class ElasticsearchConnectorOptions {
public static final ConfigOption<DeliveryGuarantee> DELIVERY_GUARANTEE_OPTION =
ConfigOptions.key("sink.delivery-guarantee")
.enumType(DeliveryGuarantee.class)
.defaultValue(DeliveryGuarantee.NONE)
.defaultValue(DeliveryGuarantee.AT_LEAST_ONCE)
.withDescription("Optional delivery guarantee when committing.");
}
@@ -29,6 +29,7 @@

import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;

@@ -56,6 +57,12 @@ Stream<DynamicTest> testValidBuilders() {
builder -> assertDoesNotThrow(builder::build));
}

@Test
void testDefaultDeliveryGuarantee() {
assertThat(createMinimalBuilder().build().getDeliveryGuarantee())
.isEqualTo(DeliveryGuarantee.AT_LEAST_ONCE);
}

@Test
void testThrowIfExactlyOnceConfigured() {
assertThrows(
@@ -46,7 +46,6 @@
* .source(element.f1)
* );
* })
* .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
* .build();
* }</pre>
*
@@ -46,7 +46,6 @@
* .source(element.f1)
* );
* })
* .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
* .build();
* }</pre>
*
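Taken together, the builder, option and test changes mean that a sink built without an explicit `setDeliveryGuarantee(...)` call now reports `AT_LEAST_ONCE`, and callers who want the previous behaviour have to request `NONE` themselves. A minimal, self-contained sketch of that contract follows. The nested `DeliveryGuarantee` enum and `SinkBuilder` class are hypothetical stand-ins for the Flink types, `build()` returns the guarantee directly instead of a sink, and the exception type thrown for `EXACTLY_ONCE` is an assumption (the diff only shows that a test named `testThrowIfExactlyOnceConfigured` exists).

```java
public class DefaultGuaranteeSketch {

    /** Stand-in for org.apache.flink.connector.base.DeliveryGuarantee. */
    enum DeliveryGuarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

    /** Hypothetical, pared-down builder; the real builder has many more options. */
    static final class SinkBuilder {
        // Default after this commit; the field was initialised to NONE before.
        private DeliveryGuarantee deliveryGuarantee = DeliveryGuarantee.AT_LEAST_ONCE;

        SinkBuilder setDeliveryGuarantee(DeliveryGuarantee guarantee) {
            if (guarantee == DeliveryGuarantee.EXACTLY_ONCE) {
                // Assumption of this sketch: the exact exception type is not shown in the diff.
                throw new IllegalStateException("EXACTLY_ONCE is not supported");
            }
            this.deliveryGuarantee = guarantee;
            return this;
        }

        /** Returns the effective guarantee; a real builder would return a sink. */
        DeliveryGuarantee build() {
            return deliveryGuarantee;
        }
    }

    public static void main(String[] args) {
        // No explicit setting: the new default applies.
        System.out.println(new SinkBuilder().build());
        // Explicit opt-out restores the pre-commit behaviour.
        System.out.println(
                new SinkBuilder().setDeliveryGuarantee(DeliveryGuarantee.NONE).build());
    }
}
```

This mirrors why the Javadoc examples in the last two files could drop their explicit `setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)` call: it is now redundant.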

0 comments on commit 7c4c470