Skip to content

Commit

Permalink
[Pulsar SQL] Pulsar SQL support query big entry data (apache#12448)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoran10 authored Oct 26, 2021
1 parent 6a2e3a1 commit a8f0788
Show file tree
Hide file tree
Showing 10 changed files with 122 additions and 44 deletions.
3 changes: 2 additions & 1 deletion conf/presto/catalog/pulsar.properties
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pulsar.max-split-queue-cache-size=-1
# to prevent erroneous rewriting
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/

# max size of one batch message (default value is 5MB)
# pulsar.max-message-size=5242880

####### TIERED STORAGE OFFLOADER CONFIGS #######

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

Expand Down Expand Up @@ -109,7 +110,8 @@ private ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig puls
.setReadEntryTimeout(60)
.setThrottleValue(pulsarConnectorConfig.getBookkeeperThrottleValue())
.setNumIOThreads(pulsarConnectorConfig.getBookkeeperNumIOThreads())
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads());
.setNumWorkerThreads(pulsarConnectorConfig.getBookkeeperNumWorkerThreads())
.setNettyMaxFrameSizeBytes(pulsarConnectorConfig.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING);

ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(pulsarConnectorConfig.getManagedLedgerCacheSizeMB());
Expand Down
3 changes: 3 additions & 0 deletions site2/docs/sql-deployment-configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ pulsar.entry-read-batch-size=100

# default number of splits to use per query
pulsar.target-num-splits=4

