Skip to content

Commit

Permalink
PIP-117: Change Pulsar standalone defaults (apache#15478)
Browse files Browse the repository at this point in the history
* PIP-117: Change Pulsar standalone defaults

* Fixed config for cpp tests

* Allow to force ZK use in standalone

* Fixed checkstyle

* Fixed worker configuration

* fixed python state test

* Fixed import order

* Fixed extra semicolon

* Fixed misplaces semicolon

* Fixed config file test

* Avoid flaky test in ProducerTest.cc
  • Loading branch information
merlimat authored May 24, 2022
1 parent 6b40749 commit bf194b5
Show file tree
Hide file tree
Showing 18 changed files with 254 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.sasl.SaslConstants;
import org.apache.pulsar.common.util.DirectMemoryUtils;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;

/**
Expand Down Expand Up @@ -2740,6 +2741,13 @@ public String getMetadataStoreUrl() {
}
}

/**
* Tells whether the selected metadata store implementation is based on ZooKeeper.
*/
public boolean isMetadataStoreBackedByZookeeper() {
return MetadataStoreFactory.isBasedOnZookeeper(getMetadataStoreUrl());
}

public String getConfigurationMetadataStoreUrl() {
if (StringUtils.isNotBlank(configurationMetadataStoreUrl)) {
return configurationMetadataStoreUrl;
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,12 @@
<artifactId>jetcd-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-package-filesystem-storage</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
150 changes: 116 additions & 34 deletions pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandalone.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,34 +27,45 @@
import java.nio.file.Paths;
import java.util.Collections;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.packages.management.storage.filesystem.FileSystemPackagesStorageProvider;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class PulsarStandalone implements AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(PulsarStandalone.class);
private static final String PULSAR_STANDALONE_USE_ZOOKEEPER = "PULSAR_STANDALONE_USE_ZOOKEEPER";

PulsarService broker;
PulsarAdmin admin;

// This is used in compatibility mode
LocalBookkeeperEnsemble bkEnsemble;

// This is used from Pulsar 2.11 on, with new default settings
BKCluster bkCluster;
MetadataStoreExtended metadataStore;

ServiceConfiguration config;
WorkerService fnWorkerService;
WorkerConfig workerConfig;
Expand All @@ -63,10 +74,6 @@ public void setBroker(PulsarService broker) {
this.broker = broker;
}

public void setAdmin(PulsarAdmin admin) {
this.admin = admin;
}

public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) {
this.bkEnsemble = bkEnsemble;
}
Expand Down Expand Up @@ -212,13 +219,20 @@ public boolean isHelp() {
@Parameter(names = { "--num-bookies" }, description = "Number of local Bookies")
private int numOfBk = 1;

@Parameter(names = { "--zookeeper-port" }, description = "Local zookeeper's port")
@Parameter(names = { "--metadata-dir" },
description = "Directory for storing metadata")
private String metadataDir = "data/metadata";

@Parameter(names = {"--zookeeper-port"}, description = "Local zookeeper's port",
hidden = true)
private int zkPort = 2181;

@Parameter(names = { "--bookkeeper-port" }, description = "Local bookies base port")
private int bkPort = 3181;

@Parameter(names = { "--zookeeper-dir" }, description = "Local zooKeeper's data directory")
@Parameter(names = { "--zookeeper-dir" },
description = "Local zooKeeper's data directory",
hidden = true)
private String zkDir = "data/standalone/zookeeper";

@Parameter(names = { "--bookkeeper-dir" }, description = "Local bookies base data directory")
Expand Down Expand Up @@ -249,7 +263,23 @@ public boolean isHelp() {
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;

private boolean usingNewDefaultsPIP117;

public void start() throws Exception {
String forceUseZookeeperEnv = System.getenv(PULSAR_STANDALONE_USE_ZOOKEEPER);

// Allow forcing to use ZK mode via an env variable. eg:
// PULSAR_STANDALONE_USE_ZOOKEEPER=1
if (StringUtils.equalsAnyIgnoreCase(forceUseZookeeperEnv, "1", "true")) {
usingNewDefaultsPIP117 = false;
log.info("Forcing to chose ZooKeeper metadata through environment variable");
} else if (Paths.get(zkDir).toFile().exists()) {
log.info("Found existing ZooKeeper metadata. Continuing with ZooKeeper");
usingNewDefaultsPIP117 = false;
} else {
// There's no existing ZK data directory, or we're already using RocksDB for metadata
usingNewDefaultsPIP117 = true;
}

if (config == null) {
log.error("Failed to load configuration");
Expand All @@ -259,14 +289,11 @@ public void start() throws Exception {
log.debug("--- setup PulsarStandaloneStarter ---");

if (!this.isOnlyBroker()) {
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());

// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
if (usingNewDefaultsPIP117) {
startBookieWithRocksDB();
} else {
startBookieWithZookeeper();
}
}

if (this.isNoBroker()) {
Expand All @@ -277,33 +304,48 @@ public void start() throws Exception {
if (!this.isNoFunctionsWorker()) {
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(
config, this.getFnWorkerConfigFile());
// worker talks to local broker
if (this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl(null);
} else if (workerConfig.getStateStorageServiceUrl() == null) {
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
if (usingNewDefaultsPIP117) {
workerConfig.setStateStorageProviderImplementation(
PulsarMetadataStateStoreProviderImpl.class.getName());

config.setEnablePackagesManagement(true);
config.setFunctionsWorkerEnablePackageManagement(true);
workerConfig.setFunctionsWorkerEnablePackageManagement(true);
config.setPackagesManagementStorageProvider(FileSystemPackagesStorageProvider.class.getName());
} else {
// worker talks to local broker
if (this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl(null);
} else if (workerConfig.getStateStorageServiceUrl() == null) {
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
}
}
fnWorkerService = WorkerServiceLoader.load(workerConfig);
} else {
workerConfig = new WorkerConfig();
}

config.setRunningStandalone(true);

if (!usingNewDefaultsPIP117) {
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + "localhost:" + this.getZkPort();
config.setMetadataStoreUrl(metadataStoreUrl);
config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
config.getProperties().setProperty("metadataStoreUrl", metadataStoreUrl);
config.getProperties().setProperty("configurationMetadataStoreUrl", metadataStoreUrl);
}

// Start Broker
broker = new PulsarService(config,
workerConfig,
Optional.ofNullable(fnWorkerService),
(exitCode) -> {
log.info("Halting standalone process with code {}", exitCode);
LogManager.shutdown();
Runtime.getRuntime().halt(exitCode);
});
workerConfig,
Optional.ofNullable(fnWorkerService),
PulsarStandalone::processTerminator);
broker.start();

final String cluster = config.getClusterName();

admin = broker.getAdminClient();

//create default namespace
createNameSpace(cluster, TopicName.PUBLIC_TENANT,
NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE));
Expand Down Expand Up @@ -380,11 +422,51 @@ public void close() {
broker.close();
}

if (bkCluster != null) {
bkCluster.close();
}

if (bkEnsemble != null) {
bkEnsemble.stop();
}
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
}
}


private void startBookieWithRocksDB() throws Exception {
log.info("Starting BK with RocksDb metadata store");
String metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath();
bkCluster = BKCluster.builder()
.metadataServiceUri(metadataStoreUrl)
.bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
.build();
config.setBookkeeperNumberOfChannelsPerBookie(1);
config.setMetadataStoreUrl(metadataStoreUrl);
}

private void startBookieWithZookeeper() throws Exception {
log.info("Starting BK & ZK cluster");
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());

// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());

config.setZookeeperServers("127.0.0.1:" + zkPort);
}

private static void processTerminator(int exitCode) {
log.info("Halting standalone process with code {}", exitCode);
LogManager.shutdown();
Runtime.getRuntime().halt(exitCode);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;

public final class PulsarStandaloneBuilder {

Expand Down Expand Up @@ -101,24 +100,17 @@ public PulsarStandalone build() {
ServiceConfiguration config = new ServiceConfiguration();
config.setClusterName("standalone");
pulsarStandalone.setConfig(config);
String zkServers = "127.0.0.1";

if (pulsarStandalone.getAdvertisedAddress() != null) {
// Use advertised address from command line
pulsarStandalone.getConfig().setAdvertisedAddress(pulsarStandalone.getAdvertisedAddress());
zkServers = pulsarStandalone.getAdvertisedAddress();
} else if (isBlank(pulsarStandalone.getConfig().getAdvertisedAddress())) {
// Use advertised address as local hostname
pulsarStandalone.getConfig().setAdvertisedAddress(ServiceConfigurationUtils.unsafeLocalhostResolve());
} else {
// Use advertised address from config file
}

// Set ZK server's host to localhost
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setRunningStandalone(true);
return pulsarStandalone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,18 @@
import com.beust.jcommander.Parameter;
import java.io.FileInputStream;
import java.util.Arrays;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class PulsarStandaloneStarter extends PulsarStandalone {
@Parameter(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;

private static final Logger log = LoggerFactory.getLogger(PulsarStandaloneStarter.class);

public PulsarStandaloneStarter(String[] args) throws Exception {

JCommander jcommander = new JCommander();
Expand Down Expand Up @@ -71,12 +68,9 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
inputStream, ServiceConfiguration.class);
}

String zkServers = "127.0.0.1";

if (this.getAdvertisedAddress() != null) {
// Use advertised address from command line
config.setAdvertisedAddress(this.getAdvertisedAddress());
zkServers = this.getAdvertisedAddress();
} else if (isBlank(config.getAdvertisedAddress()) && isBlank(config.getAdvertisedListeners())) {
// Use advertised address as local hostname
config.setAdvertisedAddress("localhost");
Expand All @@ -101,14 +95,6 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
config.setMetadataStoreUrl(metadataStoreUrl);
config.setConfigurationMetadataStoreUrl(metadataStoreUrl);

config.setRunningStandalone(true);
config.getProperties().setProperty("metadataStoreUrl", metadataStoreUrl);
config.getProperties().setProperty("configurationMetadataStoreUrl", metadataStoreUrl);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ private EmbeddedPulsarCluster(int numBrokers, int numBookies, String metadataSto
this.numBrokers = numBrokers;
this.numBookies = numBookies;
this.metadataStoreUrl = metadataStoreUrl;
this.bkCluster = new BKCluster(metadataStoreUrl, numBookies);
this.bkCluster = BKCluster.builder()
.metadataServiceUri(metadataStoreUrl)
.numBookies(numBookies)
.build();

for (int i = 0; i < numBrokers; i++) {
PulsarService s = new PulsarService(getConf());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void testConfigFileDefaults() throws Exception {
try (FileInputStream stream = new FileInputStream("../conf/broker.conf")) {
final ServiceConfiguration javaConfig = PulsarConfigurationLoader.create(new Properties(), ServiceConfiguration.class);
final ServiceConfiguration fileConfig = PulsarConfigurationLoader.create(stream, ServiceConfiguration.class);
List<String> toSkip = Arrays.asList("properties", "class");
List<String> toSkip = Arrays.asList("properties", "class", "metadataStoreBackedByZookeeper");
for (PropertyDescriptor pd : Introspector.getBeanInfo(ServiceConfiguration.class).getPropertyDescriptors()) {
if (pd.getReadMethod() == null || toSkip.contains(pd.getName())) {
continue;
Expand Down
1 change: 0 additions & 1 deletion pulsar-client-cpp/pulsar-test-service-start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ $PULSAR_DIR/bin/pulsar tokens create \
export PULSAR_STANDALONE_CONF=$SRC_DIR/pulsar-client-cpp/test-conf/standalone-ssl.conf
$PULSAR_DIR/bin/pulsar-daemon start standalone \
--no-functions-worker --no-stream-storage \
--zookeeper-dir $DATA_DIR/zookeeper \
--bookkeeper-dir $DATA_DIR/bookkeeper

echo "-- Wait for Pulsar service to be ready"
Expand Down
Loading

0 comments on commit bf194b5

Please sign in to comment.