Skip to content

Commit

Permalink
Transparent batching of ZK operations (apache#13043)
Browse files Browse the repository at this point in the history
* Transparent batching of ZK operations

* Addressed comments

* Handle default switch case

* Fixed issues in MockZookeeper with usage of multi()

* Wrap Throwable with MetadataStoreException

* Fixed getChildren in MockZookeeper

* Handle cases in which ZK is failing the partial request, but with error=OK

* Fixed test to wait for put result

* Fixed test that was not waiting on put future to complete
  • Loading branch information
merlimat authored Dec 2, 2021
1 parent 2209d2a commit 0497f67
Show file tree
Hide file tree
Showing 20 changed files with 1,112 additions and 163 deletions.
15 changes: 15 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,21 @@ brokerClientTlsCiphers=
brokerClientTlsProtocols=


### --- Metadata Store --- ###

# Whether we should enable metadata operations batching
metadataStoreBatchingEnabled=true

# Maximum delay to impose on batching grouping
metadataStoreBatchingMaxDelayMillis=5

# Maximum number of operations to include in a singular batch
metadataStoreBatchingMaxOperations=1000

# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128


### --- Authentication --- ###

# Enable authentication
Expand Down
14 changes: 14 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,20 @@ maxConsumersPerSubscription=0
# Use 0 or negative number to disable the check
maxNumPartitionsPerPartitionedTopic=0

### --- Metadata Store --- ###

# Whether we should enable metadata operations batching
metadataStoreBatchingEnabled=true

# Maximum delay to impose on batching grouping
metadataStoreBatchingMaxDelayMillis=5

# Maximum number of operations to include in a singular batch
metadataStoreBatchingMaxOperations=1000

# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128

### --- TLS --- ###
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
Expand Down Expand Up @@ -324,6 +325,30 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private long topicLoadTimeoutSeconds = 60;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether we should enable metadata operations batching"
)
private boolean metadataStoreBatchingEnabled = true;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum delay to impose on batching grouping"
)
private int metadataStoreBatchingMaxDelayMillis = 5;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum number of operations to include in a singular batch"
)
private int metadataStoreBatchingMaxOperations = 1_000;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Maximum size of a batch"
)
private int metadataStoreBatchingMaxSizeKb = 128;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Configuration file path for local metadata store. It's supported by RocksdbMetadataStore for now."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ public MetadataStore createConfigurationMetadataStore() throws MetadataStoreExce
.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
.allowReadOnlyOperations(false)
.configFilePath(config.getMetadataStoreConfigPath())
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.build());
}

Expand Down Expand Up @@ -910,6 +914,10 @@ public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreExce
.sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis())
.allowReadOnlyOperations(false)
.configFilePath(config.getMetadataStoreConfigPath())
.batchingEnabled(config.isMetadataStoreBatchingEnabled())
.batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
.batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
.batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void testCreateLookupResult() throws Exception {
pulsar.getLocalMetadataStore().put(path,
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(ld),
Optional.empty(),
EnumSet.of(CreateOption.Ephemeral));
EnumSet.of(CreateOption.Ephemeral)).join();

LookupResult noListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, null).get();
LookupResult withListener = pulsar.getNamespaceService().createLookupResult(candidateBroker, false, listener).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,28 @@ public class MetadataStoreConfig {
*/
@Builder.Default
private final String configFilePath = null;

/**
* Whether we should enable metadata operations batching
*/
@Builder.Default
private final boolean batchingEnabled = true;

/**
* Maximum delay to impose on batching grouping
*/
@Builder.Default
private final int batchingMaxDelayMillis = 5;

/**
* Maximum number of operations to include in a singular batch
*/
@Builder.Default
private final int batchingMaxOperations = 1_000;

/**
* Maximum size of a batch
*/
@Builder.Default
private final int batchingMaxSizeKb = 128;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -60,7 +61,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co

private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
private final ExecutorService executor;
protected final ScheduledExecutorService executor;
private final AsyncLoadingCache<String, List<String>> childrenCache;
private final AsyncLoadingCache<String, Boolean> existsCache;
private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
Expand All @@ -76,7 +77,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co

protected AbstractMetadataStore() {
this.executor = Executors
.newSingleThreadExecutor(new DefaultThreadFactory("metadata-store"));
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store"));
registerListener(this);

this.childrenCache = Caffeine.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,9 @@ public void close() throws MetadataStoreException {

@Override
public CompletableFuture<Optional<GetResult>> storeGet(String path) {
log.info("getChildrenFromStore.path={},instanceId={}", path, instanceId);
if (log.isDebugEnabled()) {
log.debug("getFromStore.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
Expand Down Expand Up @@ -366,7 +368,9 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {

@Override
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
log.info("getChildrenFromStore.path={},instanceId={}", path, instanceId);
if (log.isDebugEnabled()) {
log.debug("getChildrenFromStore.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
Expand Down Expand Up @@ -409,7 +413,9 @@ protected CompletableFuture<List<String>> getChildrenFromStore(String path) {

@Override
protected CompletableFuture<Boolean> existsFromStore(String path) {
log.info("existsFromStore.path={},instanceId={}", path, instanceId);
if (log.isDebugEnabled()) {
log.debug("existsFromStore.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
Expand All @@ -432,7 +438,9 @@ protected CompletableFuture<Boolean> existsFromStore(String path) {

@Override
protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expectedVersion) {
log.info("storeDelete.path={},instanceId={}", path, instanceId);
if (log.isDebugEnabled()) {
log.debug("storeDelete.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
Expand Down Expand Up @@ -466,7 +474,9 @@ protected CompletableFuture<Void> storeDelete(String path, Optional<Long> expect
@Override
protected CompletableFuture<Stat> storePut(String path, byte[] data, Optional<Long> expectedVersion,
EnumSet<CreateOption> options) {
log.info("storePut.path={},instanceId={}", path, instanceId);
if (log.isDebugEnabled()) {
log.debug("storePut.path={},instanceId={}", path, instanceId);
}
try {
dbStateLock.readLock().lock();
if (state == State.CLOSED) {
Expand Down
Loading

0 comments on commit 0497f67

Please sign in to comment.