Skip to content

Commit

Permalink
Use distributedlog-core-shaded in pulsar worker (apache#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Mar 4, 2018
1 parent 68e1f3c commit 981e472
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 9 deletions.
2 changes: 1 addition & 1 deletion bin/pulsar
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ elif [ $COMMAND == "websocket" ]; then
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.websocket.service.WebSocketServiceStarter $PULSAR_WEBSOCKET_CONF $@
elif [ $COMMAND == "standalone" ]; then
PULSAR_LOG_FILE=${PULSAR_LOG_FILE:-"pulsar-standalone.log"}
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE -Dzookeeper.4lw.commands.whitelist='*' org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
exec $JAVA $OPTS -Dpulsar.log.file=$PULSAR_LOG_FILE org.apache.pulsar.PulsarStandaloneStarter --config $PULSAR_STANDALONE_CONF $@
elif [ $COMMAND == "initialize-cluster-metadata" ]; then
exec $JAVA $OPTS org.apache.pulsar.PulsarClusterMetadataSetup $@
elif [ $COMMAND == "zookeeper-shell" ]; then
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ flexible messaging model and an intuitive client API.</description>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

<bookkeeper.version>4.7.0-SNAPSHOT</bookkeeper.version>
<zookeeper.version>3.5.3-beta</zookeeper.version>
<zookeeper.version>3.4.10</zookeeper.version>
<netty.version>4.1.21.Final</netty.version>
<storm.version>1.0.5</storm.version>
<jetty.version>9.3.11.v20160721</jetty.version>
Expand Down
5 changes: 5 additions & 0 deletions pulsar-functions/dist/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
Expand Down
3 changes: 2 additions & 1 deletion pulsar-functions/worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@
<version>1.14.6</version>
</dependency>

<!-- use shaded dependency util pulsar bump zookeeper version to 3.5 -->
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-core</artifactId>
<artifactId>distributedlog-core-shaded</artifactId>
<version>${bookkeeper.version}</version>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.worker;

import dlshade.org.apache.zookeeper.KeeperException.Code;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.AppendOnlyStreamWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
Expand All @@ -43,7 +44,6 @@
import java.net.URI;
import java.net.URL;
import java.util.UUID;
import org.apache.zookeeper.KeeperException.Code;

@Slf4j
public final class Utils {
Expand Down Expand Up @@ -141,7 +141,7 @@ public static void downloadFromBookkeeper(Namespace namespace,
public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig) {
int numReplicas = workerConfig.getNumFunctionPackageReplicas();

return new DistributedLogConfiguration()
DistributedLogConfiguration conf = new DistributedLogConfiguration()
.setWriteLockEnabled(false)
.setOutputBufferSize(256 * 1024) // 256k
.setPeriodicFlushFrequencyMilliSeconds(0) // disable periodical flush
Expand All @@ -154,6 +154,9 @@ public static DistributedLogConfiguration getDlogConf(WorkerConfig workerConfig)
.setWriteQuorumSize(numReplicas)
.setAckQuorumSize(numReplicas)
.setUseDaemonThread(true);
conf.setProperty("bkc.allowShadedLedgerManagerFactoryClass", true);
conf.setProperty("bkc.shadedLedgerManagerFactoryClassPrefix", "dlshade.");
return conf;
}

public static URI initializeDlogNamespace(String zkServers, String ledgersRootPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ public void start() throws Exception {
ServerConfiguration conf = new ServerConfiguration();
conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
// Use minimal configuration requiring less memory for unit tests
// conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerStorageClass(SortedLedgerStorage.class.getName());
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setProperty("dbStorage_writeCacheMaxSizeMb", 2);
conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 1);
conf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 1);
Expand All @@ -230,8 +229,7 @@ public void startStandalone() throws Exception {
LOG.debug("Local ZK/BK starting ...");
ServerConfiguration conf = new ServerConfiguration();
conf.setLedgerManagerFactoryClassName("org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory");
// conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerStorageClass(SortedLedgerStorage.class.getName());
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setDiskUsageThreshold(0.99999f);
conf.setDiskUsageWarnThreshold(0.9999f);
conf.setProperty("dbStorage_writeCacheMaxSizeMb", 256);
Expand Down

0 comments on commit 981e472

Please sign in to comment.