diff --git a/.github/workflows/ci-integration-sql.yaml b/.github/workflows/ci-integration-sql.yaml index 500fd42884978..29f9710be53d7 100644 --- a/.github/workflows/ci-integration-sql.yaml +++ b/.github/workflows/ci-integration-sql.yaml @@ -73,6 +73,15 @@ jobs: if: steps.docs.outputs.changed_only == 'no' run: mvn clean install -DskipTests +# Flaky Test: https://github.com/apache/pulsar/issues/7750 +# - name: build pulsar image +# if: steps.docs.outputs.changed_only == 'no' +# run: mvn -B -f docker/pulsar/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true +# +# - name: build pulsar-all image +# if: steps.docs.outputs.changed_only == 'no' +# run: mvn -B -f docker/pulsar-all/pom.xml install -am -Pdocker -DskipTests -Ddocker.nocache=true +# - name: build artifacts and docker pulsar latest test image if: steps.docs.outputs.changed_only == 'no' run: mvn -B -f tests/docker-images/pom.xml install -am -Pdocker -DskipTests diff --git a/conf/presto/config.properties b/conf/presto/config.properties index 9f17135523dfa..2ca62ae5157e4 100644 --- a/conf/presto/config.properties +++ b/conf/presto/config.properties @@ -38,5 +38,7 @@ query.client.timeout=5m query.min-expire-age=30m presto.version=testversion + distributed-joins-enabled=true + node-scheduler.include-coordinator=true diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java index 4936923dfda6b..f3778020f2d61 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/OffloadPolicies.java @@ -21,6 +21,7 @@ import static org.apache.pulsar.common.util.FieldParser.value; import com.google.common.base.MoreObjects; +import java.io.Serializable; import java.lang.reflect.Field; import java.util.Arrays; import java.util.Objects; @@ -32,7 +33,9 @@ * Definition of the offload policies. */ @Data -public class OffloadPolicies { +public class OffloadPolicies implements Serializable { + + private final static long serialVersionUID = 0L; public final static int DEFAULT_MAX_BLOCK_SIZE_IN_BYTES = 64 * 1024 * 1024; // 64MB public final static int DEFAULT_READ_BUFFER_SIZE_IN_BYTES = 1024 * 1024; // 1MB diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index f4a7d7432f444..091efe199739d 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -40,7 +40,6 @@ import org.apache.bookkeeper.mledger.offload.OffloaderUtils; import org.apache.bookkeeper.mledger.offload.Offloaders; import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.commons.beanutils.BeanUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.common.naming.NamespaceName; @@ -81,9 +80,8 @@ private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws this.statsProvider.start(clientConfiguration); - OffloadPolicies offloadPolicies = new OffloadPolicies(); - BeanUtils.copyProperties(offloadPolicies, pulsarConnectorConfig); - this.defaultOffloader = initManagedLedgerOffloader(offloadPolicies, pulsarConnectorConfig); + this.defaultOffloader = initManagedLedgerOffloader( + pulsarConnectorConfig.getOffloadPolices(), pulsarConnectorConfig); } public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index 49d2ae349153d..0a172d9887ee5 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import java.util.regex.Matcher; import javax.validation.constraints.NotNull; import org.apache.bookkeeper.stats.NullStatsProvider; @@ -31,6 +32,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.nar.NarClassLoader; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.apache.pulsar.common.protocol.Commands; /** @@ -399,6 +401,16 @@ public PulsarAdmin getPulsarAdmin() throws PulsarClientException { return this.pulsarAdmin; } + public OffloadPolicies getOffloadPolices() { + Properties offloadProperties = new Properties(); + offloadProperties.putAll(getOffloaderProperties()); + OffloadPolicies offloadPolicies = OffloadPolicies.create(offloadProperties); + offloadPolicies.setManagedLedgerOffloadDriver(getManagedLedgerOffloadDriver()); + offloadPolicies.setManagedLedgerOffloadMaxThreads(getManagedLedgerOffloadMaxThreads()); + offloadPolicies.setOffloadersDirectory(getOffloadersDirectory()); + return offloadPolicies; + } + @Override public void close() throws Exception { this.pulsarAdmin.close(); diff --git a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java index faf2bbc2639a3..f3d2f7a1a653e 100644 --- a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/TestPulsarConnectorConfig.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.sql.presto; +import org.apache.pulsar.common.policies.data.OffloadPolicies; import org.testng.Assert; import org.testng.annotations.Test; @@ -68,4 +69,35 @@ public void testDefaultManagedLedgerConfig() { Assert.assertEquals(availableProcessors, connectorConfig.getManagedLedgerNumSchedulerThreads()); } + @Test + public void testGetOffloadPolices() throws Exception { + PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig(); + + final String managedLedgerOffloadDriver = "s3"; + final String offloaderDirectory = "/pulsar/offloaders"; + final int managedLedgerOffloadMaxThreads = 5; + final String bucket = "offload-bucket"; + final String region = "us-west-2"; + final String endpoint = "http://s3.amazonaws.com"; + final String offloadProperties = "{" + + "\"s3ManagedLedgerOffloadBucket\":\"" + bucket + "\"," + + "\"s3ManagedLedgerOffloadRegion\":\"" + region + "\"," + + "\"s3ManagedLedgerOffloadServiceEndpoint\":\"" + endpoint + "\"" + + "}"; + + connectorConfig.setManagedLedgerOffloadDriver(managedLedgerOffloadDriver); + connectorConfig.setOffloadersDirectory(offloaderDirectory); + connectorConfig.setManagedLedgerOffloadMaxThreads(managedLedgerOffloadMaxThreads); + connectorConfig.setOffloaderProperties(offloadProperties); + + OffloadPolicies offloadPolicies = connectorConfig.getOffloadPolices(); + Assert.assertNotNull(offloadPolicies); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadDriver(), managedLedgerOffloadDriver); + Assert.assertEquals(offloadPolicies.getOffloadersDirectory(), offloaderDirectory); + Assert.assertEquals(offloadPolicies.getManagedLedgerOffloadMaxThreads(), managedLedgerOffloadMaxThreads); + Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadBucket(), bucket); + Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadRegion(), region); + Assert.assertEquals(offloadPolicies.getS3ManagedLedgerOffloadServiceEndpoint(), endpoint); + } + } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java index 093f3eb5a675d..e29d945b6bf9b 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestBasicPresto.java @@ -50,6 +50,7 @@ public class TestBasicPresto extends PulsarTestSuite { @BeforeClass public void setupPresto() throws Exception { + log.info("[setupPresto]"); pulsarCluster.startPrestoWorker(); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java new file mode 100644 index 0000000000000..8ba48aa79841e --- /dev/null +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/presto/TestPrestoQueryTieredStorage.java @@ -0,0 +1,349 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.presto; + +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.tests.integration.containers.BrokerContainer; +import org.apache.pulsar.tests.integration.containers.S3Container; +import org.apache.pulsar.tests.integration.docker.ContainerExecException; +import org.apache.pulsar.tests.integration.docker.ContainerExecResult; +import org.apache.pulsar.tests.integration.suites.PulsarTestSuite; +import org.apache.pulsar.tests.integration.topologies.PulsarCluster; +import org.testcontainers.shaded.org.apache.commons.lang.StringUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterSuite; +import org.testng.annotations.BeforeClass; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.assertj.core.api.Assertions.assertThat; + +@Slf4j +public class TestPrestoQueryTieredStorage extends PulsarTestSuite { + + private final static int ENTRIES_PER_LEDGER = 1024; + private final static String OFFLOAD_DRIVER = "s3"; + private final static String BUCKET = "pulsar-integtest"; + private final static String ENDPOINT = "http://" + S3Container.NAME + ":9090"; + + private S3Container s3Container; + + @Override + protected void beforeStartCluster() throws Exception { + for (BrokerContainer brokerContainer : pulsarCluster.getBrokers()) { + getEnv().forEach(brokerContainer::withEnv); + } + } + + @BeforeClass + public void setupPresto() throws Exception { + s3Container = new S3Container( + pulsarCluster.getClusterName(), + S3Container.NAME) + .withNetwork(pulsarCluster.getNetwork()) + .withNetworkAliases(S3Container.NAME); + s3Container.start(); + + log.info("[setupPresto] prestoWorker: " + pulsarCluster.getPrestoWorkerContainer()); + pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, getOffloadProperties(BUCKET, null, ENDPOINT)); + } + + public String getOffloadProperties(String bucket, String region, String endpoint) { + checkNotNull(bucket); + StringBuilder sb = new StringBuilder(); + sb.append("{"); + sb.append("\"s3ManagedLedgerOffloadBucket\":").append("\"").append(bucket).append("\","); + if (StringUtils.isNotEmpty(region)) { + sb.append("\"s3ManagedLedgerOffloadRegion\":").append("\"").append(region).append("\","); + } + if (StringUtils.isNotEmpty(endpoint)) { + sb.append("\"s3ManagedLedgerOffloadServiceEndpoint\":").append("\"").append(endpoint).append("\""); + } + sb.append("}"); + return sb.toString(); + } + + + @AfterClass + public void teardownPresto() { + log.info("tearing down..."); + if (null != s3Container) { + s3Container.stop(); + } + + pulsarCluster.stopPrestoWorker(); + } + + // Flaky Test: https://github.com/apache/pulsar/issues/7750 + // @Test + public void testQueryTieredStorage1() throws Exception { + testSimpleSQLQuery(false); + } + + // Flaky Test: https://github.com/apache/pulsar/issues/7750 + // @Test + public void testQueryTieredStorage2() throws Exception { + testSimpleSQLQuery(true); + } + + public void testSimpleSQLQuery(boolean isNamespaceOffload) throws Exception { + + // wait until presto worker started + ContainerExecResult result; + do { + try { + result = execQuery("show catalogs;"); + assertThat(result.getExitCode()).isEqualTo(0); + assertThat(result.getStdout()).contains("pulsar", "system"); + break; + } catch (ContainerExecException cee) { + if (cee.getResult().getStderr().contains("Presto server is still initializing")) { + Thread.sleep(10000); + } else { + throw cee; + } + } + } while (true); + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder() + .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) + .build(); + + String stocksTopic = "stocks-" + randomName(5); + + @Cleanup + Producer producer = pulsarClient.newProducer(JSONSchema.of(Stock.class)) + .topic(stocksTopic) + .create(); + + long firstLedgerId = -1; + long currentLedgerId = -1; + int sendMessageCnt = 0; + while (currentLedgerId <= firstLedgerId) { + sendMessageCnt ++; + final Stock stock = new Stock(sendMessageCnt,"STOCK_" + sendMessageCnt , 100.0 + sendMessageCnt * 10); + MessageIdImpl messageId = (MessageIdImpl) producer.send(stock); + if (firstLedgerId == -1) { + firstLedgerId = messageId.getLedgerId(); + } + currentLedgerId = messageId.getLedgerId(); + log.info("firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId); + Thread.sleep(100); + } + producer.flush(); + + offloadAndDeleteFromBK(isNamespaceOffload, stocksTopic); + + // check schema + result = execQuery("show schemas in pulsar;"); + assertThat(result.getExitCode()).isEqualTo(0); + assertThat(result.getStdout()).contains("public/default"); + + // check table + result = execQuery("show tables in pulsar.\"public/default\";"); + assertThat(result.getExitCode()).isEqualTo(0); + assertThat(result.getStdout()).contains(stocksTopic); + + // check query + ContainerExecResult containerExecResult = execQuery(String.format("select * from pulsar.\"public/default\".%s order by entryid;", stocksTopic)); + assertThat(containerExecResult.getExitCode()).isEqualTo(0); + log.info("select sql query output \n{}", containerExecResult.getStdout()); + String[] split = containerExecResult.getStdout().split("\n"); + assertThat(split.length).isGreaterThan(sendMessageCnt - 2); + + String[] split2 = containerExecResult.getStdout().split("\n|,"); + + for (int i = 0; i < sendMessageCnt - 2; ++i) { + assertThat(split2).contains("\"" + i + "\""); + assertThat(split2).contains("\"" + "STOCK_" + i + "\""); + assertThat(split2).contains("\"" + (100.0 + i * 10) + "\""); + } + + // test predicate pushdown + + String url = String.format("jdbc:presto://%s", pulsarCluster.getPrestoWorkerContainer().getUrl()); + Connection connection = DriverManager.getConnection(url, "test", null); + + String query = String.format("select * from pulsar" + + ".\"public/default\".%s order by __publish_time__", stocksTopic); + log.info("Executing query: {}", query); + ResultSet res = connection.createStatement().executeQuery(query); + + List timestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + timestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(timestamps.size()).isGreaterThan(sendMessageCnt - 2); + + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > timestamp '%s' order by __publish_time__", stocksTopic, timestamps.get(timestamps.size() / 2)); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + List returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size() / 2); + + // Try with a predicate that has a earlier time than any entry + // Should return all rows + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 0); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(timestamps.size()); + + // Try with a predicate that has a latter time than any entry + // Should return no rows + + query = String.format("select * from pulsar" + + ".\"public/default\".%s where __publish_time__ > from_unixtime(%s) order by __publish_time__", stocksTopic, 99999999999L); + log.info("Executing query: {}", query); + res = connection.createStatement().executeQuery(query); + + returnedTimestamps = new LinkedList<>(); + while (res.next()) { + printCurrent(res); + returnedTimestamps.add(res.getTimestamp("__publish_time__")); + } + + assertThat(returnedTimestamps.size()).isEqualTo(0); + } + + @AfterSuite + @Override + public void tearDownCluster() { + super.tearDownCluster(); + } + + public static ContainerExecResult execQuery(final String query) throws Exception { + ContainerExecResult containerExecResult; + + containerExecResult = pulsarCluster.getPrestoWorkerContainer() + .execCmd("/bin/bash", "-c", PulsarCluster.PULSAR_COMMAND_SCRIPT + " sql --execute " + "'" + query + "'"); + + return containerExecResult; + + } + + private static void printCurrent(ResultSet rs) throws SQLException { + ResultSetMetaData rsmd = rs.getMetaData(); + int columnsNumber = rsmd.getColumnCount(); + for (int i = 1; i <= columnsNumber; i++) { + if (i > 1) System.out.print(", "); + String columnValue = rs.getString(i); + System.out.print(columnValue + " " + rsmd.getColumnName(i)); + } + System.out.println(""); + + } + + private void offloadAndDeleteFromBK(boolean isNamespaceOffload, String stocksTopic) { + String adminUrl = pulsarCluster.getHttpServiceUrl(); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) { + // read managed ledger info, check ledgers exist + long firstLedger = admin.topics().getInternalStats(stocksTopic).ledgers.get(0).ledgerId; + + String output = ""; + + if (isNamespaceOffload) { + pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "set-offload-policies", + "--bucket", "pulsar-integtest", + "--driver", "s3", + "--endpoint", "http://" + S3Container.NAME + ":9090", + "--offloadAfterElapsed", "1000", + "public/default"); + + output = pulsarCluster.runAdminCommandOnAnyBroker( + "namespaces", "get-offload-policies").getStdout(); + Assert.assertTrue(output.contains("pulsar-integtest")); + Assert.assertTrue(output.contains("s3")); + } + + // offload with a low threshold + output = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "offload", "--size-threshold", "1M", stocksTopic).getStdout(); + Assert.assertTrue(output.contains("Offload triggered")); + + output = pulsarCluster.runAdminCommandOnAnyBroker("topics", + "offload-status", "-w", stocksTopic).getStdout(); + Assert.assertTrue(output.contains("Offload was a success")); + + // delete the first ledger, so that we cannot possibly read from it + ClientConfiguration bkConf = new ClientConfiguration(); + bkConf.setZkServers(pulsarCluster.getZKConnString()); + try (BookKeeper bk = new BookKeeper(bkConf)) { + bk.deleteLedger(firstLedger); + } catch (Exception e) { + log.error("Failed to delete from BookKeeper.", e); + Assert.fail("Failed to delete from BookKeeper."); + } + + // Unload topic to clear all caches, open handles, etc + admin.topics().unload(stocksTopic); + } catch (Exception e) { + Assert.fail("Failed to deleteOffloadedDataFromBK."); + } + } + + protected Map getEnv() { + Map result = new HashMap<>(); + result.put("managedLedgerMaxEntriesPerLedger", String.valueOf(ENTRIES_PER_LEDGER)); + result.put("managedLedgerMinLedgerRolloverTimeMinutes", "0"); + result.put("managedLedgerOffloadDriver", OFFLOAD_DRIVER); + result.put("s3ManagedLedgerOffloadBucket", BUCKET); + result.put("s3ManagedLedgerOffloadServiceEndpoint", ENDPOINT); + + return result; + } + +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java index 6ffd38af08be9..36175a69b3c3f 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topologies/PulsarCluster.java @@ -328,6 +328,11 @@ public synchronized void stop() { } public void startPrestoWorker() { + startPrestoWorker(null, null); + } + + public void startPrestoWorker(String offloadDriver, String offloadProperties) { + log.info("[startPrestoWorker] offloadDriver: {}, offloadProperties: {}", offloadDriver, offloadProperties); if (null == prestoWorkerContainer) { prestoWorkerContainer = new PrestoWorkerContainer(clusterName, PrestoWorkerContainer.NAME) .withNetwork(network) @@ -337,8 +342,18 @@ public void startPrestoWorker() { .withEnv("zookeeperServers", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.zookeeper-uri", ZKContainer.NAME + ":" + ZKContainer.ZK_PORT) .withEnv("pulsar.broker-service-url", "http://pulsar-broker-0:8080"); + if (offloadDriver != null && offloadProperties != null) { + log.info("[startPrestoWorker] set offload env offloadDriver: {}, offloadProperties: {}", + offloadDriver, offloadProperties); + prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.managed-ledger-offload-driver", offloadDriver); + prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloader-properties", offloadProperties); + prestoWorkerContainer.withEnv("PULSAR_PREFIX_pulsar.offloaders-directory", "/pulsar/offloaders"); + // used in s3 tests + prestoWorkerContainer.withEnv("AWS_ACCESS_KEY_ID", "accesskey"); + prestoWorkerContainer.withEnv("AWS_SECRET_KEY", "secretkey"); + } } - log.info("Starting Presto Worker"); + log.info("[startPrestoWorker] Starting Presto Worker"); prestoWorkerContainer.start(); }