Skip to content

Commit

Permalink
[pulsar-broker] add support of secondary bookie-isolation-group (apac…
Browse files Browse the repository at this point in the history
…he#4261)

* [pulsar-broker] add support of secondary bookie-isolation-group

* fix docs

* add negative test

* Fix tests
  • Loading branch information
rdhabalia authored and merlimat committed May 18, 2019
1 parent 78e794a commit 91d4495
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 7 deletions.
8 changes: 8 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=

# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
# have enough bookie available.
bookkeeperClientSecondaryIsolationGroups=

# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups
# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.
bookkeeperClientMinAvailableBookiesInIsolationGroups=

# Enable/disable having read operations for a ledger to be sticky to a single bookie.
# If this flag is enabled, the client will use one single bookie (by preference) to read
# all entries for a ledger.
Expand Down
8 changes: 8 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ bookkeeperClientReorderReadSequenceEnabled=false
# outside the specified groups will not be used by the broker
bookkeeperClientIsolationGroups=

# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't
# have enough bookie available.
bookkeeperClientSecondaryIsolationGroups=

# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups
# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.
bookkeeperClientMinAvailableBookiesInIsolationGroups=

### --- Managed Ledger --- ###

# Number of bookies to use when creating a ledger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,18 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n"
+ "Any bookie outside the specified groups will not be used by the broker")
private String bookkeeperClientIsolationGroups;
@FieldContext(
category = CATEGORY_STORAGE_BK,
required = false,
doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available."
)
private String bookkeeperClientSecondaryIsolationGroups;
@FieldContext(
category = CATEGORY_STORAGE_BK,
required = false,
doc = "Minimum bookies that should be available as part of bookkeeperClientIsolationGroups \n\n"
+ "else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.")
private int bookkeeperClientMinAvailableBookiesInIsolationGroups = 0;
@FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to "
+ "a single bookie.\n" +
"If this flag is enabled, the client will use one single bookie (by " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I
bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class);
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientIsolationGroups());
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientSecondaryIsolationGroups());
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE,
conf.getBookkeeperClientMinAvailableBookiesInIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl
private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class);

public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups";
// if policy doesn't find min-available bookies in primary-isolationBookieGroups then it uses bookies from
// secondaryIsolationBookieGroups
public static final String MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE = "minAvailablePrimaryIsolatedBookies";
public static final String SECONDARY_ISOLATION_BOOKIE_GROUPS = "secondaryIsolationBookieGroups";

private ZooKeeperCache bookieMappingCache = null;

private final List<String> isolationGroups = new ArrayList<String>();
private final List<String> primaryIsolationGroups = new ArrayList<String>();
private final List<String> secondaryIsolationGroups = new ArrayList<String>();
private int minAvailablePrimaryIsolatedBookies = 0;
private final ObjectMapper jsonMapper = ObjectMapperFactory.create();

public ZkIsolatedBookieEnsemblePlacementPolicy() {
Expand All @@ -72,12 +78,22 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
String isolationGroupsString = (String) conf.getProperty(ISOLATION_BOOKIE_GROUPS);
if (!isolationGroupsString.isEmpty()) {
for (String isolationGroup : isolationGroupsString.split(",")) {
isolationGroups.add(isolationGroup);
primaryIsolationGroups.add(isolationGroup);
}
bookieMappingCache = getAndSetZkCache(conf);
}
}

if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = (String) conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS);
if (!secondaryIsolationGroupsString.isEmpty()) {
for (String isolationGroup : secondaryIsolationGroupsString.split(",")) {
secondaryIsolationGroups.add(isolationGroup);
}
}
}
minAvailablePrimaryIsolatedBookies = conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE) != null
? (int) conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE)
: 0;
return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger);
}

