Skip to content

Commit

Permalink
Fix failed initialized METADATA_STORE_SCHEME (apache#14708)
Browse files Browse the repository at this point in the history
Co-authored-by: gavingaozhangmin <[email protected]>
  • Loading branch information
gaozhangmin and gavingaozhangmin authored Mar 18, 2022
1 parent 601fbdd commit 0a38fce
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -31,6 +32,8 @@
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
Expand All @@ -42,7 +45,10 @@
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -63,24 +69,54 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

@Override
public void setConf(Configuration conf) {
super.setConf(conf);
public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty == null) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client "
+ "configuration");
}

if (!(storeProperty instanceof MetadataStore)) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
if (storeProperty != null) {
if (!(storeProperty instanceof MetadataStore)) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
}
store = (MetadataStore) storeProperty;
} else {
String url;
String metadataServiceUri = (String) conf.getProperty("metadataServiceUri");
if (StringUtils.isNotBlank(metadataServiceUri)) {
try {
url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
.replace(";", ",");
} catch (Exception e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
} else {
String zkServers = (String) conf.getProperty("zkServers");
if (StringUtils.isBlank(zkServers)) {
String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor "
+ "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
throw new RuntimeException(errorMsg);
}
url = zkServers;
}
try {
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
.sessionTimeoutMillis(zkTimeout)
.build());
} catch (MetadataStoreException e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
}
return store;
}

MetadataStore store = (MetadataStore) storeProperty;

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
for (Map<String, BookieInfo> bookieMapping : bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.map(Map::values).orElse(Collections.emptyList())) {
for (String address : bookieMapping.keySet()) {
Expand All @@ -91,7 +127,7 @@ public void setConf(Configuration conf) {
bookieAddressListLastTime);
}
}
} catch (InterruptedException | ExecutionException e) {
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
store.registerListener(this::handleUpdates);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.bookie.rackawareness;

import static org.apache.pulsar.bookie.rackawareness.BookieRackAffinityMapping.METADATA_STORE_INSTANCE;
import io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -35,6 +36,7 @@
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicyImpl;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.proto.BookieAddressResolver;
Expand Down Expand Up @@ -68,20 +70,12 @@ public IsolatedBookieEnsemblePlacementPolicy() {
public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider,
StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) {

Object storeProperty = conf.getProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE);
if (storeProperty == null) {
throw new RuntimeException(BookieRackAffinityMapping.METADATA_STORE_INSTANCE
+ " configuration was not set in the BK client configuration");
}

if (!(storeProperty instanceof MetadataStore)) {
throw new RuntimeException(
BookieRackAffinityMapping.METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
MetadataStore store;
try {
store = BookieRackAffinityMapping.createMetadataStore(conf);
} catch (MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed initialized");
}

MetadataStore store = (MetadataStore) storeProperty;

Set<String> primaryIsolationGroups = new HashSet<>();
Set<String> secondaryIsolationGroups = new HashSet<>();
if (conf.getProperty(ISOLATION_BOOKIE_GROUPS) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

public abstract class AbstractMetadataDriver implements Closeable {

protected static final String METADATA_STORE_SCHEME = "metadata-store";
public static final String METADATA_STORE_SCHEME = "metadata-store";

public static final String METADATA_STORE_INSTANCE = "metadata-store-instance";

Expand Down

0 comments on commit 0a38fce

Please sign in to comment.