From 981e472c8665a2f8cd90dfb721de087fd371db22 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 28 Feb 2018 18:39:22 -0800 Subject: [PATCH] Use `distributedlog-core-shaded` in pulsar worker (#257) --- bin/pulsar | 2 +- pom.xml | 2 +- pulsar-functions/dist/pom.xml | 5 +++++ pulsar-functions/worker/pom.xml | 3 ++- .../java/org/apache/pulsar/functions/worker/Utils.java | 7 +++++-- .../apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java | 6 ++---- 6 files changed, 16 insertions(+), 9 deletions(-) diff --git a/bin/pulsar b/bin/pulsar index 70c0ca53052e6..ed7e988c8cd77 100755 --- a/bin/pulsar +++ b/bin/pulsar @@ -220,7 +220,7 @@ elif [ $COMMAND == "websocket" ]; then exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@ elif [ $COMMAND == "standalone" ]; then PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"} - exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE -Dzookeeper.4lw.commands.whitelist='*' org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@ + exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@ elif [ $COMMAND == "initialize-cluster-metadata" ]; then exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@ elif [ $COMMAND == "zookeeper-shell" ]; then diff --git a/pom.xml b/pom.xml index 699c300131e47..c2c10d1d51ad7 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ flexible messaging model and an intuitive client API. UTF-8 4.7.0-SNAPSHOT - 3.5.3-beta + 3.4.10 4.1.21.Final 1.0.5 9.3.11.v20160721 diff --git a/pulsar-functions/dist/pom.xml b/pulsar-functions/dist/pom.xml index 0d199f93c9f1f..8e0a55eefafda 100644 --- a/pulsar-functions/dist/pom.xml +++ b/pulsar-functions/dist/pom.xml @@ -54,6 +54,11 @@ provided + + io.netty + netty-all + + org.apache.logging.log4j log4j-slf4j-impl diff --git a/pulsar-functions/worker/pom.xml b/pulsar-functions/worker/pom.xml index a54ce06efbf57..b653304d4dda5 100644 --- a/pulsar-functions/worker/pom.xml +++ b/pulsar-functions/worker/pom.xml @@ -120,9 +120,10 @@ 1.14.6 + org.apache.distributedlog - distributedlog-core + distributedlog-core-shaded ${bookkeeper.version} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java index 303855858b94d..25e0ab0314e74 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.functions.worker; +import dlshade.org.apache.zookeeper.KeeperException.Code; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.AppendOnlyStreamWriter; import org.apache.distributedlog.DistributedLogConfiguration; @@ -43,7 +44,6 @@ import java.net.URI; import java.net.URL; import java.util.UUID; -import org.apache.zookeeper.KeeperException.Code; @Slf4j public final class Utils { @@ -141,7 +141,7 @@ public static void downloadFromBookkeeper(Namespace namespace, public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) { int numReplicas = workerConfig.getNumFunctionPackageReplicas(); - return new DistributedLogConfiguration() + DistributedLogConfiguration conf = new DistributedLogConfiguration() .setWriteLockEnabled(false) .setOutputBufferSize(256 * 1024) // 256k .setPeriodicFlushFrequencyMilliSeconds(0) // disable periodical flush @@ -154,6 +154,9 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) .setWriteQuorumSize(numReplicas) .setAckQuorumSize(numReplicas) .setUseDaemonThread(true); + conf.setProperty("bkc.allowShadedLedgerManagerFactoryClass", true); + conf.setProperty("bkc.shadedLedgerManagerFactoryClassPrefix", "dlshade."); + return conf; } public static URI initializeDlogNamespace(String zkServers, String ledgersRootPath) throws IOException { diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 3944934d0912a..ed639c98d7a48 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -212,8 +212,7 @@ public void start() throws Exception { ServerConfiguration conf = new ServerConfiguration(); conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); // Use minimal configuration requiring less memory for unit tests - // conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); - conf.setLedgerStorageClass(SortedLedgerStorage.class.getName()); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); conf.setProperty("dbStorage_writeCacheMaxSizeMb", 2); conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 1); conf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 1); @@ -230,8 +229,7 @@ public void startStandalone() throws Exception { LOG.debug("Local ZK/BK starting ..."); ServerConfiguration conf = new ServerConfiguration(); conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory"); - // conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); - conf.setLedgerStorageClass(SortedLedgerStorage.class.getName()); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); conf.setDiskUsageThreshold(0.99999f); conf.setDiskUsageWarnThreshold(0.9999f); conf.setProperty("dbStorage_writeCacheMaxSizeMb", 256);