diff --git a/conf/broker.conf b/conf/broker.conf index 4ae8522044a79..37b1281c0cdee 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -406,6 +406,14 @@ bookkeeperClientReorderReadSequenceEnabled=false # outside the specified groups will not be used by the broker bookkeeperClientIsolationGroups= +# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't +# have enough bookies available. +bookkeeperClientSecondaryIsolationGroups= + +# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups +# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. +bookkeeperClientMinAvailableBookiesInIsolationGroups= + # Enable/disable having read operations for a ledger to be sticky to a single bookie. # If this flag is enabled, the client will use one single bookie (by preference) to read # all entries for a ledger. diff --git a/conf/standalone.conf b/conf/standalone.conf index a6a26219b0470..7e8f6927a22d8 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -272,6 +272,14 @@ bookkeeperClientReorderReadSequenceEnabled=false # outside the specified groups will not be used by the broker bookkeeperClientIsolationGroups= +# Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't +# have enough bookies available. +bookkeeperClientSecondaryIsolationGroups= + +# Minimum bookies that should be available as part of bookkeeperClientIsolationGroups +# else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list. 
+bookkeeperClientMinAvailableBookiesInIsolationGroups= + ### --- Managed Ledger --- ### # Number of bookies to use when creating a ledger diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 5209e311e24a2..e59420841a1ca 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -711,6 +711,18 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Enable bookie isolation by specifying a list of bookie groups to choose from. \n\n" + "Any bookie outside the specified groups will not be used by the broker") private String bookkeeperClientIsolationGroups; + @FieldContext( + category = CATEGORY_STORAGE_BK, + required = false, + doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available." 
+ ) + private String bookkeeperClientSecondaryIsolationGroups; + @FieldContext( + category = CATEGORY_STORAGE_BK, + required = false, + doc = "Minimum bookies that should be available as part of bookkeeperClientIsolationGroups \n\n" + + "else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.") + private int bookkeeperClientMinAvailableBookiesInIsolationGroups = 0; @FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to " + "a single bookie.\n" + "If this flag is enabled, the client will use one single bookie (by " + diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java index 65207e6c67829..2cf5fda03145b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java @@ -93,6 +93,10 @@ public BookKeeper create(ServiceConfiguration conf, ZooKeeper zkClient) throws I bkConf.setEnsemblePlacementPolicy(ZkIsolatedBookieEnsemblePlacementPolicy.class); bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, conf.getBookkeeperClientIsolationGroups()); + bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + conf.getBookkeeperClientSecondaryIsolationGroups()); + bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, + conf.getBookkeeperClientMinAvailableBookiesInIsolationGroups()); if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) { ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) { }; diff --git a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java 
b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java index 5db20bbd6ec14..a46dba0d82c3d 100644 --- a/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-zookeeper-utils/src/main/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicy.java @@ -54,10 +54,16 @@ public class ZkIsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePl private static final Logger LOG = LoggerFactory.getLogger(ZkIsolatedBookieEnsemblePlacementPolicy.class); public static final String ISOLATION_BOOKIE_GROUPS = "isolationBookieGroups"; + // if policy doesn't find min-available bookies in primary-isolationBookieGroups then it uses bookies from + // secondaryIsolationBookieGroups + public static final String MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE = "minAvailablePrimaryIsolatedBookies"; + public static final String SECONDARY_ISOLATION_BOOKIE_GROUPS = "secondaryIsolationBookieGroups"; private ZooKeeperCache bookieMappingCache = null; - private final List isolationGroups = new ArrayList(); + private final List primaryIsolationGroups = new ArrayList(); + private final List secondaryIsolationGroups = new ArrayList(); + private int minAvailablePrimaryIsolatedBookies = 0; private final ObjectMapper jsonMapper = ObjectMapperFactory.create(); public ZkIsolatedBookieEnsemblePlacementPolicy() { @@ -72,12 +78,22 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, String isolationGroupsString = (String) conf.getProperty(ISOLATION_BOOKIE_GROUPS); if (!isolationGroupsString.isEmpty()) { for (String isolationGroup : isolationGroupsString.split(",")) { - isolationGroups.add(isolationGroup); + primaryIsolationGroups.add(isolationGroup); } bookieMappingCache = getAndSetZkCache(conf); } } - + if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) { + String secondaryIsolationGroupsString = (String) 
conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS); + if (!secondaryIsolationGroupsString.isEmpty()) { + for (String isolationGroup : secondaryIsolationGroupsString.split(",")) { + secondaryIsolationGroups.add(isolationGroup); + } + } + } + minAvailablePrimaryIsolatedBookies = conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE) != null + ? (int) conf.getProperty(MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE) + : 0; return super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger); } @@ -141,8 +157,9 @@ private Set getBlacklistedBookies() { .getData(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, this) .orElseThrow(() -> new KeeperException.NoNodeException( ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH)); - for (String group : allGroupsBookieMapping.keySet()) { - if (!isolationGroups.contains(group)) { + Set allBookies = allGroupsBookieMapping.keySet(); + for (String group : allBookies) { + if (!primaryIsolationGroups.contains(group)) { for (String bookieAddress : allGroupsBookieMapping.get(group).keySet()) { blacklistedBookies.add(new BookieSocketAddress(bookieAddress)); } @@ -150,9 +167,9 @@ private Set getBlacklistedBookies() { } // sometime while doing isolation, user might not want to remove isolated bookies from other default // groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so, - // try to remove those overlapped bookies from excluded-bookie list because ther are also part of + // try to remove those overlapped bookies from excluded-bookie list because they are also part of // isolated-group bookies. 
- for (String group : isolationGroups) { + for (String group : primaryIsolationGroups) { Map bookieGroup = allGroupsBookieMapping.get(group); if (bookieGroup != null && !bookieGroup.isEmpty()) { for (String bookieAddress : bookieGroup.keySet()) { @@ -160,6 +177,17 @@ private Set getBlacklistedBookies() { } } } + // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well. + if ((allBookies.size() - blacklistedBookies.size()) < minAvailablePrimaryIsolatedBookies) { + for (String group : secondaryIsolationGroups) { + Map bookieGroup = allGroupsBookieMapping.get(group); + if (bookieGroup != null && !bookieGroup.isEmpty()) { + for (String bookieAddress : bookieGroup.keySet()) { + blacklistedBookies.remove(new BookieSocketAddress(bookieAddress)); + } + } + } + } } } catch (Exception e) { LOG.warn("Error getting bookie isolation info from zk: {}", e.getMessage()); diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java index da0725df5fca5..8c2576b1a0462 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZkIsolatedBookieEnsemblePlacementPolicyTest.java @@ -44,6 +44,7 @@ import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -353,4 +354,101 @@ public void testOverlappedBookies() throws Exception { localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); } + + @Test + public void testSecondaryIsolationGroupsBookies() throws Exception { + Map> bookieMapping = new HashMap<>(); + 
Map defaultBookieGroup = new HashMap<>(); + final String isolatedGroup = "primaryGroup"; + final String secondaryIsolatedGroup = "secondaryGroup"; + + defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null)); + defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null)); + + Map primaryIsolatedBookieGroup = new HashMap<>(); + primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null)); + + Map secondaryIsolatedBookieGroup = new HashMap<>(); + secondaryIsolatedBookieGroup.put(BOOKIE2, new BookieInfo("rack0", null)); + secondaryIsolatedBookieGroup.put(BOOKIE4, new BookieInfo("rack0", null)); + + bookieMapping.put("default", defaultBookieGroup); + bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup); + bookieMapping.put(secondaryIsolatedGroup, secondaryIsolatedBookieGroup); + + ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, + jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Thread.sleep(100); + + ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) { + }); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, secondaryIsolatedGroup); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, + NullStatsLogger.INSTANCE); + 
isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + List ensemble = isolationPolicy + .newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult(); + assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE1))); + assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2))); + assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4))); + + localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); + } + + @Test + public void testSecondaryIsolationGroupsBookiesNegative() throws Exception { + + Map> bookieMapping = new HashMap<>(); + Map defaultBookieGroup = new HashMap<>(); + final String isolatedGroup = "primaryGroup"; + final String secondaryIsolatedGroup = "secondaryGroup"; + + defaultBookieGroup.put(BOOKIE1, new BookieInfo("rack0", null)); + defaultBookieGroup.put(BOOKIE2, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE3, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE4, new BookieInfo("rack1", null)); + defaultBookieGroup.put(BOOKIE5, new BookieInfo("rack1", null)); + + Map primaryIsolatedBookieGroup = new HashMap<>(); + primaryIsolatedBookieGroup.put(BOOKIE1, new BookieInfo("rack1", null)); + + bookieMapping.put("default", defaultBookieGroup); + bookieMapping.put(isolatedGroup, primaryIsolatedBookieGroup); + + ZkUtils.createFullPathOptimistic(localZkc, ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, + jsonMapper.writeValueAsBytes(bookieMapping), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + Thread.sleep(100); + + ZkIsolatedBookieEnsemblePlacementPolicy isolationPolicy = new ZkIsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(ZooKeeperCache.ZK_CACHE_INSTANCE, new ZooKeeperCache(localZkc, 30) { + }); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolatedGroup); + 
bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + secondaryIsolatedGroup); + bkClientConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE, 2); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, + NullStatsLogger.INSTANCE); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + try { + List ensemble = isolationPolicy + .newEnsemble(3, 3, 2, Collections.emptyMap(), new HashSet<>()).getResult(); + Assert.fail("Should have thrown BKNotEnoughBookiesException"); + } catch (BKNotEnoughBookiesException ne) { + // Ok.. + } + + localZkc.delete(ZkBookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, -1); + } } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index d763e432c52c7..4f040f2af768e 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -170,6 +170,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |bookkeeperClientRegionawarePolicyEnabled| Enable region-aware bookie selection policy. BK will chose bookies from different regions and racks when forming a new bookie ensemble. If enabled, the value of bookkeeperClientRackawarePolicyEnabled is ignored |false| |bookkeeperClientReorderReadSequenceEnabled| Enable/disable reordering read sequence on reading entries. |false| |bookkeeperClientIsolationGroups| Enable bookie isolation by specifying a list of bookie groups to choose from. Any bookie outside the specified groups will not be used by the broker || +|bookkeeperClientSecondaryIsolationGroups| Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available. 
|| +|bookkeeperClientMinAvailableBookiesInIsolationGroups| Minimum number of bookies that should be available in bookkeeperClientIsolationGroups; otherwise the broker also includes bookies from bookkeeperClientSecondaryIsolationGroups in the isolated list. || |bookkeeperEnableStickyReads | Enable/disable having read operations for a ledger to be sticky to a single bookie. If this flag is enabled, the client will use one single bookie (by preference) to read all entries for a ledger. | true | |managedLedgerDefaultEnsembleSize| Number of bookies to use when creating a ledger |2| |managedLedgerDefaultWriteQuorum| Number of copies to store for each message |2|