Skip to content

Commit

Permalink
Expose new entries check delay in the broker.conf (apache#7154)
Browse files Browse the repository at this point in the history
Motivation
Expose new entries check delay in the broker.conf
  • Loading branch information
codelipenghui authored Jun 4, 2020
1 parent 8638022 commit b2329e4
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 1 deletion.
6 changes: 6 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,12 @@ managedLedgerReadEntryTimeoutSeconds=0
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0

# New entries check delay for the cursor under the managed ledger.
# If no new messages in the topic, the cursor will try to check again after the delay time.
# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
managedLedgerNewEntriesCheckDelayInMillis=10

### --- Load balancer --- ###

# Enable load balancer
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,12 @@ managedLedgerReadEntryTimeoutSeconds=0
# Add entry timeout when broker tries to publish message to bookkeeper (0 to disable it).
managedLedgerAddEntryTimeoutSeconds=0

# New entries check delay for the cursor under the managed ledger.
# If no new messages in the topic, the cursor will try to check again after the delay time.
# For consumption latency sensitive scenario, can set to a smaller value or set to 0.
# Of course, use a smaller value may degrade consumption throughput. Default is 10ms.
managedLedgerNewEntriesCheckDelayInMillis=10

# Use Open Range-Set to cache unacked messages
managedLedgerUnackedRangesOpenCacheSetEnabled=true

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class ManagedLedgerConfig {
private Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName;
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();

public boolean isCreateIfMissing() {
Expand Down Expand Up @@ -602,4 +603,12 @@ public boolean isDeletionAtBatchIndexLevelEnabled() {
public void setDeletionAtBatchIndexLevelEnabled(boolean deletionAtBatchIndexLevelEnabled) {
this.deletionAtBatchIndexLevelEnabled = deletionAtBatchIndexLevelEnabled;
}

public int getNewEntriesCheckDelayInMillis() {
return newEntriesCheckDelayInMillis;
}

public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie
// If the managed ledger was indeed terminated, we need to notify the cursor
callback.readEntriesFailed(new NoMoreEntriesToReadException("Topic was terminated"), ctx);
}
}), 10, TimeUnit.MILLISECONDS);
}), config.getNewEntriesCheckDelayInMillis(), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Add entry timeout when broker tries to publish message to bookkeeper.(0 to disable it)")
private long managedLedgerAddEntryTimeoutSeconds = 0;

@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "New entries check delay for the cursor under the managed ledger. \n"
+ "If no new messages in the topic, the cursor will try to check again after the delay time. \n"
+ "For consumption latency sensitive scenario, can set to a smaller value or set to 0.\n"
+ "Of course, this may degrade consumption throughput. Default is 10ms.")
private int managedLedgerNewEntriesCheckDelayInMillis = 10;

/*** --- Load balancer --- ****/
@FieldContext(
category = CATEGORY_LOAD_BALANCER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
managedLedgerConfig.setNewEntriesCheckDelayInMillis(serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis());

future.complete(managedLedgerConfig);
}, (exception) -> future.completeExceptionally(exception)));
Expand Down

0 comments on commit b2329e4

Please sign in to comment.