# max size of one batch message (default value is 5MB)
pulsar.max-message-size=5242880
```

You can connect Presto to a Pulsar cluster with multiple hosts. To configure multiple hosts for brokers, add multiple URLs to `pulsar.web-service-url`. To configure multiple hosts for ZooKeeper, add multiple URIs to `pulsar.zookeeper-uri`. The following is an example.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,6 @@ public class BKContainer extends PulsarContainer<BKContainer> {
public BKContainer(String clusterName, String hostName) {
super(
clusterName, hostName, hostName, "bin/run-bookie.sh", BOOKIE_PORT, INVALID_PORT);
tailContainerLog();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
Expand All @@ -56,6 +56,7 @@ public class TestBasicPresto extends TestPulsarSQLBase {
private void setupPresto() throws Exception {
log.info("[TestBasicPresto] setupPresto...");
pulsarCluster.startPrestoWorker();
initJdbcConnection();
}

private void teardownPresto() {
Expand Down Expand Up @@ -161,31 +162,26 @@ protected int prepareData(TopicName topicName,
boolean useNsOffloadPolices,
Schema schema,
CompressionType compressionType) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForBytesSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForByteBufferSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.STRING)) {
prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType);
prepareDataForStringSchema(topicName, isBatch, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.JSON)
|| schema.getSchemaInfo().getType().equals(SchemaType.AVRO)) {
prepareDataForStructSchema(pulsarClient, topicName, isBatch, schema, compressionType);
prepareDataForStructSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.PROTOBUF_NATIVE)) {
prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, schema, compressionType);
prepareDataForProtobufNativeSchema(topicName, isBatch, schema, compressionType);
} else if (schema.getSchemaInfo().getType().equals(SchemaType.KEY_VALUE)) {
prepareDataForKeyValueSchema(pulsarClient, topicName, schema, compressionType);
prepareDataForKeyValueSchema(topicName, schema, compressionType);
}

return NUM_OF_STOCKS;
}

private void prepareDataForBytesSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForBytesSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
Expand All @@ -201,8 +197,7 @@ private void prepareDataForBytesSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForByteBufferSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
Expand All @@ -218,8 +213,7 @@ private void prepareDataForByteBufferSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForStringSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForStringSchema(TopicName topicName,
boolean isBatch,
CompressionType compressionType) throws PulsarClientException {
@Cleanup
Expand All @@ -235,8 +229,7 @@ private void prepareDataForStringSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForStructSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForStructSchema(TopicName topicName,
boolean isBatch,
Schema<Stock> schema,
CompressionType compressionType) throws Exception {
Expand All @@ -254,8 +247,7 @@ private void prepareDataForStructSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForProtobufNativeSchema(TopicName topicName,
boolean isBatch,
Schema<StockProtoMessage.Stock> schema,
CompressionType compressionType) throws Exception {
Expand All @@ -274,8 +266,7 @@ private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient,
producer.flush();
}

private void prepareDataForKeyValueSchema(PulsarClient pulsarClient,
TopicName topicName,
private void prepareDataForKeyValueSchema(TopicName topicName,
Schema<KeyValue<Stock, Stock>> schema,
CompressionType compressionType) throws Exception {
@Cleanup
Expand Down Expand Up @@ -342,4 +333,33 @@ private void validateContentForKeyValueSchema(int messageNum, String[] contentAr
}
}

@Test(timeOut = 1000 * 30)
public void testQueueBigEntry() throws Exception {
String tableName = "big_data_" + randomName(5);
String topic = "persistent://public/default/" + tableName;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.enableBatching(false)
.create();

// Make sure that the data length bigger than the default maxMessageSize
int dataLength = Commands.DEFAULT_MAX_MESSAGE_SIZE + 2 * 1024 * 1024;
Assert.assertTrue(dataLength < pulsarCluster.getSpec().maxMessageSize());
byte[] data = new byte[dataLength];
for (int i = 0; i < dataLength; i++) {
data[i] = 'a';
}

int messageCnt = 5;
log.info("start produce big entry data, data length: {}", dataLength);
for (int i = 0 ; i < messageCnt; ++i) {
producer.newMessage().value(data).send();
}

int count = selectCount("public/default", tableName);
Assert.assertEquals(count, messageCnt);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
Expand All @@ -40,8 +39,6 @@
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.testcontainers.shaded.org.apache.commons.lang.StringUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


Expand Down Expand Up @@ -89,6 +86,7 @@ private void setupExtraContainers() throws Exception {
String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT);
pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
initJdbcConnection();
}

private String getOffloadProperties(String bucket, String region, String endpoint) {
Expand Down Expand Up @@ -136,11 +134,6 @@ protected int prepareData(TopicName topicName,
Schema schema,
CompressionType compressionType) throws Exception {
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();

@Cleanup
Consumer<Stock> consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class))
.topic(topicName.toString())
.subscriptionName("test")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
Expand Down Expand Up @@ -196,9 +194,6 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
);

// test predicate pushdown
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
Connection connection = DriverManager.getConnection(url, "test", null);

String query = String.format("select * from pulsar" +
".\"%s\".\"%s\" order by __publish_time__", namespace, topic);
log.info("Executing query: {}", query);
Expand Down Expand Up @@ -267,11 +262,7 @@ private void validateData(TopicName topicName, int messageNum, Schema schema) th
log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size());
assertThat(returnedTimestamps.size()).isEqualTo(0);

query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
log.info("Executing query: {}", query);
res = connection.createStatement().executeQuery(query);
res.next();
int count = res.getInt("_col0");
int count = selectCount(namespace, topic);
assertThat(count).isGreaterThan(messageNum - 2);
}

Expand Down Expand Up @@ -304,5 +295,12 @@ private static void printCurrent(ResultSet rs) throws SQLException {

}

protected int selectCount(String namespace, String tableName) throws SQLException {
String query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, tableName);
log.info("Executing count query: {}", query);
ResultSet res = connection.createStatement().executeQuery(query);
res.next();
return res.getInt("_col0");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,22 @@
*/
package org.apache.pulsar.tests.integration.suites;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;

/**
* Pulsar SQL test suite.
*/
@Slf4j
public abstract class PulsarSQLTestSuite extends PulsarTestSuite {

Expand All @@ -33,11 +42,15 @@ public abstract class PulsarSQLTestSuite extends PulsarTestSuite {
public static final String BUCKET = "pulsar-integtest";
public static final String ENDPOINT = "http://" + S3Container.NAME + ":9090";

protected Connection connection = null;
protected PulsarClient pulsarClient = null;

@Override
protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
specBuilder.queryLastMessage(true);
specBuilder.clusterName("pulsar-sql-test");
specBuilder.numBrokers(1);
specBuilder.maxMessageSize(2 * Commands.DEFAULT_MAX_MESSAGE_SIZE);
return super.beforeSetupCluster(clusterName, specBuilder);
}

Expand All @@ -55,4 +68,43 @@ protected void beforeStartCluster() throws Exception {
}
}

@Override
public void setupCluster() throws Exception {
super.setupCluster();
pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarCluster.getPlainTextServiceUrl())
.build();
}

protected void initJdbcConnection() throws SQLException {
if (pulsarCluster.getPrestoWorkerContainer() == null) {
log.error("The presto work container isn't exist.");
return;
}
String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl());
connection = DriverManager.getConnection(url, "test", null);
}

@Override
public void tearDownCluster() throws Exception {
close();
super.tearDownCluster();
}

protected void close() {
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {
log.error("Failed to close sql connection.", e);
}
}
if (pulsarClient != null) {
try {
pulsarClient.close();
} catch (PulsarClientException e) {
log.error("Failed to close pulsar client.", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) {
return new PulsarCluster(spec);
}

@Getter
private final PulsarClusterSpec spec;

@Getter
Expand Down Expand Up @@ -150,6 +151,7 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("diskUsageThreshold", "0.99")
.withEnv("nettyMaxFrameSizeBytes", "" + spec.maxMessageSize)
)
);

Expand All @@ -166,7 +168,8 @@ private PulsarCluster(PulsarClusterSpec spec) {
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey")
.withEnv("AWS_SECRET_KEY", "secretkey");
.withEnv("AWS_SECRET_KEY", "secretkey")
.withEnv("maxMessageSize", "" + spec.maxMessageSize);
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
Expand Down Expand Up @@ -420,6 +423,7 @@ private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolea
.withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT)
.withEnv("pulsar.web-service-url", "http://pulsar-broker-0:8080")
.withEnv("SQL_PREFIX_pulsar.max-message-size", "" + spec.maxMessageSize)
.withClasspathResourceMapping(
resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE);
if (spec.queryLastMessage) {
Expand Down
Loading

0 comments on commit a8f0788

Please sign in to comment.