Skip to content

Commit

Permalink
[Pulsar SQL] Add max split entry queue size bytes limitation (apache#…
Browse files Browse the repository at this point in the history
…9628)

### Motivation

In Pulsar SQL, there are two configurations, `pulsar.max-split-entry-queue-size` and `pulsar.max-split-message-queue-size`, that control the capacity of the entry queue and the message queue. However, both limits count items rather than bytes: because some entries are very large and others are small, it is hard to control how many bytes the entry queue and the message queue actually hold.

### Modifications

Add a new configuration, `pulsar.max-split-queue-cache-size`, to limit the size in bytes of the entry queue and the message queue. Half of the configured value is assigned to the entry queue, and the remaining half is assigned to the message queue.
  • Loading branch information
gaoran10 authored Mar 14, 2021
1 parent c94d782 commit 1f6ce7a
Show file tree
Hide file tree
Showing 14 changed files with 525 additions and 13 deletions.
3 changes: 3 additions & 0 deletions conf/presto/catalog/pulsar.properties
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pulsar.target-num-splits=2
pulsar.max-split-message-queue-size=10000
# max entry queue size
pulsar.max-split-entry-queue-size=1000
# Half of this value is used as the max entry queue size in bytes and the other half as the max message queue size in bytes.
# The queues should not exceed these sizes, but the limit is not strictly enforced. The default value -1 indicates no limit.
pulsar.max-split-queue-cache-size=-1
# Rewrite namespace delimiter
# Warn: avoid using symbols allowed by Namespace (a-zA-Z_0-9 -=:%)
# to prevent erroneous rewriting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ protected void waitForZooKeeperWatchers() {
}
}

