Skip to content

Commit

Permalink
set bookieMappingCache into conf (apache#8844)
Browse files Browse the repository at this point in the history
### Motivation
When the value of "/bookies" in ZooKeeper is changed, Brokers are notified of the change.
After the notification to Brokers, only one of the two BookKeeper clients that Brokers have seems to reflect the change.

**Two BookKeeper clients that Brokers have**
- https://github.com/apache/pulsar/blob/ac0c6e41f0ebe3c900bb31e41c8d40b3f60b19df/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java#L81
- https://github.com/apache/pulsar/blob/102fa9de03509b86e47f58ab8e1c0dde2095da3b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/schema/BookkeeperSchemaStorage.java#L105

The client of `BookkeeperSchemaStorage` don't seems to reflect the change.
The cause is that `ZkBookieRackAffinityMapping#onUpdate` don't run.

I confirmed that I change ZkBookieRackAffinityMapping instances to use same `ZooKeeperDataCache` and `ZkBookieRackAffinityMapping#onUpdate` works.

### Modification
- Set the argument of `ZkBookieRackAffinityMapping#setConf` to `bookieMappingCache`
- Move `updateRacksWithHost()`
  • Loading branch information
k2la authored Dec 8, 2020
1 parent d2510ad commit ceeabfc
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
private static final Logger LOG = LoggerFactory.getLogger(ZkBookieRackAffinityMapping.class);

public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
public static final String ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE = "zk_data_cache_bk_rack_conf_instance";

private ZooKeeperDataCache<BookiesRackConfiguration> bookieMappingCache = null;
private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
Expand All @@ -66,7 +67,13 @@ public class ZkBookieRackAffinityMapping extends AbstractDNSToSwitchMapping
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
bookieMappingCache = getAndSetZkCache(conf);
if (conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE) != null) {
bookieMappingCache = (ZooKeeperDataCache<BookiesRackConfiguration>) conf.getProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE);
bookieMappingCache.registerListener(this);
} else {
bookieMappingCache = getAndSetZkCache(conf);
conf.setProperty(ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE, bookieMappingCache);
}

try {
BookiesRackConfiguration racks = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).orElse(new BookiesRackConfiguration());
Expand Down Expand Up @@ -152,7 +159,6 @@ public BookiesRackConfiguration deserialize(String key, byte[] content)
LOG.debug("Loading the bookie mappings with bookie info data: {}", new String(content));
}
BookiesRackConfiguration racks = jsonMapper.readValue(content, BookiesRackConfiguration.class);
updateRacksWithHost(racks);
return racks;
}

Expand All @@ -172,6 +178,7 @@ private String getRack(String bookieAddress) {
try {
// Trigger load of z-node in case it didn't exist
Optional<BookiesRackConfiguration> racks = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH);
updateRacksWithHost(racks.orElse(new BookiesRackConfiguration()));
if (!racks.isPresent()) {
// since different placement policy will have different default rack,
// don't be smart here and just return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.zookeeper;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -78,12 +80,14 @@ public void testBasic() throws Exception {
ClientConfiguration bkClientConf1 = new ClientConfiguration();
bkClientConf1.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache("test", localZkc, 30) {
});
assertNull(bkClientConf1.getProperty(ZkBookieRackAffinityMapping.ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE));
mapping1.setConf(bkClientConf1);
List<String> racks1 = mapping1
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
assertEquals(racks1.get(0), "/rack0");
assertEquals(racks1.get(1), "/rack1");
assertEquals(racks1.get(2), null);
assertNotNull(bkClientConf1.getProperty(ZkBookieRackAffinityMapping.ZK_DATA_CACHE_BK_RACK_CONF_INSTANCE));

// Case 2: ZkServers and ZkTimeout are given (ZKCache will be constructed in
// ZkBookieRackAffinityMapping#setConf)
Expand Down

0 comments on commit ceeabfc

Please sign in to comment.