Skip to content

Commit

Permalink
[feature][broker] Strict bookie affinity group strategy (apache#12025)
Browse files Browse the repository at this point in the history
### Motivation
1. Suppose we have three bookies, `b1`, `b2`, and `b3`:
`b1` and `b2` are in region `test-region`, while `b3` doesn't belong to any region.
Namespace `public/test` has its affinity group set to `test-region`.
When producing messages to a topic under `public/test`, bookie `b3` can also be selected for the ensemble. I think this is a violation of isolation.

2. Namespace `public/default` doesn't set any affinity group. When producing messages to a topic under `public/default`,
bookies `b1` and `b2`, which have been isolated, can still be selected for the ensemble. I think this is also a violation of isolation.


### Modifications

Ignore the bookies that have not been assigned to any region when using `ZkIsolatedBookieEnsemblePlacementPolicy.java`.
  • Loading branch information
gaozhangmin authored Mar 21, 2022
1 parent d4e0797 commit 5ebaafd
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 24 deletions.
4 changes: 3 additions & 1 deletion conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,8 @@ packagesManagementLedgerRootPath=/ledgers

### --- Packages management service configuration variables (end) --- ###

# Enable or disable strict bookie affinity
strictBookieAffinityEnabled=false

### --- Deprecated settings --- ###

Expand Down Expand Up @@ -1426,4 +1428,4 @@ tlsEnabled=false

# Enable Key_Shared subscription (default is enabled)
# @deprecated since 2.8.0 subscriptionTypesEnabled is preferred over subscriptionKeySharedEnable.
subscriptionKeySharedEnable=true
subscriptionKeySharedEnable=true
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -61,6 +60,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac

private MetadataCache<BookiesRackConfiguration> bookieMappingCache;

private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";


public IsolatedBookieEnsemblePlacementPolicy() {
super();
Expand All @@ -84,11 +85,10 @@ public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
for (String isolationGroup : isolationGroupsString.split(",")) {
primaryIsolationGroups.add(isolationGroup);
}

// Only add the bookieMappingCache if we have defined an isolation group
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
}
// Only add the bookieMappingCache if we have defined an isolation group
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
bookieMappingCache.get(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH).join();
}
if (conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS) != null) {
String secondaryIsolationGroupsString = castToString(conf.getProperty(SECONDARY_ISOLATION_BOOKIE_GROUPS));
Expand Down Expand Up @@ -121,7 +121,30 @@ private static String castToString(Object obj) {
public PlacementResult<List<BookieId>> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Map<String, byte[]> customMetadata, Set<BookieId> excludeBookies)
throws BKNotEnoughBookiesException {
Map<String, List<String>> isolationGroup = new HashMap<>();
if (customMetadata.containsKey(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG)) {
try {
EnsemblePlacementPolicyConfig policy = EnsemblePlacementPolicyConfig
.decode(customMetadata.get(EnsemblePlacementPolicyConfig.ENSEMBLE_PLACEMENT_POLICY_CONFIG));
Map<String, Object> policyProperties = policy.getProperties();
String isolationBookieGroups =
(String) policyProperties.get(ISOLATION_BOOKIE_GROUPS);
String secondaryIsolationBookieGroups =
(String) policyProperties.get(SECONDARY_ISOLATION_BOOKIE_GROUPS);
Set<String> primaryIsolationGroups = new HashSet<>();
Set<String> secondaryIsolationGroups = new HashSet<>();
if (isolationBookieGroups != null) {
primaryIsolationGroups.addAll(Arrays.asList(isolationBookieGroups.split(",")));
}
if (secondaryIsolationBookieGroups != null) {
secondaryIsolationGroups.addAll(Arrays.asList(secondaryIsolationBookieGroups.split(",")));
}
defaultIsolationGroups = ImmutablePair.of(primaryIsolationGroups, secondaryIsolationGroups);
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("Failed to decode EnsemblePlacementPolicyConfig from customeMetadata when choosing ensemble, "
+ "Will use defaultIsolationGroups instead");
}
}

Set<BookieId> blacklistedBookies = getBlacklistedBookiesWithIsolationGroups(
ensembleSize, defaultIsolationGroups);
if (excludeBookies == null) {
Expand Down Expand Up @@ -194,6 +217,9 @@ private static Pair<Set<String>, Set<String>> getIsolationGroup(
private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
Pair<Set<String>, Set<String>> isolationGroups) {
Set<BookieId> blacklistedBookies = new HashSet<>();
if (isolationGroups != null && isolationGroups.getLeft().contains(PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP)) {
return blacklistedBookies;
}
try {
if (bookieMappingCache != null) {
CompletableFuture<Optional<BookiesRackConfiguration>> future =
Expand All @@ -211,6 +237,7 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
int totalAvailableBookiesInPrimaryGroup = 0;
Set<String> primaryIsolationGroup = Collections.emptySet();
Set<String> secondaryIsolationGroup = Collections.emptySet();
Set<BookieId> primaryGroupBookies = new HashSet<>();
if (isolationGroups != null) {
primaryIsolationGroup = isolationGroups.getLeft();
secondaryIsolationGroup = isolationGroups.getRight();
Expand All @@ -225,9 +252,16 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
for (String groupBookie : bookiesInGroup) {
totalAvailableBookiesInPrimaryGroup += knownBookies
.containsKey(BookieId.parse(groupBookie)) ? 1 : 0;
primaryGroupBookies.add(BookieId.parse(groupBookie));
}
}
}

Set<BookieId> otherGroupBookies = new HashSet<>(blacklistedBookies);
Set<BookieId> nonRegionBookies = new HashSet<>(knownBookies.keySet());
nonRegionBookies.removeAll(primaryGroupBookies);
blacklistedBookies.addAll(nonRegionBookies);

// sometime while doing isolation, user might not want to remove isolated bookies from other default
// groups. so, same set of bookies could be overlapped into isolated-group and other default groups. so,
// try to remove those overlapped bookies from excluded-bookie list because they are also part of
Expand All @@ -241,6 +275,7 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
}
}
// if primary-isolated-bookies are not enough then add consider secondary isolated bookie group as well.
int totalAvailableBookiesFromPrimaryAndSecondary = totalAvailableBookiesInPrimaryGroup;
if (totalAvailableBookiesInPrimaryGroup < ensembleSize) {
log.info(
"Not found enough available-bookies from primary isolation group [{}], checking secondary "
Expand All @@ -250,10 +285,21 @@ private Set<BookieId> getBlacklistedBookiesWithIsolationGroups(int ensembleSize,
if (bookieGroup != null && !bookieGroup.isEmpty()) {
for (String bookieAddress : bookieGroup.keySet()) {
blacklistedBookies.remove(BookieId.parse(bookieAddress));
totalAvailableBookiesFromPrimaryAndSecondary += 1;
}
}
}
}
if (totalAvailableBookiesFromPrimaryAndSecondary < ensembleSize) {
log.info(
"Not found enough available-bookies from primary isolation group [{}] and secondary "
+ "isolation group [{}], checking from non-region bookies",
primaryIsolationGroup, secondaryIsolationGroup);
nonRegionBookies.removeAll(otherGroupBookies);
for (BookieId bookie: nonRegionBookies) {
blacklistedBookies.remove(bookie);
}
}
}
} catch (Exception e) {
log.warn("Error getting bookie isolation info from metadata store: {}", e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean enableNamespaceIsolationUpdateOnTime = false;

// Broker-level switch for strict bookie affinity; it is off (false) unless explicitly enabled.
// NOTE(review): the code that reads this flag is not visible in this hunk — confirm its exact
// effect on bookie selection before relying on it.
@FieldContext(category = CATEGORY_SERVER, doc = "Enable or disable strict bookie affinity.")
private boolean strictBookieAffinityEnabled = false;

/***** --- TLS. --- ****/
@FieldContext(
category = CATEGORY_TLS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class BaseResources<T> {

protected static final String BASE_POLICIES_PATH = "/admin/policies";
protected static final String BASE_CLUSTERS_PATH = "/admin/clusters";
protected static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";

@Getter
private final MetadataStore store;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@

public class LocalPoliciesResources extends BaseResources<LocalPolicies> {

private static final String LOCAL_POLICIES_ROOT = "/admin/local-policies";

public LocalPoliciesResources(MetadataStore localStore, int operationTimeoutSec) {
super(localStore, LocalPolicies.class, operationTimeoutSec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public static boolean pathIsFromNamespace(String path) {
&& path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
}

/**
 * Checks whether a metadata path lies under {@code LOCAL_POLICIES_ROOT} and has at least one
 * further {@code '/'} after the root prefix.
 *
 * @param path the metadata path to test; must not be {@code null}
 * @return {@code true} if {@code path} starts with {@code LOCAL_POLICIES_ROOT + "/"} and the
 *         remainder after that prefix contains another {@code '/'}; {@code false} otherwise
 */
public static boolean pathIsNamespaceLocalPolicies(String path) {
    final String rootPrefix = LOCAL_POLICIES_ROOT + "/";
    if (!path.startsWith(rootPrefix)) {
        return false;
    }
    // Look for a separator strictly after the root prefix.
    return path.indexOf('/', rootPrefix.length()) >= 0;
}

// clear resource of `/namespace/{namespaceName}` for zk-node
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
Expand All @@ -159,6 +164,10 @@ public static NamespaceName namespaceFromPath(String path) {
return NamespaceName.get(path.substring(BASE_POLICIES_PATH.length() + 1));
}

/**
 * Builds a {@link NamespaceName} from a local-policies path by dropping the
 * {@code LOCAL_POLICIES_ROOT} prefix and the single character that follows it.
 *
 * <p>The path is not validated here; the prefix is removed purely by length.
 *
 * @param path the local-policies metadata path
 * @return the namespace name parsed from the portion of {@code path} after the root prefix
 */
public static NamespaceName namespaceFromLocalPoliciesPath(String path) {
    final int namespaceStart = LOCAL_POLICIES_ROOT.length() + 1;
    final String namespacePart = path.substring(namespaceStart);
    return NamespaceName.get(namespacePart);
}

public static class IsolationPolicyResources extends BaseResources<Map<String, NamespaceIsolationDataImpl>> {
private static final String NAMESPACE_ISOLATION_POLICIES = "namespaceIsolationPolicies";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -87,6 +88,32 @@ void teardown() throws Exception {
timer.stop();
}

/**
 * Stores a bookie mapping in which only BOOKIE1 and BOOKIE2 belong to "group1", then requests a
 * two-bookie ensemble and asserts that BOOKIE3 and BOOKIE4 (which this test puts in no group)
 * are not part of it.
 */
@Test
public void testNonRegionBookie() throws Exception {
    Map<String, Map<String, BookieInfo>> groupToBookies = new HashMap<>();
    Map<String, BookieInfo> group1Bookies = new HashMap<>();

    group1Bookies.put(BOOKIE1, BookieInfo.builder().rack("rack0").build());
    group1Bookies.put(BOOKIE2, BookieInfo.builder().rack("rack1").build());
    groupToBookies.put("group1", group1Bookies);

    // Publish the mapping before the policy is initialized so that it is what the policy loads.
    store.put(
            BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH,
            jsonMapper.writeValueAsBytes(groupToBookies),
            Optional.empty()).join();

    IsolatedBookieEnsemblePlacementPolicy policy = new IsolatedBookieEnsemblePlacementPolicy();
    ClientConfiguration clientConf = new ClientConfiguration();
    clientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
    clientConf.setProperty(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, isolationGroups);
    policy.initialize(
            clientConf,
            Optional.empty(),
            timer,
            SettableFeatureProvider.DISABLE_ALL,
            NullStatsLogger.INSTANCE,
            BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
    policy.onClusterChanged(writableBookies, readOnlyBookies);

    List<BookieId> ensemble =
            policy.newEnsemble(2, 2, 2, Collections.emptyMap(), new HashSet<>()).getResult();

    // Bookies outside "group1" must not have been picked.
    BookieId ungroupedBookie3 = new BookieSocketAddress(BOOKIE3).toBookieId();
    assertFalse(ensemble.contains(ungroupedBookie3));
    BookieId ungroupedBookie4 = new BookieSocketAddress(BOOKIE4).toBookieId();
    assertFalse(ensemble.contains(ungroupedBookie4));
}

@Test
public void testBasic() throws Exception {
Map<String, Map<String, BookieInfo>> bookieMapping = new HashMap<>();
Expand Down Expand Up @@ -128,9 +155,12 @@ public void testBasic() throws Exception {

Set<BookieId> bookieToExclude = new HashSet<>();
bookieToExclude.add(new BookieSocketAddress(BOOKIE1).toBookieId());
ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude).getResult();
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE4).toBookieId()));
assertTrue(ensemble.contains(new BookieSocketAddress(BOOKIE2).toBookieId()));
try {
isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(), bookieToExclude).getResult();
} catch (BKNotEnoughBookiesException e) {
Assert.assertEquals(e.getMessage(), "Not enough non-faulty bookies available");
}


secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);
Expand Down
Loading

0 comments on commit 5ebaafd

Please sign in to comment.