Expand Down Expand Up @@ -141,25 +157,37 @@ private Set<BookieSocketAddress> getBlacklistedBookies() {
.getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this)
.orElseThrow(() -> new KeeperException.NoNodeException(
ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH));
for (String group : allGroupsBookieMapping.keySet()) {
if (!isolationGroups.contains(group)) {
Set<String> allBookies = allGroupsBookieMapping.keySet();
for (String group : allBookies) {
if (!primaryIsolationGroups.contains(group)) {
for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) {
blacklistedBookies.add(new BookieSocketAddress(bookieAddress));
}
}
}
// sometime while doing isolation, user might not want to remove isolated bookies from other default
// groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so,
// try to remove those overlapped bookies from excluded-bookie list because ther are also part of
// try to remove those overlapped bookies from excluded-bookie list because they are also part of
// isolated-group bookies.
for (String group : isolationGroups) {
for (String group : primaryIsolationGroups) {
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
blacklistedBookies.remove(new BookieSocketAddress(bookieAddress));
}
}
}
// if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
if ((allBookies.size() - blacklistedBookies.size()) < minAvailablePrimaryIsolatedBookies) {
for (String group : secondaryIsolationGroups) {
Map<String, BookieInfo> bookieGroup = allGroupsBookieMapping.get(group);
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
blacklistedBookies.remove(new BookieSocketAddress(bookieAddress));
}
}
}
}
}
} catch (Exception e) {
LOG.warn("Error getting bookie isolation info from zk: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -353,4 +354,101 @@ public void testOverlappedBookies() throws Exception {

localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}

@Test
public void testSecondaryIsolationGroupsBookies() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
final String isolatedGroup = "primaryGroup";
final String secondaryIsolatedGroup = "secondaryGroup";

defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));

Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));

Map<String, BookieInfo> secondaryIsolatedBookieGroup = new HashMap<>();
secondaryIsolatedBookieGroup.put(BOOKIE2, new BookieInfo("rack0", null));
secondaryIsolatedBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null));

bookieMapping.put("default", defaultBookieGroup);
bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);
bookieMapping.put(secondaryIsolatedGroup, secondaryIsolatedBookieGroup);

ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

Thread.sleep(100);

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryIsolatedGroup);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
NullStatsLogger.INSTANCE);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

List<BookieSocketAddress> ensemble = isolationPolicy
.newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1)));
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2)));
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4)));

localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}

@Test
public void testSecondaryIsolationGroupsBookiesNegative() throws Exception {

Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Map<String, BookieInfo> defaultBookieGroup = new HashMap<>();
final String isolatedGroup = "primaryGroup";
final String secondaryIsolatedGroup = "secondaryGroup";

defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null));
defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null));
defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null));

Map<String, BookieInfo> primaryIsolatedBookieGroup = new HashMap<>();
primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null));

bookieMapping.put("default", defaultBookieGroup);
bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup);

ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

Thread.sleep(100);

ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy();
ClientConfiguration bkClientConf = new ClientConfiguration();
bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) {
});
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
secondaryIsolatedGroup);
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2);
isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL,
NullStatsLogger.INSTANCE);
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies);

try {
List<BookieSocketAddress> ensemble = isolationPolicy
.newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult();
Assert.fail("Should have thrown BKNotEnoughBookiesException");
} catch (BKNotEnoughBookiesException ne) {
// Ok..
}

localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1);
}
}
2 changes: 2 additions & 0 deletions site2/docs/reference-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
|bookkeeperClientRegionawarePolicyEnabled| Enable region-aware bookie selection policy. BK will chose bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored |false|
|bookkeeperClientReorderReadSequenceEnabled| Enable/disable reordering read sequence on reading entries. |false|
|bookkeeperClientIsolationGroups| Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups will not be used by the broker ||
|bookkeeperClientSecondaryIsolationGroups| Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available. ||
|bookkeeperClientMinAvailableBookiesInIsolationGroups| Minimum bookies that should be available as part of bookkeeperClientIsolationGroups else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. ||
|bookkeeperEnableStickyReads | Enable/disable having read operations for a ledger to be sticky to a single bookie. If this flag is enabled, the client will use one single bookie (by preference) to read all entries for a ledger. | true |
|managedLedgerDefaultEnsembleSize| Number of bookies to use when creating a ledger |2|
|managedLedgerDefaultWriteQuorum| Number of copies to store for each message |2|
Expand Down

0 comments on commit 91d4495

Please sign in to comment.