Skip to content

Commit

Permalink
Make stadnalone use setMetadataStoreUrl (apache#14384)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaozhangmin authored Mar 1, 2022
1 parent 2125e4f commit e20f34b
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 27 deletions.
18 changes: 14 additions & 4 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

### --- General broker settings --- ###

# Zookeeper quorum connection string
zookeeperServers=
# The metadata store URL
# Examples:
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181
# * my-zk-1:2181,my-zk-2:2181,my-zk-3:2181 (will default to ZooKeeper when the schema is not specified)
# * zk:my-zk-1:2181,my-zk-2:2181,my-zk-3:2181/my-chroot-path (to add a ZK chroot path)
metadataStoreUrl=

# Configuration Store connection string
configurationStoreServers=
# The metadata store URL for the configuration data. If empty, we fall back to use metadataStoreUrl
configurationMetadataStoreUrl=

brokerServicePort=6650

Expand Down Expand Up @@ -1076,3 +1080,9 @@ zooKeeperOperationTimeoutSeconds=-1
# ZooKeeper cache expiry time in seconds
# Deprecated: use metadataStoreCacheExpirySeconds
zooKeeperCacheExpirySeconds=-1

# Zookeeper quorum connection string
zookeeperServers=

# Configuration Store connection string
configurationStoreServers=
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.commons.lang3.StringUtils.isBlank;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;

public final class PulsarStandaloneBuilder {

Expand Down Expand Up @@ -114,8 +115,10 @@ public PulsarStandalone build() {
}

// Set ZK server's host to localhost
pulsarStandalone.getConfig().setZookeeperServers(zkServers + ":" + pulsarStandalone.getZkPort());
pulsarStandalone.getConfig().setConfigurationStoreServers(zkServers + ":" + pulsarStandalone.getZkPort());
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + pulsarStandalone.getZkPort();
pulsarStandalone.getConfig().setMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setConfigurationMetadataStoreUrl(metadataStoreUrl);
pulsarStandalone.getConfig().setRunningStandalone(true);
return pulsarStandalone;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,8 +101,10 @@ public PulsarStandaloneStarter(String[] args) throws Exception {
}
}
}
config.setZookeeperServers(zkServers + ":" + this.getZkPort());
config.setConfigurationStoreServers(zkServers + ":" + this.getZkPort());
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + zkServers + ":" + this.getZkPort();
config.setMetadataStoreUrl(metadataStoreUrl);
config.setConfigurationMetadataStoreUrl(metadataStoreUrl);

