Skip to content

Commit

Permalink
[pulsar-broker] support secondary bookie isolation group at namespace (
Browse files Browse the repository at this point in the history
…apache#4458)

* [pulsar-broker] support secondary bookie isolation group at namespace

* Fix: get correct available-primary bookie count | cli optional

* fix test
  • Loading branch information
rdhabalia authored Jun 6, 2019
1 parent dfdfdb4 commit ed836e2
Show file tree
Hide file tree
Showing 15 changed files with 281 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -756,12 +756,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable bookie secondary-isolation group if bookkeeperClientIsolationGroups doesn't have enough bookie available."
)
private String bookkeeperClientSecondaryIsolationGroups;
@FieldContext(
category = CATEGORY_STORAGE_BK,
required = false,
doc = "Minimum bookies that should be available as part of bookkeeperClientIsolationGroups \n\n"
+ "else broker will include bookkeeperClientSecondaryIsolationGroups bookies in isolated list.")
private int bookkeeperClientMinAvailableBookiesInIsolationGroups = 0;
@FieldContext(category = CATEGORY_STORAGE_BK, doc = "Enable/disable having read operations for a ledger to be sticky to "
+ "a single bookie.\n" +
"If this flag is enabled, the client will use one single bookie (by " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ private void setDefaultEnsemblePlacementPolicy(ClientConfiguration bkConf, Servi
conf.getBookkeeperClientIsolationGroups());
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
conf.getBookkeeperClientSecondaryIsolationGroups());
bkConf.setProperty(ZkIsolatedBookieEnsemblePlacementPolicy.MIN_AVAILABLE_PRIMARY_ISOLATED_BOOKIE,
conf.getBookkeeperClientMinAvailableBookiesInIsolationGroups());
if (bkConf.getProperty(ZooKeeperCache.ZK_CACHE_INSTANCE) == null) {
ZooKeeperCache zkc = new ZooKeeperCache(zkClient, conf.getZooKeeperOperationTimeoutSeconds()) {
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.DispatchRate;
Expand Down Expand Up @@ -583,7 +584,7 @@ protected void internalUnloadNamespace() {
}


protected void internalSetBookieAffinityGroup(String bookieAffinityGroup) {
protected void internalSetBookieAffinityGroup(BookieAffinityGroupData bookieAffinityGroup) {
log.info("[{}] Setting bookie-affinity-group {} for namespace {}", clientAppId(), bookieAffinityGroup,
this.namespaceName);

Expand Down Expand Up @@ -614,7 +615,7 @@ protected void internalSetBookieAffinityGroup(String bookieAffinityGroup) {
.get(pulsar().getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
localPolicies = new LocalPolicies();
}
localPolicies.bookkeeperAffinityGroup = bookieAffinityGroup;
localPolicies.bookieAffinityGroup = bookieAffinityGroup;
byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(localPolicies);
pulsar().getLocalZkCache().getZooKeeper().setData(path, data, Math.toIntExact(version));
// invalidate namespace's local-policies
Expand All @@ -636,7 +637,7 @@ protected void internalSetBookieAffinityGroup(String bookieAffinityGroup) {
}
}

protected String internalGetBookieAffinityGroup() {
protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
validateSuperUserAccess();

if (namespaceName.isGlobal()) {
Expand All @@ -650,9 +651,9 @@ protected String internalGetBookieAffinityGroup() {
String path = joinPath(LOCAL_POLICIES_ROOT, this.namespaceName.toString());
try {
Optional<LocalPolicies> policies = pulsar().getLocalZkCacheService().policiesCache().get(path);
final String bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace local-policies does not exist")).bookkeeperAffinityGroup;
if (StringUtils.isBlank(bookkeeperAffinityGroup)) {
final BookieAffinityGroupData bookkeeperAffinityGroup = policies.orElseThrow(() -> new RestException(Status.NOT_FOUND,
"Namespace local-policies does not exist")).bookieAffinityGroup;
if (bookkeeperAffinityGroup == null) {
throw new RestException(Status.NOT_FOUND, "bookie-affinity group does not exist");
}
return bookkeeperAffinityGroup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
Expand Down Expand Up @@ -552,13 +553,13 @@ public void setPersistence(@PathParam("property") String property, @PathParam("c
}

@POST
@Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
@Path("/{property}/{cluster}/{namespace}/persistence/bookieAffinity")
@ApiOperation(hidden = true, value = "Set the bookie-affinity-group to namespace-local policy.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setBookieAffinityGroup(@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
@PathParam("namespace") String namespace, BookieAffinityGroupData bookieAffinityGroup) {
validateNamespaceName(property, cluster, namespace);
internalSetBookieAffinityGroup(bookieAffinityGroup);
}
Expand All @@ -569,7 +570,7 @@ public void setBookieAffinityGroup(@PathParam("property") String property, @Path
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public String getBookieAffinityGroup(@PathParam("property") String property,
public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property,
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace) {
validateNamespaceName(property, cluster, namespace);
return internalGetBookieAffinityGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
Expand Down Expand Up @@ -492,13 +493,13 @@ public void setPersistence(@PathParam("tenant") String tenant, @PathParam("names
}

@POST
@Path("/{tenant}/{namespace}/persistence/bookieAffinity/{bookieAffinityGroup}")
@Path("/{tenant}/{namespace}/persistence/bookieAffinity")
@ApiOperation(value = "Set the bookie-affinity-group to namespace-persistent policy.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bookieAffinityGroup") String bookieAffinityGroup) {
BookieAffinityGroupData bookieAffinityGroup) {
validateNamespaceName(tenant, namespace);
internalSetBookieAffinityGroup(bookieAffinityGroup);
}
Expand All @@ -509,7 +510,7 @@ public void setBookieAffinityGroup(@PathParam("tenant") String tenant, @PathPara
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification") })
public String getBookieAffinityGroup(@PathParam("property") String property,
public BookieAffinityGroupData getBookieAffinityGroup(@PathParam("property") String property,
@PathParam("namespace") String namespace) {
validateNamespaceName(property, namespace);
return internalGetBookieAffinityGroup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -746,12 +746,14 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum());
managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum());
if (localPolicies.isPresent() && StringUtils.isNotBlank(localPolicies.get().bookkeeperAffinityGroup)) {
if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) {
managedLedgerConfig
.setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class);
Map<String, Object> properties = Maps.newHashMap();
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookkeeperAffinityGroup);
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary);
properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS,
localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary);
managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties);
}
managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate());
Expand Down
Loading

0 comments on commit ed836e2

Please sign in to comment.