From 5ebaafd4288c69d135b295ab563c8ad56affa15b Mon Sep 17 00:00:00 2001 From: gaozhangmin Date: Mon, 21 Mar 2022 17:25:07 +0800 Subject: [PATCH] [feature][broker] Strict bookie affinity group strategy (#12025) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Motivation 1. Suppose we have three bookies, `b1, b2, b3`; b1 and b2 are in region `test-region`, and b3 doesn't belong to any region. Namespace `public/test` has set-affinity-group `test-region`. When producing messages to a topic under `public/test`, the bookie b3 will also be selected into the ensemble; I think this is a violation of isolation. 2. Namespace `public/default` doesn't set any affinity-group; when producing messages to a topic under `public/default`, the bookies b1 and b2, which have been isolated, will be selected into the ensemble. I think this is also a violation of isolation. ### Modifications Ignore the bookies which have not been assigned to any region when using `ZkIsolatedBookieEnsemblePlacementPolicy.java` --- conf/broker.conf | 4 +- ...IsolatedBookieEnsemblePlacementPolicy.java | 58 +++- .../pulsar/broker/ServiceConfiguration.java | 3 + .../broker/resources/BaseResources.java | 1 + .../resources/LocalPoliciesResources.java | 2 - .../broker/resources/NamespaceResources.java | 9 + ...atedBookieEnsemblePlacementPolicyTest.java | 36 ++- .../pulsar/broker/service/BrokerService.java | 80 ++++- .../service/persistent/PersistentTopic.java | 4 + .../service/BrokerBookieIsolationTest.java | 286 ++++++++++++++++++ site2/docs/reference-configuration.md | 1 + 11 files changed, 460 insertions(+), 24 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index c7d20cfae2b23..a373dc4650e69 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1386,6 +1386,8 @@ packagesManagementLedgerRootPath=/ledgers ### --- Packages management service configuration variables (end) --- ### +#enable or disable strict bookie affinity +strictBookieAffinityEnabled=false ### --- Deprecated settings --- ### @@ 
-1426,4 +1428,4 @@ tlsEnabled=false # Enable Key_Shared subscription (default is enabled) # @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable. -subscriptionKeySharedEnable=true \ No newline at end of file +subscriptionKeySharedEnable=true diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java index c086e2c4e5e46..f56481576943e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicy.java @@ -23,7 +23,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -61,6 +60,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac private MetadataCache bookieMappingCache; + private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*"; + public IsolatedBookieEnsemblePlacementPolicy() { super(); @@ -84,11 +85,10 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf, for (String isolationGroup : isolationGroupsString.split(",")) { primaryIsolationGroups.add(isolationGroup); } - - // Only add the bookieMappingCache if we have defined an isolation group - bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); - bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join(); } + // Only add the bookieMappingCache if we have defined an isolation group + bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class); + bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join(); } if 
(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) { String secondaryIsolationGroupsString = castToString(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS)); @@ -121,7 +121,30 @@ private static String castToString(Object obj) { public PlacementResult> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize, Map customMetadata, Set excludeBookies) throws BKNotEnoughBookiesException { - Map> isolationGroup = new HashMap<>(); + if (customMetadata.containsKey(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG)) { + try { + EnsemblePlacementPolicyConfig policy = EnsemblePlacementPolicyConfig + .decode(customMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG)); + Map policyProperties = policy.getProperties(); + String isolationBookieGroups = + (String) policyProperties.get(ISOLATION_BOOKIE_GROUPS); + String secondaryIsolationBookieGroups = + (String) policyProperties.get(SECONDARY_ISOLATION_BOOKIE_GROUPS); + Set primaryIsolationGroups = new HashSet<>(); + Set secondaryIsolationGroups = new HashSet<>(); + if (isolationBookieGroups != null) { + primaryIsolationGroups.addAll(Arrays.asList(isolationBookieGroups.split(","))); + } + if (secondaryIsolationBookieGroups != null) { + secondaryIsolationGroups.addAll(Arrays.asList(secondaryIsolationBookieGroups.split(","))); + } + defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups); + } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) { + log.error("Failed to decode EnsemblePlacementPolicyConfig from customeMetadata when choosing ensemble, " + + "Will use defaultIsolationGroups instead"); + } + } + Set blacklistedBookies = getBlacklistedBookiesWithIsolationGroups( ensembleSize, defaultIsolationGroups); if (excludeBookies == null) { @@ -194,6 +217,9 @@ private static Pair, Set> getIsolationGroup( private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, Pair, Set> isolationGroups) { 
Set blacklistedBookies = new HashSet<>(); + if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) { + return blacklistedBookies; + } try { if (bookieMappingCache != null) { CompletableFuture> future = @@ -211,6 +237,7 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, int totalAvailableBookiesInPrimaryGroup = 0; Set primaryIsolationGroup = Collections.emptySet(); Set secondaryIsolationGroup = Collections.emptySet(); + Set primaryGroupBookies = new HashSet<>(); if (isolationGroups != null) { primaryIsolationGroup = isolationGroups.getLeft(); secondaryIsolationGroup = isolationGroups.getRight(); @@ -225,9 +252,16 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, for (String groupBookie : bookiesInGroup) { totalAvailableBookiesInPrimaryGroup += knownBookies .containsKey(BookieId.parse(groupBookie)) ? 1 : 0; + primaryGroupBookies.add(BookieId.parse(groupBookie)); } } } + + Set otherGroupBookies = new HashSet<>(blacklistedBookies); + Set nonRegionBookies = new HashSet<>(knownBookies.keySet()); + nonRegionBookies.removeAll(primaryGroupBookies); + blacklistedBookies.addAll(nonRegionBookies); + // sometime while doing isolation, user might not want to remove isolated bookies from other default // groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so, // try to remove those overlapped bookies from excluded-bookie list because they are also part of @@ -241,6 +275,7 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, } } // if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well. 
+ int totalAvailableBookiesFromPrimaryAndSecondary = totalAvailableBookiesInPrimaryGroup; if (totalAvailableBookiesInPrimaryGroup < ensembleSize) { log.info( "Not found enough available-bookies from primary isolation group [{}], checking secondary " @@ -250,10 +285,21 @@ private Set getBlacklistedBookiesWithIsolationGroups(int ensembleSize, if (bookieGroup != null && !bookieGroup.isEmpty()) { for (String bookieAddress : bookieGroup.keySet()) { blacklistedBookies.remove(BookieId.parse(bookieAddress)); + totalAvailableBookiesFromPrimaryAndSecondary += 1; } } } } + if (totalAvailableBookiesFromPrimaryAndSecondary < ensembleSize) { + log.info( + "Not found enough available-bookies from primary isolation group [{}] and secondary " + + "isolation group [{}], checking from non-region bookies", + primaryIsolationGroup, secondaryIsolationGroup); + nonRegionBookies.removeAll(otherGroupBookies); + for (BookieId bookie: nonRegionBookies) { + blacklistedBookies.remove(bookie); + } + } } } catch (Exception e) { log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage()); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index df64e052a153b..3148f085b7fdb 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1250,6 +1250,9 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private boolean enableNamespaceIsolationUpdateOnTime = false; + @FieldContext(category = CATEGORY_SERVER, doc = "Enable or disable strict bookie affinity.") + private boolean strictBookieAffinityEnabled = false; + /***** --- TLS. 
--- ****/ @FieldContext( category = CATEGORY_TLS, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java index bc670c53a8c8f..acfcd08d74160 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/BaseResources.java @@ -43,6 +43,7 @@ public class BaseResources { protected static final String BASE_POLICIES_PATH = "/admin/policies"; protected static final String BASE_CLUSTERS_PATH = "/admin/clusters"; + protected static final String LOCAL_POLICIES_ROOT = "/admin/local-policies"; @Getter private final MetadataStore store; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java index f83d2bed19964..6432bfbac7835 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/LocalPoliciesResources.java @@ -33,8 +33,6 @@ public class LocalPoliciesResources extends BaseResources { - private static final String LOCAL_POLICIES_ROOT = "/admin/local-policies"; - public LocalPoliciesResources(MetadataStore localStore, int operationTimeoutSec) { super(localStore, LocalPolicies.class, operationTimeoutSec); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java index 74d4bcf6c472e..3c35f32f36c1d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/NamespaceResources.java @@ 
-143,6 +143,11 @@ public static boolean pathIsFromNamespace(String path) { && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/"); } + public static boolean pathIsNamespaceLocalPolicies(String path) { + return path.startsWith(LOCAL_POLICIES_ROOT + "/") + && path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/"); + } + // clear resource of `/namespace/{namespaceName}` for zk-node public CompletableFuture deleteNamespaceAsync(NamespaceName ns) { final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString()); @@ -159,6 +164,10 @@ public static NamespaceName namespaceFromPath(String path) { return NamespaceName.get(path.substring(BASE_POLICIES_PATH.length() + 1)); } + public static NamespaceName namespaceFromLocalPoliciesPath(String path) { + return NamespaceName.get(path.substring(LOCAL_POLICIES_ROOT.length() + 1)); + } + public static class IsolationPolicyResources extends BaseResources> { private static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies"; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java index 052e3e84527c2..87fac391bb61e 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/IsolatedBookieEnsemblePlacementPolicyTest.java @@ -47,6 +47,7 @@ import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.MetadataStoreFactory; import org.awaitility.Awaitility; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -87,6 +88,32 @@ void teardown() throws Exception { timer.stop(); } + @Test + public void 
testNonRegionBookie() throws Exception { + Map> bookieMapping = new HashMap<>(); + Map mainBookieGroup = new HashMap<>(); + + mainBookieGroup.put(BOOKIE1, BookieInfo.builder().rack("rack0").build()); + mainBookieGroup.put(BOOKIE2, BookieInfo.builder().rack("rack1").build()); + + bookieMapping.put("group1", mainBookieGroup); + + + store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping), + Optional.empty()).join(); + + IsolatedBookieEnsemblePlacementPolicy isolationPolicy = new IsolatedBookieEnsemblePlacementPolicy(); + ClientConfiguration bkClientConf = new ClientConfiguration(); + bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store); + bkClientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups); + isolationPolicy.initialize(bkClientConf, Optional.empty(), timer, SettableFeatureProvider.DISABLE_ALL, NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER); + isolationPolicy.onClusterChanged(writableBookies, readOnlyBookies); + + List ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult(); + assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE3).toBookieId())); + assertFalse(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId())); + } + @Test public void testBasic() throws Exception { Map> bookieMapping = new HashMap<>(); @@ -128,9 +155,12 @@ public void testBasic() throws Exception { Set bookieToExclude = new HashSet<>(); bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId()); - ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude).getResult(); - assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId())); - assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2).toBookieId())); + try { + isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude).getResult(); + } catch 
(BKNotEnoughBookiesException e) { + Assert.assertEquals(e.getMessage(), "Not enough non-faulty bookies available"); + } + secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build()); bookieMapping.put("group2", secondaryBookieGroup); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6527fe4eb7b22..a3502c8d92802 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -145,6 +145,7 @@ import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; import org.apache.pulsar.common.policies.data.BacklogQuota; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.LocalPolicies; import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; import org.apache.pulsar.common.policies.data.PersistencePolicies; import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; @@ -1509,22 +1510,46 @@ public CompletableFuture getManagedLedgerConfig(TopicName t ); } - ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); - if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { - managedLedgerConfig - .setBookKeeperEnsemblePlacementPolicyClassName( - IsolatedBookieEnsemblePlacementPolicy.class); - Map properties = Maps.newHashMap(); - properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); - 
properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, - localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); - managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + + if (serviceConfig.isStrictBookieAffinityEnabled()) { + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName( + IsolatedBookieEnsemblePlacementPolicy.class); + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { + Map properties = Maps.newHashMap(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } else if (SystemTopicClient.isSystemTopic(topicName)) { + Map properties = Maps.newHashMap(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, "*"); + properties.put(IsolatedBookieEnsemblePlacementPolicy + .SECONDARY_ISOLATION_BOOKIE_GROUPS, "*"); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } else { + Map properties = Maps.newHashMap(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, ""); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, ""); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } + } else { + if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName( + IsolatedBookieEnsemblePlacementPolicy.class); + Map properties = Maps.newHashMap(); + properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, + 
localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); + properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, + localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); + managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); + } } + managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); @@ -1950,13 +1975,44 @@ private void handleMetadataChanges(Notification n) { if (n.getType() == NotificationType.Modified && NamespaceResources.pathIsFromNamespace(n.getPath())) { NamespaceName ns = NamespaceResources.namespaceFromPath(n.getPath()); handlePoliciesUpdates(ns); + } else if (n.getType() == NotificationType.Modified + && NamespaceResources.pathIsNamespaceLocalPolicies(n.getPath())) { + NamespaceName ns = NamespaceResources.namespaceFromLocalPoliciesPath(n.getPath()); + handleLocalPoliciesUpdates(ns); } else if (pulsar().getPulsarResources().getDynamicConfigResources().isDynamicConfigurationPath(n.getPath())) { handleDynamicConfigurationUpdates(); } - // Ignore unrelated notifications } + private void handleLocalPoliciesUpdates(NamespaceName namespace) { + pulsar.getPulsarResources().getLocalPolicies().getLocalPoliciesAsync(namespace) + .thenAccept(optLocalPolicies -> { + if (!optLocalPolicies.isPresent()) { + return; + } + LocalPolicies localPolicies = optLocalPolicies.get(); + log.info("[{}] updating with {}", namespace, localPolicies); + topics.forEach((name, topicFuture) -> { + if (namespace.includes(TopicName.get(name))) { + // If the topic is already created, immediately apply the updated policies, otherwise + // once the topic is created it'll apply the policies update + topicFuture.thenAccept(topic -> { + if (log.isDebugEnabled()) { + log.debug("Notifying topic 
that local policies have changed: {}", name); + } + topic.ifPresent(t -> { + if (t instanceof PersistentTopic) { + PersistentTopic topic1 = (PersistentTopic) t; + topic1.onLocalPoliciesUpdate(); + } + }); + }); + } + }); + }); + } + private void handlePoliciesUpdates(NamespaceName namespace) { pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) .thenAccept(optPolicies -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 135e23c9f2281..f6eaa157a5699 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2363,6 +2363,10 @@ private boolean shouldTopicBeRetained() { return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime; } + public CompletableFuture onLocalPoliciesUpdate() { + return checkPersistencePolicies(); + } + @Override public CompletableFuture onPoliciesUpdate(Policies data) { if (log.isDebugEnabled()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index c8a3c577f89ee..65d5654450ba2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -73,8 +73,10 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; +import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ 
-203,6 +205,7 @@ public void testBookieIsolation() throws Exception { try { admin.namespaces().getBookieAffinityGroup(ns1); + fail("ns1 should have no bookie affinity group set"); } catch (PulsarAdminException.NotFoundException e) { // Ok } @@ -263,6 +266,288 @@ public void testBookieIsolation() throws Exception { assertEquals(clientConf.getProperty(REPP_DNS_RESOLVER_CLASS), BookieRackAffinityMapping.class.getName()); } + @Test + public void testSetRackInfoAndAffinityGroupDuringProduce() throws Exception { + final String tenant1 = "tenant1"; + final String cluster = "use"; + final String ns2 = String.format("%s/%s/%s", tenant1, cluster, "ns2"); + final int totalPublish = 100; + + final String brokerBookkeeperClientIsolationGroups = "default-group"; + final String tenantNamespaceIsolationGroups = "tenant1-isolation"; + + BookieServer[] bookies = bkEnsemble.getBookies(); + ZooKeeper zkClient = bkEnsemble.getZkClient(); + + Set isolatedBookies = Sets.newHashSet(bookies[2].getBookieId(), + bookies[3].getBookieId()); + + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config.setClusterName(cluster); + config.setWebServicePort(Optional.of(0)); + config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setBrokerServicePort(Optional.of(0)); + config.setAdvertisedAddress("localhost"); + config.setStrictBookieAffinityEnabled(true); + config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); + + config.setManagedLedgerDefaultEnsembleSize(2); + config.setManagedLedgerDefaultWriteQuorum(2); + config.setManagedLedgerDefaultAckQuorum(2); + + config.setAllowAutoTopicCreationType("non-partitioned"); + + int totalEntriesPerLedger = 20; + int totalLedgers = totalPublish / totalEntriesPerLedger; + config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger); + 
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + pulsarService = new PulsarService(config); + pulsarService.start(); + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build(); + + ClusterData clusterData = ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build(); + admin.clusters().createCluster(cluster, clusterData); + TenantInfoImpl tenantInfo = new TenantInfoImpl(null, Sets.newHashSet(cluster)); + admin.tenants().createTenant(tenant1, tenantInfo); + admin.namespaces().createNamespace(ns2); + + try { + admin.namespaces().getBookieAffinityGroup(ns2); + fail("ns2 should have no bookie affinity group set"); + } catch (PulsarAdminException.NotFoundException e) { + // Ok + } + + @Cleanup + PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarService.getBrokerServiceUrl()) + .statsInterval(-1, TimeUnit.SECONDS).build(); + + final String topicName = String.format("persistent://%s/%s", ns2, "topic1"); + + Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-subscriber-name") + .subscribe(); + consumer.close(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topicName).sendTimeout(5, TimeUnit.SECONDS); + + Producer producer = producerBuilder.create(); + for (int i = 0; i < 20; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, isolatedBookies); + admin.namespaces().setBookieAffinityGroup(ns2, + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + assertEquals(admin.namespaces().getBookieAffinityGroup(ns2), + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + + PersistentTopic topic2 = (PersistentTopic) pulsarService.getBrokerService().getTopicReference(topicName).get(); + ManagedLedgerImpl ml = 
(ManagedLedgerImpl) topic2.getManagedLedger(); + Awaitility.await().untilAsserted(() -> + Assert.assertTrue(ml.getConfig().getBookKeeperEnsemblePlacementPolicyProperties().size() > 0)); + + for (int i=0; i<80; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + producer.close(); + + Bookie bookie1 = bookies[0].getBookie(); + Field ledgerManagerField = Bookie.class.getDeclaredField("ledgerManager"); + ledgerManagerField.setAccessible(true); + LedgerManager ledgerManager = (LedgerManager) ledgerManagerField.get(bookie1); + + ManagedLedgerImpl ml2 = (ManagedLedgerImpl) topic2.getManagedLedger(); + // namespace: ns2 + assertEquals(ml2.getLedgersInfoAsList().size(), totalLedgers); + + List ledgers = ml2.getLedgersInfoAsList(); + // validate ledgers' ensemble with affinity bookies + for (int i=1; i> ledgerMetaFuture = ledgerManager.readLedgerMetadata(ledgerId); + LedgerMetadata ledgerMetadata = ledgerMetaFuture.get().getValue(); + Set ledgerBookies = Sets.newHashSet(); + ledgerBookies.addAll(ledgerMetadata.getAllEnsembles().values().iterator().next()); + assertEquals(ledgerBookies.size(), isolatedBookies.size()); + ledgerBookies.removeAll(isolatedBookies); + assertEquals(ledgerBookies.size(), 0); + } + } + + /** + * Validate that broker can support tenant based strict bookie isolation. + * + *
+     * 1. create one bookie-info group : isolated-group
+     * 2. namespace ns1 : has no group
+     *    validate: bookie-ensemble for ns1-topics' ledgers will come from bookies that don't belong to any group
+     *    if the bookies that don't belong to any group are not enough, then throw #BKNotEnoughBookiesException
+     * 3. namespace ns2,ns3,ns4: use isolated-group
+     *    validate: bookie-ensemble for the above namespaces' ledgers will first try to select from isolated-group
+     *    if the bookies belonging to isolated-group are not enough,
+     *    then bookies from the secondary isolation group will be selected (if a secondary isolation group is set)
+     *    if there are still not enough bookies, then bookies that don't belong to any group will be selected.
+     * 
+ * + * @throws Exception + */ + @Test + public void testStrictBookieIsolation() throws Exception { + final String tenant1 = "tenant1"; + final String cluster = "use"; + final String ns1 = String.format("%s/%s/%s", tenant1, cluster, "ns1"); + final String ns2 = String.format("%s/%s/%s", tenant1, cluster, "ns2"); + final String ns3 = String.format("%s/%s/%s", tenant1, cluster, "ns3"); + final String ns4 = String.format("%s/%s/%s", tenant1, cluster, "ns4"); + final int totalPublish = 100; + + final String brokerBookkeeperClientIsolationGroups = "default-group"; + final String tenantNamespaceIsolationGroups = "tenant1-isolation"; + + BookieServer[] bookies = bkEnsemble.getBookies(); + ZooKeeper zkClient = bkEnsemble.getZkClient(); + + Set defaultBookies = Sets.newHashSet(bookies[0].getBookieId(), + bookies[1].getBookieId()); + Set isolatedBookies = Sets.newHashSet(bookies[2].getBookieId(), + bookies[3].getBookieId()); + + setDefaultIsolationGroup(tenantNamespaceIsolationGroups, zkClient, isolatedBookies); + + ServiceConfiguration config = new ServiceConfiguration(); + config.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); + config.setClusterName(cluster); + config.setWebServicePort(Optional.of(0)); + config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort()); + config.setBrokerShutdownTimeoutMs(0L); + config.setBrokerServicePort(Optional.of(0)); + config.setAdvertisedAddress("localhost"); + config.setStrictBookieAffinityEnabled(true); + config.setBookkeeperClientIsolationGroups(brokerBookkeeperClientIsolationGroups); + + config.setManagedLedgerDefaultEnsembleSize(2); + config.setManagedLedgerDefaultWriteQuorum(2); + config.setManagedLedgerDefaultAckQuorum(2); + + config.setAllowAutoTopicCreationType("non-partitioned"); + + int totalEntriesPerLedger = 20; + int totalLedgers = totalPublish / totalEntriesPerLedger; + config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger); + 
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + pulsarService = new PulsarService(config); + pulsarService.start(); + + + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarService.getWebServiceAddress()).build(); + + ClusterData clusterData = ClusterData.builder().serviceUrl(pulsarService.getWebServiceAddress()).build(); + admin.clusters().createCluster(cluster, clusterData); + TenantInfoImpl tenantInfo = new TenantInfoImpl(null, Sets.newHashSet(cluster)); + admin.tenants().createTenant(tenant1, tenantInfo); + admin.namespaces().createNamespace(ns1); + admin.namespaces().createNamespace(ns2); + admin.namespaces().createNamespace(ns3); + admin.namespaces().createNamespace(ns4); + admin.namespaces().setBookieAffinityGroup(ns2, + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + admin.namespaces().setBookieAffinityGroup(ns3, + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + admin.namespaces().setBookieAffinityGroup(ns4, + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + + assertEquals(admin.namespaces().getBookieAffinityGroup(ns2), + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + assertEquals(admin.namespaces().getBookieAffinityGroup(ns3), + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + assertEquals(admin.namespaces().getBookieAffinityGroup(ns4), + BookieAffinityGroupData.builder() + .bookkeeperAffinityGroupPrimary(tenantNamespaceIsolationGroups) + .build()); + + try { + admin.namespaces().getBookieAffinityGroup(ns1); + fail("ns1 should have no bookie affinity group set"); + } catch (PulsarAdminException.NotFoundException e) { + // Ok + } + + @Cleanup + PulsarClient pulsarClient = 
PulsarClient.builder().serviceUrl(pulsarService.getBrokerServiceUrl()) + .statsInterval(-1, TimeUnit.SECONDS).build(); + + PersistentTopic topic1 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns1, "topic1", totalPublish); + PersistentTopic topic2 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns2, "topic1", totalPublish); + PersistentTopic topic3 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns3, "topic1", totalPublish); + PersistentTopic topic4 = (PersistentTopic) createTopicAndPublish(pulsarClient, ns4, "topic1", totalPublish); + + Bookie bookie1 = bookies[0].getBookie(); + Field ledgerManagerField = Bookie.class.getDeclaredField("ledgerManager"); + ledgerManagerField.setAccessible(true); + LedgerManager ledgerManager = (LedgerManager) ledgerManagerField.get(bookie1); + + // namespace: ns1 + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic1.getManagedLedger(); + assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + // validate ledgers' ensemble with affinity bookies + assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), defaultBookies); + + // namespace: ns2 + ml = (ManagedLedgerImpl) topic2.getManagedLedger(); + assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + // validate ledgers' ensemble with affinity bookies + assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + + // namespace: ns3 + ml = (ManagedLedgerImpl) topic3.getManagedLedger(); + assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + // validate ledgers' ensemble with affinity bookies + assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + + // namespace: ns4 + ml = (ManagedLedgerImpl) topic4.getManagedLedger(); + assertEquals(ml.getLedgersInfoAsList().size(), totalLedgers); + // validate ledgers' ensemble with affinity bookies + assertAffinityBookies(ledgerManager, ml.getLedgersInfoAsList(), isolatedBookies); + + ManagedLedgerClientFactory mlFactory = + 
(ManagedLedgerClientFactory) pulsarService.getManagedLedgerClientFactory(); + Map bkPlacementPolicyToBkClientMap = mlFactory + .getBkEnsemblePolicyToBookKeeperMap(); + + // broker should create only 1 bk-client and factory per isolation-group + assertEquals(bkPlacementPolicyToBkClientMap.size(), 2); + + // make sure bk-isolation group also configure REPP_DNS_RESOLVER_CLASS as ZkBookieRackAffinityMapping to + // configure rack-aware policy with in isolated group + Map bkMap = mlFactory.getBkEnsemblePolicyToBookKeeperMap(); + BookKeeper bk = bkMap.values().iterator().next(); + Method getConf = BookKeeper.class.getDeclaredMethod("getConf"); + getConf.setAccessible(true); + ClientConfiguration clientConf = (ClientConfiguration) getConf.invoke(bk); + assertEquals(clientConf.getProperty(REPP_DNS_RESOLVER_CLASS), BookieRackAffinityMapping.class.getName()); + } + /** * It verifies that "ZkIsolatedBookieEnsemblePlacementPolicy" considers secondary affinity-group if primary group * doesn't have enough non-faulty bookies. @@ -362,6 +647,7 @@ public void testBookieIsolationWithSecondaryGroup() throws Exception { try { admin.namespaces().getBookieAffinityGroup(ns1); + fail("ns1 should have no bookie affinity group set"); } catch (PulsarAdminException.NotFoundException e) { // Ok } diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index f8a6aaa105384..1cdcb92447e95 100644 --- a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -378,6 +378,7 @@ brokerServiceCompactionThresholdInBytes|If the estimated backlog size is greater | additionalServletDirectory | Location of broker additional servlet NAR directory | ./brokerAdditionalServlet | | brokerEntryMetadataInterceptors | Set broker entry metadata interceptors.

Multiple interceptors should be separated by commas.

Available values:
  • org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor
  • org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor


  • Example
    brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor, org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor|N/A | | enableExposingBrokerEntryMetadataToClient|Whether to expose broker entry metadata to client or not.

    Available values:
  • true
  • false

  • Example
    enableExposingBrokerEntryMetadataToClient=true | false | +| strictBookieAffinityEnabled | Enable or disable the strict bookie isolation strategy. If enabled,
    - `bookie-ensemble` first tries to choose bookies that belong to a namespace's affinity group. If there are not enough such bookies, then the remaining bookies are chosen.
    - If a namespace has no affinity group, `bookie-ensemble` only chooses bookies that belong to no region. If there are not enough such bookies, `BKNotEnoughBookiesException` is thrown.| false | #### Deprecated parameters of Broker