Skip to content

Commit

Permalink
[FLINK-21009] Can not disable certain options in Elasticsearch 7 conn…
Browse files Browse the repository at this point in the history
…ector
  • Loading branch information
dawidwys committed Jan 19, 2021
1 parent 3bf07d0 commit 9f24c48
Showing 3 changed files with 50 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -386,13 +386,11 @@ protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
}

if (bulkProcessorFlushMaxSizeMb != null) {
bulkProcessorBuilder.setBulkSize(
new ByteSizeValue(bulkProcessorFlushMaxSizeMb, ByteSizeUnit.MB));
configureBulkSize(bulkProcessorBuilder);
}

if (bulkProcessorFlushIntervalMillis != null) {
bulkProcessorBuilder.setFlushInterval(
TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
configureFlushInterval(bulkProcessorBuilder);
}

// if backoff retrying is disabled, bulkProcessorFlushBackoffPolicy will be null
@@ -402,6 +400,27 @@ protected BulkProcessor buildBulkProcessor(BulkProcessor.Listener listener) {
return bulkProcessorBuilder.build();
}

private void configureBulkSize(BulkProcessor.Builder bulkProcessorBuilder) {
final ByteSizeUnit sizeUnit;
if (bulkProcessorFlushMaxSizeMb == -1) {
// bulk size can be disabled with -1, however the ByteSizeValue constructor accepts -1
// only with BYTES as the size unit
sizeUnit = ByteSizeUnit.BYTES;
} else {
sizeUnit = ByteSizeUnit.MB;
}
bulkProcessorBuilder.setBulkSize(new ByteSizeValue(bulkProcessorFlushMaxSizeMb, sizeUnit));
}

private void configureFlushInterval(BulkProcessor.Builder bulkProcessorBuilder) {
if (bulkProcessorFlushIntervalMillis == -1) {
bulkProcessorBuilder.setFlushInterval(null);
} else {
bulkProcessorBuilder.setFlushInterval(
TimeValue.timeValueMillis(bulkProcessorFlushIntervalMillis));
}
}

private void checkErrorAndRethrow() {
Throwable cause = failureThrowable.get();
if (cause != null) {
Original file line number Diff line number Diff line change
@@ -109,31 +109,45 @@ public Builder(
}

/**
* Sets the maximum number of actions to buffer for each bulk request.
* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
* disable it.
*
* @param numMaxActions the maximum number of actions to buffer per bulk request.
*/
public void setBulkFlushMaxActions(int numMaxActions) {
Preconditions.checkArgument(
numMaxActions == -1 || numMaxActions > 0,
"Max number of buffered actions must be larger than 0.");

this.bulkRequestsConfig.put(
CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
}

/**
* Sets the maximum size of buffered actions, in mb, per bulk request.
* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
* disable it.
*
* @param maxSizeMb the maximum size of buffered actions, in mb.
*/
public void setBulkFlushMaxSizeMb(int maxSizeMb) {
Preconditions.checkArgument(
maxSizeMb == -1 || maxSizeMb > 0,
"Max size of buffered actions must be larger than 0.");

this.bulkRequestsConfig.put(
CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
}

/**
* Sets the bulk flush interval, in milliseconds.
* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
*
* @param intervalMillis the bulk flush interval, in milliseconds.
*/
public void setBulkFlushInterval(long intervalMillis) {
Preconditions.checkArgument(
intervalMillis == -1 || intervalMillis >= 0,
"Interval (in milliseconds) between each flush must be larger than or equal to 0.");

this.bulkRequestsConfig.put(
CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
}
Original file line number Diff line number Diff line change
@@ -109,39 +109,43 @@ public Builder(
}

/**
* Sets the maximum number of actions to buffer for each bulk request.
* Sets the maximum number of actions to buffer for each bulk request. You can pass -1 to
* disable it.
*
* @param numMaxActions the maximum number of actions to buffer per bulk request.
*/
public void setBulkFlushMaxActions(int numMaxActions) {
Preconditions.checkArgument(
numMaxActions > 0, "Max number of buffered actions must be larger than 0.");
numMaxActions == -1 || numMaxActions > 0,
"Max number of buffered actions must be larger than 0.");

this.bulkRequestsConfig.put(
CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
}

/**
* Sets the maximum size of buffered actions, in mb, per bulk request.
* Sets the maximum size of buffered actions, in mb, per bulk request. You can pass -1 to
* disable it.
*
* @param maxSizeMb the maximum size of buffered actions, in mb.
*/
public void setBulkFlushMaxSizeMb(int maxSizeMb) {
Preconditions.checkArgument(
maxSizeMb > 0, "Max size of buffered actions must be larger than 0.");
maxSizeMb == -1 || maxSizeMb > 0,
"Max size of buffered actions must be larger than 0.");

this.bulkRequestsConfig.put(
CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
}

/**
* Sets the bulk flush interval, in milliseconds.
* Sets the bulk flush interval, in milliseconds. You can pass -1 to disable it.
*
* @param intervalMillis the bulk flush interval, in milliseconds.
*/
public void setBulkFlushInterval(long intervalMillis) {
Preconditions.checkArgument(
intervalMillis >= 0,
intervalMillis == -1 || intervalMillis >= 0,
"Interval (in milliseconds) between each flush must be larger than or equal to 0.");

this.bulkRequestsConfig.put(

0 comments on commit 9f24c48

Please sign in to comment.