diff --git a/docker/pulsar/Dockerfile b/docker/pulsar/Dockerfile index 561a8c50f2a3a..69b8b6768bc60 100644 --- a/docker/pulsar/Dockerfile +++ b/docker/pulsar/Dockerfile @@ -26,6 +26,7 @@ ADD ${PULSAR_TARBALL} / RUN mv /apache-pulsar-* /pulsar COPY scripts/apply-config-from-env.py /pulsar/bin +COPY scripts/apply-config-from-env-with-prefix.py /pulsar/bin COPY scripts/gen-yml-from-env.py /pulsar/bin COPY scripts/generate-zookeeper-config.sh /pulsar/bin COPY scripts/pulsar-zookeeper-ruok.sh /pulsar/bin diff --git a/docker/pulsar/scripts/apply-config-from-env-with-prefix.py b/docker/pulsar/scripts/apply-config-from-env-with-prefix.py new file mode 100755 index 0000000000000..670ba9a97cf23 --- /dev/null +++ b/docker/pulsar/scripts/apply-config-from-env-with-prefix.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +## +## Edit a properties config file and replace values based on +## the ENV variables +## export prefix_my-key=new-value +## ./apply-config-from-env-with-prefix prefix_ file.conf +## + +import os, sys + +if len(sys.argv) < 3: + print('Usage: %s' % (sys.argv[0])) + sys.exit(1) + +# Always apply env config to env scripts as well +prefix = sys.argv[1] +conf_files = sys.argv[2:] + + +for conf_filename in conf_files: + lines = [] # List of config file lines + keys = {} # Map a key to its line number in the file + + # Load conf file + for line in open(conf_filename): + lines.append(line) + line = line.strip() + if not line or line.startswith('#'): + continue + + try: + k,v = line.split('=', 1) + keys[k] = len(lines) - 1 + except: + print("[%s] skip Processing %s" % (conf_filename, line)) + + # Update values from Env + for k in sorted(os.environ.keys()): + v = os.environ[k].strip() + + # Hide the value in logs if is password. + if "password" in k: + displayValue = "********" + else: + displayValue = v + + if k.startswith(prefix): + k = k[len(prefix):] + if k in keys: + print('[%s] Applying config %s = %s' % (conf_filename, k, displayValue)) + idx = keys[k] + lines[idx] = '%s=%s\n' % (k, v) + + + # Add new keys from Env + for k in sorted(os.environ.keys()): + v = os.environ[k] + if not k.startswith(prefix): + continue + + # Hide the value in logs if is password. + if "password" in k: + displayValue = "********" + else: + displayValue = v + + k = k[len(prefix):] + if k not in keys: + print('[%s] Adding config %s = %s' % (conf_filename, k, displayValue)) + lines.append('%s=%s\n' % (k, v)) + else: + print('[%s] Updating config %s = %s' % (conf_filename, k, displayValue)) + lines[keys[k]] = '%s=%s\n' % (k, v) + + + # Store back the updated config in the same file + f = open(conf_filename, 'w') + for line in lines: + f.write(line) + f.close() + diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 81a1dc991d64b..09d752003a38d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3435,6 +3435,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) { && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) { return createManagedLedgerException(t.getCause()); } else { + log.error("Unknown exception for ManagedLedgerException.", t); return new ManagedLedgerException("Unknown exception"); } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 6620081038534..4417f21296eb7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -19,6 +19,8 @@ package org.apache.pulsar.common.policies.data; import static org.apache.pulsar.common.util.FieldParser.value; + +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; import java.io.Serializable; @@ -126,67 +128,93 @@ public String getValue() { // common config @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String offloadersDirectory = DEFAULT_OFFLOADER_DIRECTORY; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String managedLedgerOffloadDriver = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadMaxThreads = DEFAULT_OFFLOAD_MAX_THREADS; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadPrefetchRounds = DEFAULT_OFFLOAD_MAX_PREFETCH_ROUNDS; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Long managedLedgerOffloadThresholdInBytes = DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Long managedLedgerOffloadDeletionLagInMillis = DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private OffloadedReadPriority managedLedgerOffloadedReadPriority = DEFAULT_OFFLOADED_READ_PRIORITY; // s3 config, set by service configuration or cli @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String s3ManagedLedgerOffloadRegion = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String s3ManagedLedgerOffloadBucket = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String s3ManagedLedgerOffloadServiceEndpoint = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer s3ManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer s3ManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES; // s3 config, set by service configuration @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String s3ManagedLedgerOffloadRole = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String s3ManagedLedgerOffloadRoleSessionName = "pulsar-s3-offload"; // gcs config, set by service configuration or cli @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String gcsManagedLedgerOffloadRegion = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String gcsManagedLedgerOffloadBucket = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer gcsManagedLedgerOffloadMaxBlockSizeInBytes = DEFAULT_MAX_BLOCK_SIZE_IN_BYTES; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer gcsManagedLedgerOffloadReadBufferSizeInBytes = DEFAULT_READ_BUFFER_SIZE_IN_BYTES; // gcs config, set by service configuration @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String gcsManagedLedgerOffloadServiceAccountKeyFile = null; // file system config, set by service configuration @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String fileSystemProfilePath = null; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String fileSystemURI = null; // --------- new offload configurations --------- // they are universal configurations and could be used to `aws-s3`, `google-cloud-storage` or `azureblob`. @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String managedLedgerOffloadBucket; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String managedLedgerOffloadRegion; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private String managedLedgerOffloadServiceEndpoint; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadMaxBlockSizeInBytes; @Configuration + @JsonProperty(access = JsonProperty.Access.READ_WRITE) private Integer managedLedgerOffloadReadBufferSizeInBytes; public static OffloadPolicies create(String driver, String region, String bucket, String endpoint, @@ -572,7 +600,7 @@ private static Object getCompatibleValue(Properties properties, Field field) { if (field.getName().equals("managedLedgerOffloadThresholdInBytes")) { object = properties.getProperty("managedLedgerOffloadThresholdInBytes", properties.getProperty(OFFLOAD_THRESHOLD_NAME_IN_CONF_FILE)); - } else if (field.getName().equals("")) { + } else if (field.getName().equals("managedLedgerOffloadDeletionLagInMillis")) { object = properties.getProperty("managedLedgerOffloadDeletionLagInMillis", properties.getProperty(DELETION_LAG_NAME_IN_CONF_FILE)); } else { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 16fe8a5a8c039..3485f988ef1f6 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -128,6 +128,11 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand try { OffloadPolicies offloadPolicies = this.pulsarAdmin.namespaces() .getOffloadPolicies(topicName.getNamespace()); + if (offloadPolicies != null) { + offloadPolicies.setOffloadersDirectory(pulsarConnectorConfig.getOffloadersDirectory()); + offloadPolicies.setManagedLedgerOffloadMaxThreads( + pulsarConnectorConfig.getManagedLedgerOffloadMaxThreads()); + } if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) { splits = getSplitsNonPartitionedTopic( numSplits, topicName, tableHandle, schemaInfo, tupleDomain, offloadPolicies); @@ -160,8 +165,10 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic int splitRemainder = actualNumSplits % predicatedPartitions.size(); - ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig) - .getManagedLedgerFactory(); + PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig); + ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory(); + ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig( + topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig); List splits = new LinkedList<>(); for (int i = 0; i < predicatedPartitions.size(); i++) { @@ -170,6 +177,7 @@ Collection getSplitsPartitionedTopic(int numSplits, TopicName topic getSplitsForTopic( topicName.getPartition(predicatedPartitions.get(i)).getPersistenceNamingEncoding(), managedLedgerFactory, + managedLedgerConfig, splitsForThisPartition, tableHandle, schemaInfo, @@ -231,12 +239,15 @@ private List getPredicatedPartitions(TopicName topicName, TupleDomain getSplitsNonPartitionedTopic(int numSplits, TopicName topicName, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, TupleDomain tupleDomain, OffloadPolicies offloadPolicies) throws Exception { - ManagedLedgerFactory managedLedgerFactory = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig) - .getManagedLedgerFactory(); + PulsarConnectorCache pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig); + ManagedLedgerFactory managedLedgerFactory = pulsarConnectorCache.getManagedLedgerFactory(); + ManagedLedgerConfig managedLedgerConfig = pulsarConnectorCache.getManagedLedgerConfig( + topicName.getNamespaceObject(), offloadPolicies, pulsarConnectorConfig); return getSplitsForTopic( topicName.getPersistenceNamingEncoding(), managedLedgerFactory, + managedLedgerConfig, numSplits, tableHandle, schemaInfo, @@ -248,6 +259,7 @@ Collection getSplitsNonPartitionedTopic(int numSplits, TopicName to @VisibleForTesting Collection getSplitsForTopic(String topicNamePersistenceEncoding, ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig, int numSplits, PulsarTableHandle tableHandle, SchemaInfo schemaInfo, String tableName, @@ -259,7 +271,7 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, try { readOnlyCursor = managedLedgerFactory.openReadOnlyCursor( topicNamePersistenceEncoding, - PositionImpl.earliest, new ManagedLedgerConfig()); + PositionImpl.earliest, managedLedgerConfig); long numEntries = readOnlyCursor.getNumberOfEntries(); if (numEntries <= 0) { @@ -270,6 +282,7 @@ Collection getSplitsForTopic(String topicNamePersistenceEncoding, this.connectorId, tupleDomain, managedLedgerFactory, + managedLedgerConfig, topicNamePersistenceEncoding, numEntries); @@ -341,6 +354,7 @@ private PredicatePushdownInfo(PositionImpl startPosition, PositionImpl endPositi public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId, TupleDomain tupleDomain, ManagedLedgerFactory managedLedgerFactory, + ManagedLedgerConfig managedLedgerConfig, String topicNamePersistenceEncoding, long totalNumEntries) throws ManagedLedgerException, InterruptedException { @@ -349,7 +363,7 @@ public static PredicatePushdownInfo getPredicatePushdownInfo(String connectorId, try { readOnlyCursor = managedLedgerFactory.openReadOnlyCursor( topicNamePersistenceEncoding, - PositionImpl.earliest, new ManagedLedgerConfig()); + PositionImpl.earliest, managedLedgerConfig); if (tupleDomain.getDomains().isPresent()) { Domain domain = tupleDomain.getDomains().get().get(PulsarInternalColumn.PUBLISH_TIME diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java index 3d185c92de968..82bff48d93d80 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarSplitManager.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.sql.presto; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.airlift.json.JsonCodec; import io.airlift.log.Logger; import io.prestosql.spi.connector.ColumnHandle; import io.prestosql.spi.connector.ConnectorSession; @@ -28,12 +31,17 @@ import io.prestosql.spi.predicate.TupleDomain; import io.prestosql.spi.predicate.ValueSet; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.OffloadPolicies; +import org.apache.pulsar.common.schema.SchemaInfo; +import org.apache.pulsar.common.schema.SchemaType; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.Test; +import java.io.UnsupportedEncodingException; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; @@ -53,6 +61,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; public class TestPulsarSplitManager extends TestPulsarConnector { @@ -401,4 +410,77 @@ public void testGetSplitNonSchema(String delimiter) throws Exception { pulsarTableLayoutHandle, null); assertNotNull(connectorSplitSource); } + + @Test + public void pulsarSplitJsonCodecTest() throws JsonProcessingException, UnsupportedEncodingException { + OffloadPolicies offloadPolicies = OffloadPolicies.create( + "aws-s3", + "test-region", + "test-bucket", + "test-endpoint", + "test-credential-id", + "test-credential-secret", + 5000, + 2000, + 1000L, + 5000L, + OffloadPolicies.OffloadedReadPriority.BOOKKEEPER_FIRST + ); + + SchemaInfo schemaInfo = JSONSchema.of(Foo.class).getSchemaInfo(); + final String schema = new String(schemaInfo.getSchema(), "ISO8859-1"); + final String originSchemaName = schemaInfo.getName(); + final String schemaName = schemaInfo.getName(); + final String schemaInfoProperties = new ObjectMapper().writeValueAsString(schemaInfo.getProperties()); + final SchemaType schemaType = schemaInfo.getType(); + + final long splitId = 1; + final String connectorId = "connectorId"; + final String tableName = "tableName"; + final long splitSize = 5; + final long startPositionEntryId = 22; + final long endPositionEntryId = 33; + final long startPositionLedgerId = 10; + final long endPositionLedgerId = 21; + final TupleDomain tupleDomain = TupleDomain.all(); + + byte[] pulsarSplitData; + JsonCodec jsonCodec = JsonCodec.jsonCodec(PulsarSplit.class); + try { + PulsarSplit pulsarSplit = new PulsarSplit( + splitId, connectorId, schemaName, originSchemaName, tableName, splitSize, schema, + schemaType, startPositionEntryId, endPositionEntryId, startPositionLedgerId, + endPositionLedgerId, tupleDomain, schemaInfoProperties, offloadPolicies); + pulsarSplitData = jsonCodec.toJsonBytes(pulsarSplit); + } catch (Exception e) { + e.printStackTrace(); + log.error("Failed to serialize the PulsarSplit.", e); + fail("Failed to serialize the PulsarSplit."); + return; + } + + try { + PulsarSplit pulsarSplit = jsonCodec.fromJson(pulsarSplitData); + Assert.assertEquals(pulsarSplit.getSchema(), schema); + Assert.assertEquals(pulsarSplit.getOriginSchemaName(), originSchemaName); + Assert.assertEquals(pulsarSplit.getSchemaName(), schemaName); + Assert.assertEquals(pulsarSplit.getSchemaInfoProperties(), schemaInfoProperties); + Assert.assertEquals(pulsarSplit.getSchemaType(), schemaType); + Assert.assertEquals(pulsarSplit.getSplitId(), splitId); + Assert.assertEquals(pulsarSplit.getConnectorId(), connectorId); + Assert.assertEquals(pulsarSplit.getTableName(), tableName); + Assert.assertEquals(pulsarSplit.getSplitSize(), splitSize); + Assert.assertEquals(pulsarSplit.getStartPositionEntryId(), startPositionEntryId); + Assert.assertEquals(pulsarSplit.getEndPositionEntryId(), endPositionEntryId); + Assert.assertEquals(pulsarSplit.getStartPositionLedgerId(), startPositionLedgerId); + Assert.assertEquals(pulsarSplit.getEndPositionLedgerId(), endPositionLedgerId); + Assert.assertEquals(pulsarSplit.getTupleDomain(), tupleDomain); + Assert.assertEquals(pulsarSplit.getOffloadPolicies(), offloadPolicies); + } catch (Exception e) { + log.error("Failed to deserialize the PulsarSplit.", e); + fail("Failed to deserialize the PulsarSplit."); + } + + } + } diff --git a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh index a78f955f7af82..87b393a37dc73 100755 --- a/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh +++ b/tests/docker-images/latest-version-image/scripts/run-presto-worker.sh @@ -18,7 +18,7 @@ # under the License. # -bin/apply-config-from-env.py conf/presto/catalog/pulsar.properties && \ +bin/apply-config-from-env-with-prefix.py SQL_PREFIX_ conf/presto/catalog/pulsar.properties && \ bin/apply-config-from-env.py conf/pulsar_env.sh if [ -z "$NO_AUTOSTART" ]; then diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java index 2e885e0b7f3a4..7c6c9d6d78531 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PrestoWorkerContainer.java @@ -36,10 +36,19 @@ public PrestoWorkerContainer(String clusterName, String hostname) { "bin/run-presto-worker.sh", -1, PRESTO_HTTP_PORT, - "/v1/node"); + "/v1/info/state"); } + @Override + protected void afterStart() { + this.tailContainerLog(); + DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(), + "tail", "-f", "/var/log/pulsar/presto_worker.log"); + DockerUtils.runCommandAsync(this.dockerClient, this.getContainerId(), + "tail", "-f", "/pulsar/lib/presto/var/log/server.log"); + } + @Override protected void beforeStop() { super.beforeStop(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java index e8f83c807b3f8..8c6a5d8267861 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/PulsarContainer.java @@ -149,7 +149,7 @@ public void start() { beforeStart(); super.start(); afterStart(); - log.info("Start pulsar service {} at container {}", serviceName, getContainerId()); + log.info("[{}] Start pulsar service {} at container {}", getContainerName(), serviceName, getContainerId()); } @Override diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 7fde5bc04e6f1..9c6fe1d118c9b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -18,101 +18,56 @@ */ package org.apache.pulsar.tests.integration.presto; -import com.google.common.base.Stopwatch; -import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.tests.integration.docker.ContainerExecException; -import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; -import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; -import org.awaitility.Awaitility; +import org.apache.pulsar.common.naming.TopicName; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.LinkedList; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; - @Slf4j -public class TestBasicPresto extends PulsarTestSuite { +public class TestBasicPresto extends TestPulsarSQLBase { private static final int NUM_OF_STOCKS = 10; - @Override - protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { - return super.beforeSetupCluster(clusterName, specBuilder.queryLastMessage(true)); - } - @BeforeClass public void setupPresto() throws Exception { - log.info("[setupPresto]"); + log.info("[TestBasicPresto] setupPresto..."); pulsarCluster.startPrestoWorker(); } @AfterClass public void teardownPresto() { - log.info("tearing down..."); + log.info("[TestBasicPresto] tearing down..."); pulsarCluster.stopPrestoWorker(); } @Test public void testSimpleSQLQueryBatched() throws Exception { - testSimpleSQLQuery(true); + TopicName topicName = TopicName.get("public/default/stocks_batched_" + randomName(5)); + pulsarSQLBasicTest(topicName, true, false); } @Test public void testSimpleSQLQueryNonBatched() throws Exception { - testSimpleSQLQuery(false); + TopicName topicName = TopicName.get("public/default/stocks_nonbatched_" + randomName(5)); + pulsarSQLBasicTest(topicName, false, false); } - - public void testSimpleSQLQuery(boolean isBatched) throws Exception { - // wait until presto worker started - ContainerExecResult result; - do { - try { - result = execQuery("show catalogs;"); - assertThat(result.getExitCode()).isEqualTo(0); - assertThat(result.getStdout()).contains("pulsar", "system"); - break; - } catch (ContainerExecException cee) { - if (cee.getResult().getStderr().contains("Presto server is still initializing")) { - Thread.sleep(10000); - } else { - throw cee; - } - } - } while (true); + @Override + protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { @Cleanup PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); - - String stocksTopic; - if (isBatched) { - stocksTopic = "stocks_batched"; - } else { - stocksTopic = "stocks_nonbatched"; - } + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); @Cleanup Producer producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) - .topic(stocksTopic) - .enableBatching(isBatched) + .topic(topicName.toString()) + .enableBatching(isBatch) .create(); for (int i = 0 ; i < NUM_OF_STOCKS; ++i) { @@ -120,137 +75,7 @@ public void testSimpleSQLQuery(boolean isBatched) throws Exception { producer.send(stock); } producer.flush(); - - result = execQuery("show schemas in pulsar;"); - assertThat(result.getExitCode()).isEqualTo(0); - assertThat(result.getStdout()).contains("public/default"); - - pulsarCluster.getBroker(0) - .execCmd( - "/bin/bash", - "-c", "bin/pulsar-admin namespaces unload public/default"); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( - () -> { - ContainerExecResult r = execQuery("show tables in pulsar.\"public/default\";"); - assertThat(r.getExitCode()).isEqualTo(0); - assertThat(r.getStdout()).contains("stocks"); - } - ); - - Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( - () -> { - ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic)); - assertThat(containerExecResult.getExitCode()).isEqualTo(0); - log.info("select sql query output \n{}", containerExecResult.getStdout()); - String[] split = containerExecResult.getStdout().split("\n"); - assertThat(split.length).isEqualTo(NUM_OF_STOCKS); - String[] split2 = containerExecResult.getStdout().split("\n|,"); - for (int i = 0; i < NUM_OF_STOCKS; ++i) { - assertThat(split2).contains("\"" + i + "\""); - assertThat(split2).contains("\"" + "STOCK_" + i + "\""); - assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); - } - } - ); - - - // test predicate pushdown - - String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); - Connection connection = DriverManager.getConnection(url, "test", null); - - String query = String.format("select * from pulsar" + - ".\"public/default\".%s order by __publish_time__", stocksTopic); - log.info("Executing query: {}", query); - ResultSet res = connection.createStatement().executeQuery(query); - - List timestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - timestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(timestamps.size()).isGreaterThan(NUM_OF_STOCKS - 2); - - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2)); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - List returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2); - - // Try with a predicate that has a earlier time than any entry - // Should return all rows - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size()); - - // Try with a predicate that has a latter time than any entry - // Should return no rows - - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size()).isEqualTo(0); - } - - @AfterSuite - @Override - public void tearDownCluster() { - super.tearDownCluster(); - } - - public static ContainerExecResult execQuery(final String query) throws Exception { - ContainerExecResult containerExecResult; - - containerExecResult = pulsarCluster.getPrestoWorkerContainer() - .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); - - Stopwatch sw = Stopwatch.createStarted(); - while (containerExecResult.getExitCode() != 0 && sw.elapsed(TimeUnit.SECONDS) < 120) { - TimeUnit.MILLISECONDS.sleep(500); - containerExecResult = pulsarCluster.getPrestoWorkerContainer() - .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); - } - - return containerExecResult; - - } - - private static void printCurrent(ResultSet rs) throws SQLException { - ResultSetMetaData rsmd = rs.getMetaData(); - int columnsNumber = rsmd.getColumnCount(); - for (int i = 1; i <= columnsNumber; i++) { - if (i > 1) System.out.print(", "); - String columnValue = rs.getString(i); - System.out.print(columnValue + " " + rsmd.getColumnName(i)); - } - System.out.println(""); - + return NUM_OF_STOCKS; } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java index 42f1ea9910bd0..cd0d3484e9dea 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -18,61 +18,53 @@ */ package org.apache.pulsar.tests.integration.presto; +import static com.google.common.base.Preconditions.checkNotNull; + import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.tests.integration.containers.BrokerContainer; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.tests.integration.containers.S3Container; -import org.apache.pulsar.tests.integration.docker.ContainerExecException; -import org.apache.pulsar.tests.integration.docker.ContainerExecResult; -import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; -import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testcontainers.shaded.org.apache.commons.lang.StringUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Timestamp; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; -import static org.assertj.core.api.Assertions.assertThat; +/** + * Test presto query from tiered storage. + */ @Slf4j -public class TestPrestoQueryTieredStorage extends PulsarTestSuite { +public class TestPrestoQueryTieredStorage extends TestPulsarSQLBase { - private final static int ENTRIES_PER_LEDGER = 1024; - private final static String OFFLOAD_DRIVER = "s3"; - private final static String BUCKET = "pulsar-integtest"; - private final static String ENDPOINT = "http://" + S3Container.NAME + ":9090"; + private final String TENANT = "presto"; + private final String NAMESPACE = "ts"; private S3Container s3Container; - @Override - protected void beforeStartCluster() throws Exception { - for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { - getEnv().forEach(brokerContainer::withEnv); - } - } - @BeforeClass - public void setupPresto() throws Exception { + public void setupExtraContainers() throws Exception { + log.info("[TestPrestoQueryTieredStorage] setupExtraContainers..."); + pulsarCluster.runAdminCommandOnAnyBroker( "tenants", + "create", "--allowed-clusters", pulsarCluster.getClusterName(), + "--admin-roles", "offload-admin", TENANT); + + pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", + "create", "--clusters", pulsarCluster.getClusterName(), + NamespaceName.get(TENANT, NAMESPACE).toString()); + s3Container = new S3Container( pulsarCluster.getClusterName(), S3Container.NAME) @@ -80,11 +72,12 @@ public void setupPresto() throws Exception { .withNetworkAliases(S3Container.NAME); s3Container.start(); - log.info("[setupPresto] prestoWorker: " + pulsarCluster.getPrestoWorkerContainer()); - pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, getOffloadProperties(BUCKET, null, ENDPOINT)); + String offloadProperties = getOffloadProperties(BUCKET, null, ENDPOINT); + pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties); + pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties); } - public String getOffloadProperties(String bucket, String region, String endpoint) { + private String getOffloadProperties(String bucket, String region, String endpoint) { checkNotNull(bucket); StringBuilder sb = new StringBuilder(); sb.append("{"); @@ -99,10 +92,9 @@ public String getOffloadProperties(String bucket, String region, String endpoint return sb.toString(); } - @AfterClass public void teardownPresto() { - log.info("tearing down..."); + log.info("[TestPrestoQueryTieredStorage] tearing down..."); if (null != s3Container) { s3Container.stop(); } @@ -110,213 +102,91 @@ public void teardownPresto() { pulsarCluster.stopPrestoWorker(); } - // Flaky Test: https://github.com/apache/pulsar/issues/7750 - // @Test + @Test public void testQueryTieredStorage1() throws Exception { - testSimpleSQLQuery(false); + TopicName topicName = TopicName.get( + TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_nons_" + randomName(5)); + pulsarSQLBasicTest(topicName, false, false); } - // Flaky Test: https://github.com/apache/pulsar/issues/7750 - // @Test + @Test public void testQueryTieredStorage2() throws Exception { - testSimpleSQLQuery(true); + TopicName topicName = TopicName.get( + TopicDomain.persistent.value(), TENANT, NAMESPACE, "stocks_ts_ns_" + randomName(5)); + pulsarSQLBasicTest(topicName, false, true); } - public void testSimpleSQLQuery(boolean isNamespaceOffload) throws Exception { - - // wait until presto worker started - ContainerExecResult result; - do { - try { - result = execQuery("show catalogs;"); - assertThat(result.getExitCode()).isEqualTo(0); - assertThat(result.getStdout()).contains("pulsar", "system"); - break; - } catch (ContainerExecException cee) { - if (cee.getResult().getStderr().contains("Presto server is still initializing")) { - Thread.sleep(10000); - } else { - throw cee; - } - } - } while (true); - + @Override + protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { @Cleanup PulsarClient pulsarClient = PulsarClient.builder() - .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) - .build(); + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); - String stocksTopic = "stocks-" + randomName(5); + @Cleanup + Consumer consumer = pulsarClient.newConsumer(JSONSchema.of(Stock.class)) + .topic(topicName.toString()) + .subscriptionName("test") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); @Cleanup Producer producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) - .topic(stocksTopic) + .topic(topicName.toString()) .create(); long firstLedgerId = -1; - long currentLedgerId = -1; int sendMessageCnt = 0; - while (currentLedgerId <= firstLedgerId) { - sendMessageCnt ++; - final Stock stock = new Stock(sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10); + while (true) { + Stock stock = new Stock( + sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10); MessageIdImpl messageId = (MessageIdImpl) producer.send(stock); + sendMessageCnt ++; if (firstLedgerId == -1) { firstLedgerId = messageId.getLedgerId(); } - currentLedgerId = messageId.getLedgerId(); - log.info("firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId); + if (messageId.getLedgerId() > firstLedgerId) { + log.info("ledger rollover firstLedgerId: {}, currentLedgerId: {}", + firstLedgerId, messageId.getLedgerId()); + break; + } Thread.sleep(100); } - producer.flush(); - - offloadAndDeleteFromBK(isNamespaceOffload, stocksTopic); - - // check schema - result = execQuery("show schemas in pulsar;"); - assertThat(result.getExitCode()).isEqualTo(0); - assertThat(result.getStdout()).contains("public/default"); - - // check table - result = execQuery("show tables in pulsar.\"public/default\";"); - assertThat(result.getExitCode()).isEqualTo(0); - assertThat(result.getStdout()).contains(stocksTopic); - - // check query - ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic)); - assertThat(containerExecResult.getExitCode()).isEqualTo(0); - log.info("select sql query output \n{}", containerExecResult.getStdout()); - String[] split = containerExecResult.getStdout().split("\n"); - assertThat(split.length).isGreaterThan(sendMessageCnt - 2); - - String[] split2 = containerExecResult.getStdout().split("\n|,"); - - for (int i = 0; i < sendMessageCnt - 2; ++i) { - assertThat(split2).contains("\"" + i + "\""); - assertThat(split2).contains("\"" + "STOCK_" + i + "\""); - assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); - } - - // test predicate pushdown - - String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); - Connection connection = DriverManager.getConnection(url, "test", null); - String query = String.format("select * from pulsar" + - ".\"public/default\".%s order by __publish_time__", stocksTopic); - log.info("Executing query: {}", query); - ResultSet res = connection.createStatement().executeQuery(query); - - List timestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - timestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(timestamps.size()).isGreaterThan(sendMessageCnt - 2); - - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2)); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - List returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2); - - // Try with a predicate that has a earlier time than any entry - // Should return all rows - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size()); - - // Try with a predicate that has a latter time than any entry - // Should return no rows - - query = String.format("select * from pulsar" + - ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L); - log.info("Executing query: {}", query); - res = connection.createStatement().executeQuery(query); - - returnedTimestamps = new LinkedList<>(); - while (res.next()) { - printCurrent(res); - returnedTimestamps.add(res.getTimestamp("__publish_time__")); - } - - assertThat(returnedTimestamps.size()).isEqualTo(0); + offloadAndDeleteFromBK(useNsOffloadPolices, topicName); + return sendMessageCnt; } - @AfterSuite - @Override - public void tearDownCluster() { - super.tearDownCluster(); - } - - public static ContainerExecResult execQuery(final String query) throws Exception { - ContainerExecResult containerExecResult; - - containerExecResult = pulsarCluster.getPrestoWorkerContainer() - .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); - - return containerExecResult; - - } - - private static void printCurrent(ResultSet rs) throws SQLException { - ResultSetMetaData rsmd = rs.getMetaData(); - int columnsNumber = rsmd.getColumnCount(); - for (int i = 1; i <= columnsNumber; i++) { - if (i > 1) System.out.print(", "); - String columnValue = rs.getString(i); - System.out.print(columnValue + " " + rsmd.getColumnName(i)); - } - System.out.println(""); - - } - - private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTopic) { + private void offloadAndDeleteFromBK(boolean useNsOffloadPolices, TopicName topicName) { String adminUrl = pulsarCluster.getHttpServiceUrl(); try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { // read managed ledger info, check ledgers exist - long firstLedger = admin.topics().getInternalStats(stocksTopic).ledgers.get(0).ledgerId; + long firstLedger = admin.topics().getInternalStats(topicName.toString()).ledgers.get(0).ledgerId; String output = ""; - if (isNamespaceOffload) { + if (useNsOffloadPolices) { pulsarCluster.runAdminCommandOnAnyBroker( "namespaces", "set-offload-policies", "--bucket", "pulsar-integtest", - "--driver", "s3", + "--driver", OFFLOAD_DRIVER, "--endpoint", "http://" + S3Container.NAME + ":9090", "--offloadAfterElapsed", "1000", - "public/default"); + topicName.getNamespace()); output = pulsarCluster.runAdminCommandOnAnyBroker( - "namespaces", "get-offload-policies").getStdout(); + "namespaces", "get-offload-policies", topicName.getNamespace()).getStdout(); Assert.assertTrue(output.contains("pulsar-integtest")); - Assert.assertTrue(output.contains("s3")); + Assert.assertTrue(output.contains(OFFLOAD_DRIVER)); } // offload with a low threshold output = pulsarCluster.runAdminCommandOnAnyBroker("topics", - "offload", "--size-threshold", "1M", stocksTopic).getStdout(); + "offload", "--size-threshold", "0", topicName.toString()).getStdout(); Assert.assertTrue(output.contains("Offload triggered")); output = pulsarCluster.runAdminCommandOnAnyBroker("topics", - "offload-status", "-w", stocksTopic).getStdout(); + "offload-status", "-w", topicName.toString()).getStdout(); Assert.assertTrue(output.contains("Offload was a success")); // delete the first ledger, so that we cannot possibly read from it @@ -330,21 +200,10 @@ private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTop } // Unload topic to clear all caches, open handles, etc - admin.topics().unload(stocksTopic); + admin.topics().unload(topicName.toString()); } catch (Exception e) { Assert.fail("Failed to deleteOffloadedDataFromBK."); } } - protected Map getEnv() { - Map result = new HashMap<>(); - result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); - result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); - result.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER); - result.put("s3ManagedLedgerOffloadBucket", BUCKET); - result.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT); - - return result; - } - } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java new file mode 100644 index 0000000000000..b4caa9f2779f9 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPulsarSQLBase.java @@ -0,0 +1,249 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.presto; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.google.common.base.Stopwatch; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.awaitility.Awaitility; +import org.testcontainers.shaded.okhttp3.OkHttpClient; +import org.testcontainers.shaded.okhttp3.Request; +import org.testcontainers.shaded.okhttp3.Response; +import org.testng.Assert; + +@Slf4j +public class TestPulsarSQLBase extends PulsarSQLTestSuite { + + protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + waitPulsarSQLReady(); + + log.info("start prepare data for query. topic: {}", topic); + int messageCnt = prepareData(topic, isBatch, useNsOffloadPolices); + log.info("finish prepare data for query. topic: {}, messageCnt: {}", topic, messageCnt); + + validateMetadata(topic); + + validateData(topic, messageCnt); + } + + private void waitPulsarSQLReady() throws Exception { + // wait until presto worker started + ContainerExecResult result; + do { + try { + result = execQuery("show catalogs;"); + assertThat(result.getExitCode()).isEqualTo(0); + assertThat(result.getStdout()).contains("pulsar", "system"); + break; + } catch (ContainerExecException cee) { + if (cee.getResult().getStderr().contains("Presto server is still initializing")) { + Thread.sleep(10000); + } else { + throw cee; + } + } + } while (true); + + // check presto follow workers start finish. + if (pulsarCluster.getSqlFollowWorkerContainers() != null + && pulsarCluster.getSqlFollowWorkerContainers().size() > 0) { + OkHttpClient okHttpClient = new OkHttpClient(); + Request request = new Request.Builder() + .url("http://" + pulsarCluster.getPrestoWorkerContainer().getUrl() + "/v1/node") + .build(); + do { + try (Response response = okHttpClient.newCall(request).execute()) { + Assert.assertNotNull(response.body()); + String nodeJsonStr = response.body().string(); + Assert.assertTrue(nodeJsonStr.length() > 0); + log.info("presto node info: {}", nodeJsonStr); + if (nodeJsonStr.contains("uri")) { + log.info("presto node exist."); + break; + } + Thread.sleep(1000); + } + } while (true); + } + } + + protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices) throws Exception { + throw new Exception("Unsupported operation prepareData."); + } + + private void validateMetadata(TopicName topicName) throws Exception { + ContainerExecResult result = execQuery("show schemas in pulsar;"); + assertThat(result.getExitCode()).isEqualTo(0); + assertThat(result.getStdout()).contains(topicName.getNamespace()); + + pulsarCluster.getBroker(0) + .execCmd( + "/bin/bash", + "-c", "bin/pulsar-admin namespaces unload " + topicName.getNamespace()); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> { + ContainerExecResult r = execQuery( + String.format("show tables in pulsar.\"%s\";", topicName.getNamespace())); + assertThat(r.getExitCode()).isEqualTo(0); + assertThat(r.getStdout()).contains(topicName.getLocalName()); + } + ); + } + + private void validateData(TopicName topicName, int messageNum) throws Exception { + String namespace = topicName.getNamespace(); + String topic = topicName.getLocalName(); + + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted( + () -> { + ContainerExecResult containerExecResult = execQuery( + String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic)); + assertThat(containerExecResult.getExitCode()).isEqualTo(0); + log.info("select sql query output \n{}", containerExecResult.getStdout()); + String[] split = containerExecResult.getStdout().split("\n"); + assertThat(split.length).isEqualTo(messageNum); + String[] split2 = containerExecResult.getStdout().split("\n|,"); + for (int i = 0; i < messageNum; ++i) { + assertThat(split2).contains("\"" + i + "\""); + assertThat(split2).contains("\"" + "STOCK_" + i + "\""); + assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); + } + } + ); + + // test predicate pushdown + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + Connection connection = DriverManager.getConnection(url, "test", null); + + String query = String.format("select * from pulsar" + + ".\"%s\".\"%s\" order by __publish_time__", namespace, topic); + log.info("Executing query: {}", query); + ResultSet res = connection.createStatement().executeQuery(query); + + List timestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + timestamps.add(res.getTimestamp("__publish_time__")); + } + log.info("Executing query: result for topic {} timestamps size {}", topic, timestamps.size()); + + assertThat(timestamps.size()).isGreaterThan(messageNum - 2); + + query = String.format("select * from pulsar" + + ".\"%s\".\"%s\" where __publish_time__ > timestamp '%s' order by __publish_time__", + namespace, topic, timestamps.get(timestamps.size() / 2)); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + List returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); + if (timestamps.size() % 2 == 0) { + // for example: total size 10, the right receive number is 4, so 4 + 1 == 10 / 2 + assertThat(returnedTimestamps.size() + 1).isEqualTo(timestamps.size() / 2); + } else { + // for example: total size 101, the right receive number is 50, so 50 == (101 - 1) / 2 + assertThat(returnedTimestamps.size()).isEqualTo((timestamps.size() - 1) / 2); + } + + // Try with a predicate that has a earlier time than any entry + // Should return all rows + query = String.format("select * from pulsar.\"%s\".\"%s\" where " + + "__publish_time__ > from_unixtime(%s) order by __publish_time__", namespace, topic, 0); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); + assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size()); + + // Try with a predicate that has a latter time than any entry + // Should return no rows + + query = String.format("select * from pulsar.\"%s\".\"%s\" where " + + "__publish_time__ > from_unixtime(%s) order by __publish_time__", namespace, topic, 99999999999L); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + log.info("Executing query: result for topic {} returnedTimestamps size: {}", topic, returnedTimestamps.size()); + assertThat(returnedTimestamps.size()).isEqualTo(0); + } + + public static ContainerExecResult execQuery(final String query) throws Exception { + ContainerExecResult containerExecResult; + + containerExecResult = pulsarCluster.getPrestoWorkerContainer() + .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); + + Stopwatch sw = Stopwatch.createStarted(); + while (containerExecResult.getExitCode() != 0 && sw.elapsed(TimeUnit.SECONDS) < 120) { + TimeUnit.MILLISECONDS.sleep(500); + containerExecResult = pulsarCluster.getPrestoWorkerContainer() + .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); + } + + return containerExecResult; + + } + + private static void printCurrent(ResultSet rs) throws SQLException { + ResultSetMetaData rsmd = rs.getMetaData(); + int columnsNumber = rsmd.getColumnCount(); + for (int i = 1; i <= columnsNumber; i++) { + if (i > 1) System.out.print(", "); + String columnValue = rs.getString(i); + System.out.print(columnValue + " " + rsmd.getColumnName(i)); + } + System.out.println(""); + + } + + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java new file mode 100644 index 0000000000000..5863ae6546e89 --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarSQLTestSuite.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.suites; + +import java.util.HashMap; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.tests.integration.containers.BrokerContainer; +import org.apache.pulsar.tests.integration.containers.S3Container; +import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec; + + +@Slf4j +public abstract class PulsarSQLTestSuite extends PulsarTestSuite { + + public final static int ENTRIES_PER_LEDGER = 100; + public final static String OFFLOAD_DRIVER = "aws-s3"; + public final static String BUCKET = "pulsar-integtest"; + public final static String ENDPOINT = "http://" + S3Container.NAME + ":9090"; + + @Override + public String getTestName() { + return "pulsar-sql-test-suite"; + } + + @Override + protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(String clusterName, PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) { + specBuilder.queryLastMessage(true); + specBuilder.clusterName("pulsar-sql-test"); + specBuilder.numBrokers(1); + return super.beforeSetupCluster(clusterName, specBuilder); + } + + @Override + protected void beforeStartCluster() throws Exception { + Map envMap = new HashMap<>(); + envMap.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + envMap.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + envMap.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER); + envMap.put("s3ManagedLedgerOffloadBucket", BUCKET); + envMap.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT); + + for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { + brokerContainer.withEnv(envMap); + } + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 8d5404c0d2923..dfb24f3c70a67 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -83,6 +83,8 @@ public static PulsarCluster forSpec(PulsarClusterSpec spec) { private final Map workerContainers; private final ProxyContainer proxyContainer; private PrestoWorkerContainer prestoWorkerContainer; + @Getter + private Map sqlFollowWorkerContainers; private Map> externalServices = Collections.emptyMap(); private final boolean enablePrestoWorker; @@ -93,17 +95,10 @@ private PulsarCluster(PulsarClusterSpec spec) { this.network = Network.newNetwork(); this.enablePrestoWorker = spec.enablePrestoWorker(); + this.sqlFollowWorkerContainers = Maps.newTreeMap(); if (enablePrestoWorker) { - prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME) - .withNetwork(network) - .withNetworkAliases(PrestoWorkerContainer.NAME) - .withEnv("clusterName", clusterName) - .withEnv("zkServers", ZKContainer.NAME) - .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) - .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) - .withEnv("pulsar.bookkeeper-use-v2-protocol", "false") - .withEnv("pulsar.bookkeeper-explicit-interval", "10") - .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080"); + prestoWorkerContainer = buildPrestoWorkerContainer( + PrestoWorkerContainer.NAME, true, null, null); } else { prestoWorkerContainer = null; } @@ -358,41 +353,78 @@ public void startPrestoWorker() { public void startPrestoWorker(String offloadDriver, String offloadProperties) { log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties); if (null == prestoWorkerContainer) { - prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME) - .withNetwork(network) - .withNetworkAliases(PrestoWorkerContainer.NAME) - .withEnv("clusterName", clusterName) - .withEnv("zkServers", ZKContainer.NAME) - .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) - .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) - .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080"); - if (spec.queryLastMessage) { - prestoWorkerContainer.withEnv("pulsar.bookkeeper-use-v2-protocol", "false") - .withEnv("pulsar.bookkeeper-explicit-interval", "10"); - } - if (offloadDriver != null && offloadProperties != null) { - log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}", - offloadDriver, offloadProperties); - prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver); - prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloader-properties", offloadProperties); - prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders"); - // used in s3 tests - prestoWorkerContainer.withEnv("AWS_ACCESS_KEY_ID", "accesskey"); - prestoWorkerContainer.withEnv("AWS_SECRET_KEY", "secretkey"); - } + prestoWorkerContainer = buildPrestoWorkerContainer( + PrestoWorkerContainer.NAME, true, offloadDriver, offloadProperties); } - log.info("[startPrestoWorker] Starting Presto Worker"); prestoWorkerContainer.start(); + log.info("[{}] Presto coordinator start finished.", prestoWorkerContainer.getContainerName()); } public void stopPrestoWorker() { + if (sqlFollowWorkerContainers != null && sqlFollowWorkerContainers.size() > 0) { + for (PrestoWorkerContainer followWorker : sqlFollowWorkerContainers.values()) { + followWorker.stop(); + log.info("Stopped presto follow worker {}.", followWorker.getContainerName()); + } + sqlFollowWorkerContainers.clear(); + log.info("Stopped all presto follow workers."); + } if (null != prestoWorkerContainer) { prestoWorkerContainer.stop(); - log.info("Stopped Presto Worker"); + log.info("Stopped presto coordinator."); prestoWorkerContainer = null; } } + public void startPrestoFollowWorkers(int numSqlFollowWorkers, String offloadDriver, String offloadProperties) { + log.info("start presto follow worker containers."); + sqlFollowWorkerContainers.putAll(runNumContainers( + "sql-follow-worker", + numSqlFollowWorkers, + (name) -> { + log.info("build presto follow worker with name {}", name); + return buildPrestoWorkerContainer(name, false, offloadDriver, offloadProperties); + } + )); + // Start workers that have been initialized + sqlFollowWorkerContainers.values().parallelStream().forEach(PrestoWorkerContainer::start); + log.info("Successfully started {} presto follow worker containers.", sqlFollowWorkerContainers.size()); + } + + private PrestoWorkerContainer buildPrestoWorkerContainer(String hostName, boolean isCoordinator, + String offloadDriver, String offloadProperties) { + String resourcePath = isCoordinator ? "presto-coordinator-config.properties" + : "presto-follow-worker-config.properties"; + PrestoWorkerContainer container = new PrestoWorkerContainer( + clusterName, hostName) + .withNetwork(network) + .withNetworkAliases(hostName) + .withEnv("clusterName", clusterName) + .withEnv("zkServers", ZKContainer.NAME) + .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) + .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) + .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080") + .withClasspathResourceMapping( + resourcePath, "/pulsar/conf/presto/config.properties", BindMode.READ_WRITE); + if (spec.queryLastMessage) { + container.withEnv("pulsar.bookkeeper-use-v2-protocol", "false") + .withEnv("pulsar.bookkeeper-explicit-interval", "10"); + } + if (offloadDriver != null && offloadProperties != null) { + log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}", + offloadDriver, offloadProperties); + // used to query from tiered storage + container.withEnv("SQL_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver); + container.withEnv("SQL_PREFIX_pulsar.offloader-properties", offloadProperties); + container.withEnv("SQL_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders"); + container.withEnv("AWS_ACCESS_KEY_ID", "accesskey"); + container.withEnv("AWS_SECRET_KEY", "secretkey"); + } + log.info("[{}] build presto worker container. isCoordinator: {}, resourcePath: {}", + container.getContainerName(), isCoordinator, resourcePath); + return container; + } + public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) { switch (runtimeType) { case THREAD: diff --git a/tests/integration/src/test/resources/presto-coordinator-config.properties b/tests/integration/src/test/resources/presto-coordinator-config.properties new file mode 100644 index 0000000000000..03a9ad1110d4a --- /dev/null +++ b/tests/integration/src/test/resources/presto-coordinator-config.properties @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +coordinator=true + +node.id=ffffffff-ffff-ffff-ffff-ffffffffffff +node.environment=test +http-server.http.port=8081 + +discovery-server.enabled=true +discovery.uri=http://presto-worker:8081 + +exchange.http-client.max-connections=1000 +exchange.http-client.max-connections-per-server=1000 +exchange.http-client.connect-timeout=1m +exchange.http-client.idle-timeout=1m + +scheduler.http-client.max-connections=1000 +scheduler.http-client.max-connections-per-server=1000 +scheduler.http-client.connect-timeout=1m +scheduler.http-client.idle-timeout=1m + +query.client.timeout=5m +query.min-expire-age=30m + +presto.version=testversion + +node-scheduler.include-coordinator=true diff --git a/tests/integration/src/test/resources/presto-follow-worker-config.properties b/tests/integration/src/test/resources/presto-follow-worker-config.properties new file mode 100644 index 0000000000000..be39b356b992b --- /dev/null +++ b/tests/integration/src/test/resources/presto-follow-worker-config.properties @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +coordinator=false + +node.environment=test +http-server.http.port=8081 +discovery.uri=http://presto-worker:8081 + +query.client.timeout=5m +query.min-expire-age=30m + +presto.version=testversion diff --git a/tests/integration/src/test/resources/pulsar-sql.xml b/tests/integration/src/test/resources/pulsar-sql.xml index 46762f6ca0b03..bedc443f6c9b6 100644 --- a/tests/integration/src/test/resources/pulsar-sql.xml +++ b/tests/integration/src/test/resources/pulsar-sql.xml @@ -23,6 +23,7 @@ +