Skip to content

Commit

Permalink
[ISSUE 7757] Support Persistence Policies on topic level (apache#7817)
Browse files Browse the repository at this point in the history
Link [https://github.com/apache/pulsar/issues/7757](https://github.com/apache/pulsar/issues/7757) and master issue [https://github.com/apache/pulsar/issues/2688](https://github.com/apache/pulsar/issues/2688)

### Motivation

Support set/get/remove persistence  policies on topic level.

### Verifying this change
new unit test added.
  • Loading branch information
zhanghaou authored Aug 17, 2020
1 parent e06e872 commit ff1780c
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;

import java.net.MalformedURLException;
import java.net.URI;
import java.util.ArrayList;
Expand All @@ -35,14 +34,12 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import javax.servlet.ServletContext;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;

import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -65,6 +62,7 @@
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
Expand All @@ -76,8 +74,8 @@
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.apache.pulsar.zookeeper.ZooKeeperManagedLedgerCache;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -894,4 +892,22 @@ protected void checkArgument(boolean b, String errorMessage) {
throw new RestException(Status.BAD_REQUEST, errorMessage);
}
}

protected void validatePersistencePolicies(PersistencePolicies persistence) {
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2005,24 +2005,6 @@ protected List<String> internalGetAntiAffinityNamespaces(String cluster, String
}
}

private void validatePersistencePolicies(PersistencePolicies persistence) {
checkNotNull(persistence, "persistence policies should not be null");
final ServiceConfiguration config = pulsar().getConfiguration();
checkArgument(persistence.getBookkeeperEnsemble() <= config.getManagedLedgerMaxEnsembleSize(),
"Bookkeeper-Ensemble must be <= " + config.getManagedLedgerMaxEnsembleSize());
checkArgument(persistence.getBookkeeperWriteQuorum() <= config.getManagedLedgerMaxWriteQuorum(),
"Bookkeeper-WriteQuorum must be <= " + config.getManagedLedgerMaxWriteQuorum());
checkArgument(persistence.getBookkeeperAckQuorum() <= config.getManagedLedgerMaxAckQuorum(),
"Bookkeeper-AckQuorum must be <= " + config.getManagedLedgerMaxAckQuorum());
checkArgument(
(persistence.getBookkeeperEnsemble() >= persistence.getBookkeeperWriteQuorum())
&& (persistence.getBookkeeperWriteQuorum() >= persistence.getBookkeeperAckQuorum()),
String.format("Bookkeeper Ensemble (%s) >= WriteQuorum (%s) >= AckQuoru (%s)",
persistence.getBookkeeperEnsemble(), persistence.getBookkeeperWriteQuorum(),
persistence.getBookkeeperAckQuorum()));

}

protected RetentionPolicies internalGetRetention() {
validateNamespacePolicyOperation(namespaceName, PolicyName.RETENTION, PolicyOperation.READ);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
Expand Down Expand Up @@ -2309,6 +2310,48 @@ protected CompletableFuture<Void> internalRemoveRetention() {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected void internalGetPersistence(AsyncResponse asyncResponse){
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
Optional<PersistencePolicies> persistencePolicies = getTopicPolicies(topicName)
.map(TopicPolicies::getPersistence);
if (!persistencePolicies.isPresent()) {
asyncResponse.resume(Response.noContent().build());
}else {
asyncResponse.resume(persistencePolicies.get());
}
}

protected CompletableFuture<Void> internalSetPersistence(PersistencePolicies persistencePolicies) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
validatePersistencePolicies(persistencePolicies);

TopicPolicies topicPolicies = getTopicPolicies(topicName).orElseGet(TopicPolicies::new);
topicPolicies.setPersistence(persistencePolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

protected CompletableFuture<Void> internalRemovePersistence() {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
Optional<TopicPolicies> topicPolicies = getTopicPolicies(topicName);
if (!topicPolicies.isPresent()) {
return CompletableFuture.completedFuture(null);
}
topicPolicies.get().setPersistence(null);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
}

protected MessageId internalTerminate(boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@
*/
package org.apache.pulsar.broker.admin.v2;

import static org.apache.pulsar.common.util.Codec.decode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
Expand All @@ -38,36 +46,25 @@
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.Maps;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.ApiParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.pulsar.common.util.Codec.decode;

/**
*/
@Path("/persistent")
Expand Down Expand Up @@ -1353,6 +1350,89 @@ public void removeRetention(@Suspended final AsyncResponse asyncResponse,
});
}

@GET
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Get configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void getPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
try {
internalGetPersistence(asyncResponse);
} catch (RestException e) {
asyncResponse.resume(e);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Set configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification"),
@ApiResponse(code = 400, message = "Invalid persistence policies")})
public void setPersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Bookkeeper persistence policies for specified topic") PersistencePolicies persistencePolicies) {
validateTopicName(tenant, namespace, encodedTopic);
internalSetPersistence(persistencePolicies).whenComplete((r, ex) -> {
if (ex instanceof RestException) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed updated persistence policies", ex);
asyncResponse.resume(new RestException(ex));
} else {
try {
log.info("[{}] Successfully updated persistence policies: namespace={}, topic={}, persistencePolicies={}",
clientAppId(),
namespaceName,
topicName.getLocalName(),
jsonMapper().writeValueAsString(persistencePolicies));
} catch (JsonProcessingException ignore) {
}
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/persistence")
@ApiOperation(value = "Remove configuration of persistence policies for specified topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic does not exist"),
@ApiResponse(code = 405, message = "Topic level policy is disabled, to enable the topic level policy and retry"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removePersistence(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
internalRemovePersistence().whenComplete((r, ex) -> {
if (ex != null) {
log.error("Failed updated retention", ex);
asyncResponse.resume(new RestException(ex));
} else {
log.info("[{}] Successfully remove persistence policies: namespace={}, topic={}",
clientAppId(),
namespaceName,
topicName.getLocalName());
asyncResponse.resume(Response.noContent().build());
}
});
}


@POST
@Path("/{tenant}/{namespace}/{topic}/terminate")
@ApiOperation(value = "Terminate a topic. A topic that is terminated will not accept any more "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
Expand All @@ -42,7 +41,6 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;

import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Field;
Expand Down Expand Up @@ -70,11 +68,9 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
Expand Down Expand Up @@ -118,7 +114,6 @@
import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
Expand All @@ -144,6 +139,7 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
Expand Down Expand Up @@ -1070,6 +1066,26 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
// Get persistence policy for this topic
Optional<Policies> policies = Optional.empty();
Optional<LocalPolicies> localPolicies = Optional.empty();

PersistencePolicies persistencePolicies = null;
RetentionPolicies retentionPolicies = null;

if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(cloneTopicName);
if (topicPolicies != null) {
persistencePolicies = topicPolicies.getPersistence();
retentionPolicies = topicPolicies.getRetentionPolicies();
}
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.warn("Topic {} policies cache have not init.", topicName);
}
}

try {
policies = pulsar
.getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES,
Expand All @@ -1083,16 +1099,20 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
return;
}

PersistencePolicies persistencePolicies = policies.map(p -> p.persistence).orElseGet(
() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
serviceConfig.getManagedLedgerDefaultAckQuorum(),
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
if (persistencePolicies == null) {
persistencePolicies = policies.map(p -> p.persistence).orElseGet(
() -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(),
serviceConfig.getManagedLedgerDefaultWriteQuorum(),
serviceConfig.getManagedLedgerDefaultAckQuorum(),
serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit()));
}

RetentionPolicies retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
if (retentionPolicies == null) {
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
serviceConfig.getDefaultRetentionSizeInMB())
);
}

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble());
Expand Down
Loading

0 comments on commit ff1780c

Please sign in to comment.