From 1061906b6a2313cfc23ed31eae984ad3b430c33e Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Tue, 20 Feb 2018 15:01:42 -0800 Subject: [PATCH] Use advertisedAddress for bookies in pulsar standalone (#1254) * Use advertisedAddress for bookies in pulsar standalone (#233) * Use advertised address for zookeeper server in standalone as well --- .../apache/pulsar/PulsarStandaloneStarter.java | 11 +++++++---- .../zookeeper/LocalBookkeeperEnsemble.java | 11 +++++++++-- .../zookeeper/LocalBookkeeperEnsembleTest.java | 16 ++++++++++++++++ 3 files changed, 32 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index 2cdf264aad5e2..314eacf6573d8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -107,13 +107,12 @@ public PulsarStandaloneStarter(String[] args) throws Exception { this.config = PulsarConfigurationLoader.create((new FileInputStream(configFile)), ServiceConfiguration.class); - // Set ZK server's host to localhost - config.setZookeeperServers("127.0.0.1:" + zkPort); - config.setGlobalZookeeperServers("127.0.0.1:" + zkPort); + String zkServers = "127.0.0.1"; if (advertisedAddress != null) { // Use advertised address from command line config.setAdvertisedAddress(advertisedAddress); + zkServers = advertisedAddress; } else if (isBlank(config.getAdvertisedAddress())) { // Use advertised address as local hostname config.setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve()); @@ -121,6 +120,10 @@ public PulsarStandaloneStarter(String[] args) throws Exception { // Use advertised address from config file } + // Set ZK server's host to localhost + config.setZookeeperServers(zkServers + ":" + zkPort); + config.setGlobalZookeeperServers(zkServers + ":" + zkPort); + Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { @@ -148,7 +151,7 @@ void start() throws Exception { if (!onlyBroker) { // Start LocalBookKeeper - bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData); + bkEnsemble = new LocalBookkeeperEnsemble(numOfBk, zkPort, bkPort, zkDir, bkDir, wipeData, config.getAdvertisedAddress()); bkEnsemble.startStandalone(); } diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java index 42a94dca0da27..016e2addccdd8 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsemble.java @@ -69,6 +69,11 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort) public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, String bkDataDirName, boolean clearOldData) { + this(numberOfBookies, zkPort, bkBasePort, zkDataDirName, bkDataDirName, clearOldData, null); + } + + public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName, + String bkDataDirName, boolean clearOldData, String advertisedAddress) { this.numberOfBookies = numberOfBookies; this.HOSTPORT = "127.0.0.1:" + zkPort; this.ZooKeeperDefaultPort = zkPort; @@ -76,10 +81,12 @@ public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, this.zkDataDirName = zkDataDirName; this.bkDataDirName = bkDataDirName; this.clearOldData = clearOldData; - LOG.info("Running " + this.numberOfBookies + " bookie(s)."); + this.advertisedAddress = null == advertisedAddress ? "127.0.0.1" : advertisedAddress; + LOG.info("Running {} bookie(s) and advertised them at {}.", this.numberOfBookies, advertisedAddress); } private final String HOSTPORT; + private final String advertisedAddress; NIOServerCnxnFactory serverFactory; ZooKeeperServer zks; ZooKeeper zkc; @@ -224,7 +231,7 @@ public void startStandalone() throws Exception { conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 64); conf.setFlushInterval(60000); conf.setProperty("journalMaxGroupWaitMSec", 1L); - conf.setAdvertisedAddress("127.0.0.1"); + conf.setAdvertisedAddress(advertisedAddress); runZookeeper(1000); initializeZookeper(); diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java index bcb9f040f6bee..95735af6f2a29 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/LocalBookkeeperEnsembleTest.java @@ -21,6 +21,7 @@ import java.io.File; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertFalse; import org.testng.annotations.AfterMethod; @@ -41,6 +42,21 @@ void setup() throws Exception { void teardown() throws Exception { } + @Test + void testAdvertisedAddress() throws Exception { + final int numBk = 1; + final int zkPort = PortManager.nextFreePort(); + final int bkPort = PortManager.nextFreePort(); + + LocalBookkeeperEnsemble ensemble = new LocalBookkeeperEnsemble( + numBk, zkPort, bkPort, null, null, true, "127.0.0.2"); + ensemble.startStandalone(); + + assertNotNull(ensemble.getZkClient().exists("/ledgers/available/127.0.0.2:" + bkPort, false)); + + ensemble.stop(); + } + @Test void testStartStop() throws Exception {