Skip to content

Commit

Permalink
[FLINK-23877][connector/pulsar] Since we have use the StartCursor, it…
Browse files Browse the repository at this point in the history
…'s no need to expose the resetIncludeHead option for users.
  • Loading branch information
syhily authored and AHeise committed Aug 26, 2021
1 parent 0fc9008 commit 111b7ec
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,6 @@ private PulsarSourceOptions() {
code("replicateSubscriptionState"))
.build());

public static final ConfigOption<Boolean> PULSAR_RESET_INCLUDE_HEAD =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "resetIncludeHead")
.booleanType()
.defaultValue(false);

public static final ConfigOption<Boolean> PULSAR_BATCH_INDEX_ACK_ENABLED =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "batchIndexAckEnabled")
.booleanType()
.defaultValue(false);

public static final ConfigOption<Boolean> PULSAR_ACK_RECEIPT_ENABLED =
ConfigOptions.key(CONSUMER_CONFIG_PREFIX + "ackReceiptEnabled")
.booleanType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_COMPACTED;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RECEIVER_QUEUE_SIZE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_REPLICATE_SUBSCRIPTION_STATE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RESET_INCLUDE_HEAD;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_ENABLE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_RETRY_LETTER_TOPIC;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_INITIAL_POSITION;
Expand Down Expand Up @@ -166,9 +165,6 @@ public static <T> ConsumerBuilder<T> createConsumerBuilder(
configuration,
PULSAR_AUTO_UPDATE_PARTITIONS_INTERVAL_SECONDS,
v -> builder.autoUpdatePartitionsInterval(v, SECONDS));
if (configuration.getOptional(PULSAR_RESET_INCLUDE_HEAD).orElse(false)) {
builder.startMessageIdInclusive();
}
setOptionValue(configuration, PULSAR_RETRY_ENABLE, builder::enableRetry);
setOptionValue(
configuration,
Expand Down

0 comments on commit 111b7ec

Please sign in to comment.