public TenantInfo createDefaultTenantInfo() throws PulsarAdminException {
protected TenantInfo createDefaultTenantInfo() throws PulsarAdminException {
// create local cluster if not exist
if (!admin.clusters().getClusters().contains(configClusterName)) {
admin.clusters().createCluster(configClusterName, new ClusterData());
Expand Down
29 changes: 29 additions & 0 deletions pulsar-sql/presto-pulsar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,35 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-broker</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>testmocks</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http</artifactId>
<version>${jetty.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
private int maxSplitEntryQueueSize = 1000;
private long maxSplitQueueSizeBytes = -1;
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private String statsProvider = NullStatsProvider.class.getName();

Expand Down Expand Up @@ -159,6 +160,17 @@ public PulsarConnectorConfig setMaxSplitEntryQueueSize(int maxSplitEntryQueueSiz
return this;
}

/**
 * Returns the byte budget configured through {@code pulsar.max-split-queue-cache-size}.
 *
 * @return the configured queue cache size in bytes; the field defaults to {@code -1}
 */
@NotNull
public long getMaxSplitQueueSizeBytes() {
    return maxSplitQueueSizeBytes;
}

/**
 * Sets the queue cache size in bytes, bound to the {@code pulsar.max-split-queue-cache-size} property.
 *
 * @param maxSplitQueueSizeBytes the queue cache size in bytes to store
 * @return this config instance, so that setters can be chained
 */
@Config("pulsar.max-split-queue-cache-size")
public PulsarConnectorConfig setMaxSplitQueueSizeBytes(long maxSplitQueueSizeBytes) {
this.maxSplitQueueSizeBytes = maxSplitQueueSizeBytes;
return this;
}

@NotNull
public String getStatsProvider() {
return statsProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.sql.presto.util.CacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NoStrictCacheSizeAllocator;
import org.apache.pulsar.sql.presto.util.NullCacheSizeAllocator;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;

Expand All @@ -79,7 +82,9 @@ public class PulsarRecordCursor implements RecordCursor {
private PulsarConnectorConfig pulsarConnectorConfig;
private ReadOnlyCursor cursor;
private SpscArrayQueue<RawMessage> messageQueue;
private CacheSizeAllocator messageQueueCacheSizeAllocator;
private SpscArrayQueue<Entry> entryQueue;
private CacheSizeAllocator entryQueueCacheSizeAllocator;
private RawMessage currentMessage;
private int maxBatchSize;
private long completedBytes = 0;
Expand Down Expand Up @@ -137,6 +142,7 @@ public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pu
pulsarConnectorConfig),
new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
this.decoderFactory = decoderFactory;
initEntryCacheSizeAllocator(pulsarConnectorConfig);
}

// Exposed for testing purposes
Expand Down Expand Up @@ -167,6 +173,7 @@ private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit puls
this.metricsTracker = pulsarConnectorMetricsTracker;
this.readOffloaded = pulsarConnectorConfig.getManagedLedgerOffloadDriver() != null;
this.pulsarConnectorConfig = pulsarConnectorConfig;
initEntryCacheSizeAllocator(pulsarConnectorConfig);

try {
this.schemaInfoProvider = new PulsarSqlSchemaInfoProvider(this.topicName,
Expand Down Expand Up @@ -222,7 +229,7 @@ public void setPulsarSqlSchemaInfoProvider(PulsarSqlSchemaInfoProvider schemaInf
@VisibleForTesting
class DeserializeEntries implements Runnable {

protected boolean isRunning = false;
protected boolean isRunning = false;

private final Thread thread;

Expand All @@ -249,6 +256,8 @@ public void run() {
public void accept(Entry entry) {

try {
entryQueueCacheSizeAllocator.release(entry.getLength());

long bytes = entry.getDataBuffer().readableBytes();
completedBytes += bytes;
// register stats for bytes read
Expand All @@ -269,9 +278,16 @@ public void accept(Entry entry) {
// start time for message queue read
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();

// enqueue deserialize message from this entry
while (!messageQueue.offer(message)) {
Thread.sleep(1);
while (true) {
if (!haveAvailableCacheSize(
messageQueueCacheSizeAllocator, messageQueue)
|| !messageQueue.offer(message)) {
Thread.sleep(1);
} else {
messageQueueCacheSizeAllocator.allocate(
message.getData().readableBytes());
break;
}
}

// stats for how long a read from message queue took
Expand Down Expand Up @@ -335,7 +351,7 @@ public void run() {

ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor);
// check if ledger is offloaded
if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
log.warn(
"Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
Expand All @@ -347,8 +363,14 @@ public void run() {

entriesProcessed += entriesToSkip;
} else {
if (!haveAvailableCacheSize(entryQueueCacheSizeAllocator, entryQueue)) {
metricsTracker.incr_READ_ATTEMPTS_FAIL();
return;
}
// if the available size is invalid and the entry queue size is 0, read one entry
outstandingReadsRequests.decrementAndGet();
cursor.asyncReadEntries(batchSize, this, System.nanoTime(), PositionImpl.latest);
cursor.asyncReadEntries(batchSize, entryQueueCacheSizeAllocator.getAvailableCacheSize(),
this, System.nanoTime(), PositionImpl.latest);
}

// stats for successful read request
Expand All @@ -370,6 +392,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
// Supplies the next entry from the completed read and charges its length
// against the entry queue's cache-size budget.
public Entry get() {
Entry entry = entries.get(i);
// Advance only after the lookup succeeded, so the next call returns the following entry.
i++;
// Account for the bytes this entry occupies while it sits in the entry queue.
entryQueueCacheSizeAllocator.allocate(entry.getLength());
return entry;
}
}, entries.size());
Expand Down Expand Up @@ -399,6 +422,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}
}

/**
 * Checks whether the given queue may take more data under its cache-size budget.
 *
 * <p>Returns {@code true} when any of the following holds:
 * <ol>
 *   <li>the allocator is a {@link NullCacheSizeAllocator};</li>
 *   <li>the allocator reports an available cache size {@code > 0};</li>
 *   <li>the queue is currently empty. This keeps the query from blocking when the
 *       budget is exhausted (or too small for a single element): at least one
 *       element is always allowed through.</li>
 * </ol>
 *
 * @param cacheSizeAllocator the allocator tracking the byte budget of {@code queue}
 * @param queue the queue whose emptiness is used as the fallback condition
 * @return {@code true} if more data may be added to the queue
 */
private boolean haveAvailableCacheSize(CacheSizeAllocator cacheSizeAllocator, SpscArrayQueue<?> queue) {
    if (cacheSizeAllocator instanceof NullCacheSizeAllocator) {
        return true;
    }
    // Only emptiness matters here, so ask for it directly instead of computing the size.
    return cacheSizeAllocator.getAvailableCacheSize() > 0 || queue.isEmpty();
}

@Override
public boolean advanceNextPosition() {

Expand Down Expand Up @@ -427,6 +463,7 @@ public boolean advanceNextPosition() {

currentMessage = messageQueue.poll();
if (currentMessage != null) {
messageQueueCacheSizeAllocator.release(currentMessage.getData().readableBytes());
break;
} else {
try {
Expand Down Expand Up @@ -654,4 +691,19 @@ private void checkFieldType(int field, Class<?> expected) {
checkArgument(actual == expected, "Expected field %s to be type %s but is %s", field, expected, actual);
}

/**
 * Creates the cache size allocators for the entry queue and the message queue.
 *
 * <p>When the configured max split queue size is non-negative, each queue gets a
 * {@link NoStrictCacheSizeAllocator} with half of that size. Otherwise both queues get a
 * {@link NullCacheSizeAllocator}.
 *
 * @param connectorConfig the connector config providing the max split queue size in bytes
 */
private void initEntryCacheSizeAllocator(PulsarConnectorConfig connectorConfig) {
    final long maxSplitQueueSizeBytes = connectorConfig.getMaxSplitQueueSizeBytes();
    if (maxSplitQueueSizeBytes >= 0) {
        // The budget is shared: half for raw entries, half for deserialized messages.
        final long perQueueSizeBytes = maxSplitQueueSizeBytes / 2;
        this.entryQueueCacheSizeAllocator = new NoStrictCacheSizeAllocator(perQueueSizeBytes);
        this.messageQueueCacheSizeAllocator = new NoStrictCacheSizeAllocator(perQueueSizeBytes);
        // This logger is used with String.format-style placeholders elsewhere in the class,
        // so "%s" is required for the value to be interpolated.
        log.info("Init cacheSizeAllocator with maxSplitEntryQueueSizeBytes %s.", maxSplitQueueSizeBytes);
    } else {
        this.entryQueueCacheSizeAllocator = new NullCacheSizeAllocator();
        this.messageQueueCacheSizeAllocator = new NullCacheSizeAllocator();
        log.info("Init cacheSizeAllocator with NullCacheSizeAllocator.");
    }
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto.util;

/**
 * Tracks how much of a cache-size budget is still available.
 *
 * <p>Callers report usage with {@link #allocate(long)}, give it back with
 * {@link #release(long)}, and query the remainder with {@link #getAvailableCacheSize()}.
 */
public interface CacheSizeAllocator {

    /**
     * Reports the cache size that is currently available.
     *
     * @return the available cache size
     */
    long getAvailableCacheSize();

    /**
     * Consumes part of the available cache size.
     *
     * @param size the size to allocate
     */
    void allocate(long size);

    /**
     * Gives previously allocated cache size back.
     *
     * @param size the size to release
     */
    void release(long size);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto.util;

import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A {@link CacheSizeAllocator} that tracks a byte budget without strictly enforcing it.
 *
 * <p>Allocations are always accepted, even when they exceed what is left, so the internal
 * balance may become negative; {@link #getAvailableCacheSize()} reports such a balance as
 * {@code 0}. Releases never raise the balance above the configured maximum.
 *
 * <p>{@link #allocate(long)} and {@link #release(long)} are serialized by an internal lock;
 * {@link #getAvailableCacheSize()} reads the balance without locking.
 */
public class NoStrictCacheSizeAllocator implements CacheSizeAllocator {

    private final long maxCacheSize;
    private final LongAdder availableCacheSize;
    private final ReentrantLock lock;

    /**
     * Creates an allocator whose whole budget is initially available.
     *
     * @param maxCacheSize the maximum (and initial) available cache size
     */
    public NoStrictCacheSizeAllocator(long maxCacheSize) {
        this.maxCacheSize = maxCacheSize;
        this.availableCacheSize = new LongAdder();
        this.availableCacheSize.add(maxCacheSize);
        this.lock = new ReentrantLock();
    }

    /**
     * Reports the cache size that is currently available.
     *
     * @return the available cache size, or {@code 0} when the budget is overdrawn
     */
    public long getAvailableCacheSize() {
        // Read the balance exactly once: reading it twice would let a concurrent allocate()
        // slip in between the check and the return, leaking a negative value to the caller.
        return Math.max(0L, availableCacheSize.sum());
    }

    /**
     * Consumes available cache size.
     *
     * <p>A request larger than the available size is still accepted, because a single entry
     * may exceed the budget and the query must be able to finish. In that case the balance
     * becomes negative until enough size is released.
     *
     * @param size the size to allocate
     */
    public void allocate(long size) {
        // Acquire the lock before entering try, so that finally never unlocks a lock
        // this thread does not hold.
        lock.lock();
        try {
            availableCacheSize.add(-size);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Gives previously allocated cache size back.
     *
     * <p>The available size never exceeds the maximum cache size: a release that would
     * overshoot is truncated so that the balance lands exactly on the maximum.
     *
     * @param size the size to release
     */
    public void release(long size) {
        lock.lock();
        try {
            // All writers hold the lock, so sum() is exact here. Clamp with a single add()
            // instead of reset()+add(): the lock-free reader must never observe a transient
            // zero balance while the clamp is in progress.
            final long headroom = maxCacheSize - availableCacheSize.sum();
            availableCacheSize.add(Math.min(size, headroom));
        } finally {
            lock.unlock();
        }
    }

}
Loading

0 comments on commit 1f6ce7a

Please sign in to comment.