Skip to content

Commit

Permalink
PIP-45: Converted BookieRackAffinityMapping to use MetadataStore (apa…
Browse files Browse the repository at this point in the history
…che#12841)

* PIP-45: Converted BookieRackAffinityMapping to use MetadataStore

* WIP

* Only initialize bookieMappingCache if we have an isolation group

* Added back compatibility class wrappers
  • Loading branch information
merlimat authored Dec 1, 2021
1 parent f965fb8 commit d7158dd
Show file tree
Hide file tree
Showing 31 changed files with 373 additions and 521 deletions.
6 changes: 6 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-jackson</artifactId>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,67 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.zookeeper;

import com.fasterxml.jackson.databind.ObjectMapper;
package org.apache.pulsar.bookie.rackawareness;

import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase.getZKServersFromServiceUri;

/**
* It provides the mapping of bookies to its rack from zookeeper.
*/
public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
implements ZooKeeperCacheListener<BookiesRackConfiguration>, RackChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(ZkBookieRackAffinityMapping.class);
public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
implements RackChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(BookieRackAffinityMapping.class);

public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
public static final String ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE = "zk_data_cache_bk_rack_conf_instance";
private static final String ZK_LEDGERS_DEFAULT_ROOT_PATH = "/ledgers";
public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
private static final String LEDGERS_DEFAULT_ROOT_PATH = "/ledgers";

private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache = null;
private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;

private static final ObjectMapper jsonMapper = ObjectMapperFactory.create();

private volatile BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private volatile Map<String, BookieInfo> bookieInfoMap = new HashMap<>();

@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE) != null) {
bookieMappingCache = (ZooKeeperDataCache<BookiesRackConfiguration>) conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE);
bookieMappingCache.registerListener(this);
} else {
bookieMappingCache = getAndSetZkCache(conf);
conf.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, bookieMappingCache);
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty == null) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " configuration was not set in the BK client configuration");
}

if (!(storeProperty instanceof MetadataStore)) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
}

MetadataStore store = (MetadataStore) storeProperty;

bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).join();
store.registerListener(this::handleUpdates);

// A previous version of this code tried to eagerly load the cache. However, this is invalid
// in later versions of bookkeeper as when setConf is called, the bookieAddressResolver is not yet set
}
Expand Down Expand Up @@ -122,62 +120,6 @@ private void updateRacksWithHost(BookiesRackConfiguration racks) {
bookieInfoMap = newBookieInfoMap;
}

private ZooKeeperDataCache<BookiesRackConfiguration> getAndSetZkCache(Configuration conf) {
ZooKeeperCache zkCache = null;
if (conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) != null) {
zkCache = (ZooKeeperCache) conf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE);
} else {
if (conf instanceof ClientConfiguration) {
int zkTimeout = ((ClientConfiguration) conf).getZkTimeout();
try {
final String metadataServiceUriStr = ((ClientConfiguration) conf).getMetadataServiceUri();
URI metadataServiceUri = URI.create(metadataServiceUriStr);
String ledgersRootPath = metadataServiceUri.getPath();
String zkServers;
if (ZK_LEDGERS_DEFAULT_ROOT_PATH.equals(ledgersRootPath)) {
zkServers = getZKServersFromServiceUri(metadataServiceUri);
} else {
int zkLedgerRootIndex = ledgersRootPath.lastIndexOf("/");
zkServers = getZKServersFromServiceUri(metadataServiceUri) + ledgersRootPath.substring(0, zkLedgerRootIndex);
}
ZooKeeper zkClient = ZooKeeperClient.newBuilder().connectString(zkServers)
.sessionTimeoutMs(zkTimeout).build();
zkCache = new ZooKeeperCache("bookies-racks", zkClient,
(int) TimeUnit.MILLISECONDS.toSeconds(zkTimeout)) {
};
conf.addProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, zkCache);
} catch (Exception e) {
LOG.error("Error creating zookeeper client", e);
}
} else {
LOG.error("No zk configurations available");
}
}
ZooKeeperDataCache<BookiesRackConfiguration> zkDataCache = getZkBookieRackMappingCache(
zkCache);
zkDataCache.registerListener(this);
return zkDataCache;
}

private ZooKeeperDataCache<BookiesRackConfiguration> getZkBookieRackMappingCache(
ZooKeeperCache zkCache) {
return new ZooKeeperDataCache<BookiesRackConfiguration>(
zkCache) {

@Override
public BookiesRackConfiguration deserialize(String key, byte[] content)
throws Exception {
LOG.info("Reloading the bookie rack affinity mapping cache.");
if (LOG.isDebugEnabled()) {
LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(content));
}
BookiesRackConfiguration racks = jsonMapper.readValue(content, BookiesRackConfiguration.class);
return racks;
}

};
}

@Override
public List<String> resolve(List<String> bookieAddressList) {
List<String> racks = new ArrayList<>(bookieAddressList.size());
Expand All @@ -190,7 +132,11 @@ public List<String> resolve(List<String> bookieAddressList) {
private String getRack(String bookieAddress) {
try {
// Trigger load of z-node in case it didn't exist
Optional<BookiesRackConfiguration> racks = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
CompletableFuture<Optional<BookiesRackConfiguration>> future =
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);

Optional<BookiesRackConfiguration> racks = (future.isDone() && !future.isCompletedExceptionally())
? future.join() : Optional.empty();
updateRacksWithHost(racks.orElseGet(BookiesRackConfiguration::new));
if (!racks.isPresent()) {
// since different placement policy will have different default rack,
Expand Down Expand Up @@ -235,17 +181,24 @@ public void reloadCachedMappings() {
// no-op
}

@Override
public void onUpdate(String path, BookiesRackConfiguration data, Stat stat) {
private void handleUpdates(Notification n) {
if (!n.getPath().equals(BOOKIE_INFO_ROOT_PATH)) {
return;
}

if (rackawarePolicy != null) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", data.toString());
List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : data.values()) {
for (String addr : bookieMapping.keySet()) {
bookieAddressList.add(BookieId.parse(addr));
}
}
rackawarePolicy.onBookieRackChange(bookieAddressList);
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenAccept(optVal -> {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse(
Collections.emptyList())) {
for (String addr : bookieMapping.keySet()) {
bookieAddressList.add(BookieId.parse(addr));
}
}
rackawarePolicy.onBookieRackChange(bookieAddressList);
});
}
}

Expand Down
Loading

0 comments on commit d7158dd

Please sign in to comment.