config.setRunningStandalone(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
@ToString
public class InternalConfigurationData {

private String zookeeperServers;
private String configurationStoreServers;
private String metadataStoreUrl;
private String configurationMetadataStoreUrl;
@Deprecated
private String ledgersRootPath;
private String bookkeeperMetadataServiceUri;
Expand All @@ -38,23 +38,23 @@ public InternalConfigurationData() {
}

public InternalConfigurationData(String zookeeperServers,
String configurationStoreServers,
String configurationMetadataStoreUrl,
String ledgersRootPath,
String bookkeeperMetadataServiceUri,
String stateStorageServiceUrl) {
this.zookeeperServers = zookeeperServers;
this.configurationStoreServers = configurationStoreServers;
this.metadataStoreUrl = zookeeperServers;
this.configurationMetadataStoreUrl = configurationMetadataStoreUrl;
this.ledgersRootPath = ledgersRootPath;
this.bookkeeperMetadataServiceUri = bookkeeperMetadataServiceUri;
this.stateStorageServiceUrl = stateStorageServiceUrl;
}

public String getZookeeperServers() {
return zookeeperServers;
public String getMetadataStoreUrl() {
return metadataStoreUrl;
}

public String getConfigurationStoreServers() {
return configurationStoreServers;
public String getConfigurationMetadataStoreUrl() {
return configurationMetadataStoreUrl;
}

/** @deprecated */
Expand All @@ -77,17 +77,17 @@ public boolean equals(Object obj) {
return false;
}
InternalConfigurationData other = (InternalConfigurationData) obj;
return Objects.equals(zookeeperServers, other.zookeeperServers)
&& Objects.equals(configurationStoreServers, other.configurationStoreServers)
return Objects.equals(metadataStoreUrl, other.metadataStoreUrl)
&& Objects.equals(configurationMetadataStoreUrl, other.configurationMetadataStoreUrl)
&& Objects.equals(ledgersRootPath, other.ledgersRootPath)
&& Objects.equals(bookkeeperMetadataServiceUri, other.bookkeeperMetadataServiceUri)
&& Objects.equals(stateStorageServiceUrl, other.stateStorageServiceUrl);
}

@Override
public int hashCode() {
return Objects.hash(zookeeperServers,
configurationStoreServers,
return Objects.hash(metadataStoreUrl,
configurationMetadataStoreUrl,
ledgersRootPath,
bookkeeperMetadataServiceUri,
stateStorageServiceUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.functions.worker;

import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -272,13 +273,14 @@ private static URI initializeStandaloneWorkerService(PulsarClientCreator clientC
URI dlogURI;
try {
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
log.error("Failed to initialize dlog namespace with zookeeper {} at metadata service uri {} for storing "
+ "function packages", internalConf.getZookeeperServers(),
+ "function packages", internalConf.getMetadataStoreUrl(),
internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}
Expand Down Expand Up @@ -355,14 +357,15 @@ public void initInBroker(ServiceConfiguration brokerConfig,
try {
// initializing dlog namespace for function worker
if (workerConfig.isInitializedDlogMetadata()) {
dlogURI = WorkerUtils.newDlogNamespaceURI(internalConf.getZookeeperServers());
String metadataStoreUrl = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
dlogURI = WorkerUtils.newDlogNamespaceURI(metadataStoreUrl);
} else {
dlogURI = WorkerUtils.initializeDlogNamespace(internalConf);
}
} catch (IOException ioe) {
LOG.error("Failed to initialize dlog namespace with zookeeper {} at at metadata service uri {} for "
+ "storing function packages",
internalConf.getZookeeperServers(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
internalConf.getMetadataStoreUrl(), internalConf.getBookkeeperMetadataServiceUri(), ioe);
throw ioe;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl.removeIdentifierFromMetadataURL;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
Expand Down Expand Up @@ -173,7 +174,7 @@ public static URI initializeDlogNamespace(InternalConfigurationData internalConf
// for BC purposes
if (internalConf.getBookkeeperMetadataServiceUri() == null) {
ledgersRootPath = internalConf.getLedgersRootPath();
ledgersStoreServers = internalConf.getZookeeperServers();
ledgersStoreServers = removeIdentifierFromMetadataURL(internalConf.getMetadataStoreUrl());
chrootPath = "";
} else {
URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
public class BookKeeperPackagesStorage implements PackagesStorage {

private static final String NS_CLIENT_ID = "packages-management";
public static final String ZK_SCHEME_IDENTIFIER = "zk:";
final BookKeeperPackagesStorageConfiguration configuration;
private Namespace namespace;

Expand Down Expand Up @@ -92,7 +93,14 @@ private URI initializeDlogNamespace() throws IOException {
ledgersRootPath = metadataServiceUri.getPath();
} else {
ledgersRootPath = configuration.getPackagesManagementLedgerRootPath();
ledgersStoreServers = configuration.getZookeeperServers();
if (StringUtils.isNotBlank(configuration.getMetadataStoreUrl())) {
ledgersStoreServers = configuration.getMetadataStoreUrl();
if (ledgersStoreServers.startsWith(ZK_SCHEME_IDENTIFIER)) {
ledgersStoreServers = ledgersStoreServers.substring(ZK_SCHEME_IDENTIFIER.length());
}
} else {
ledgersStoreServers = configuration.getZookeeperServers();
}
}
BKDLConfig bkdlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath);
DLMetadata dlMetadata = DLMetadata.create(bkdlConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ String getZookeeperServers() {
return getProperty("zookeeperServers");
}

String getMetadataStoreUrl() {
return getProperty("metadataStoreUrl");
}

String getPackagesManagementLedgerRootPath() {
return getProperty("packagesManagementLedgerRootPath");
}
Expand Down

0 comments on commit e20f34b

Please sign in to comment.