Skip to content

Commit

Permalink
Support topic-level inactiveTopicPolicies (apache#7986)
Browse files Browse the repository at this point in the history
### Motivation
Support topic-level inactiveTopicPolicies

### Modifications
Support set/get/remove inactiveTopicPolicies policy on topic level.
  • Loading branch information
315157973 authored Sep 5, 2020
1 parent 9e2aca8 commit 4a441db
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PolicyName;
import org.apache.pulsar.common.policies.data.PolicyOperation;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
Expand Down Expand Up @@ -843,6 +842,21 @@ protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPolicies off
return completableFuture;
}

protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
TopicPolicies topicPolicies = null;
try {
topicPolicies = pulsar().getTopicPoliciesService().getTopicPolicies(topicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.error("Topic {} policies cache have not init.", topicName);
return FutureUtil.failedFuture(new RestException(Status.PRECONDITION_FAILED, "Policies cache have not init"));
}
if (topicPolicies == null) {
topicPolicies = new TopicPolicies();
}
topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
}

private CompletableFuture<Void> internalUpdateOffloadPolicies(OffloadPolicies offloadPolicies, TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
.thenAccept(optionalTopic -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
Expand Down Expand Up @@ -377,6 +378,69 @@ public void deleteMaxUnackedMessagesOnConsumer(@Suspended final AsyncResponse as
setMaxUnackedMessagesOnConsumer(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Get inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error"),})
public void getInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
TopicPolicies topicPolicies = getTopicPolicies(topicName).orElse(new TopicPolicies());
if (topicPolicies.isInactiveTopicPoliciesSet()) {
asyncResponse.resume(topicPolicies.getInactiveTopicPolicies());
} else {
asyncResponse.resume(Response.noContent().build());
}
}

@POST
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Set inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void setInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
validateAdminAccessForTenant(tenant);
validatePoliciesReadOnlyAccess();
checkTopicLevelPolicyEnable();
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
internalSetInactiveTopicPolicies(inactiveTopicPolicies).whenComplete((res, ex) -> {
if (ex instanceof RestException) {
log.error("Failed set InactiveTopicPolicies", ex);
asyncResponse.resume(ex);
} else if (ex != null) {
log.error("Failed set InactiveTopicPolicies", ex);
asyncResponse.resume(new RestException(ex));
} else {
asyncResponse.resume(Response.noContent().build());
}
});
}

@DELETE
@Path("/{tenant}/{namespace}/{topic}/inactiveTopicPolicies")
@ApiOperation(value = "Delete inactive topic policies on a topic.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),})
public void deleteInactiveTopicPolicies(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic) {
validateTopicName(tenant, namespace, encodedTopic);
setInactiveTopicPolicies(asyncResponse, tenant, namespace, encodedTopic, null);
}

@GET
@Path("/{tenant}/{namespace}/{topic}/maxUnackedMessagesOnSubscription")
@ApiOperation(value = "Get max unacked messages per subscription config on a topic.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1825,14 +1825,19 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (data.inactive_topic_policies != null) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
if (topicPolicies == null || !topicPolicies.isInactiveTopicPoliciesSet()) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
}
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}


initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));

this.updateMaxPublishRate(data);
Expand Down Expand Up @@ -2344,24 +2349,50 @@ public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
log.debug("[{}] update topic policy: {}", topic, policies);
}

initializeTopicDispatchRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.dispatchRateLimiter.isPresent() && policies != null
&& policies.getDispatchRate() != null) {
if (policies == null) {
return;
}
Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);
if (this.dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
dispatchRateLimiter.ifPresent(dispatchRateLimiter ->
dispatchRateLimiter.updateDispatchRate(policies.getDispatchRate()));
}

if (policies != null && policies.getPublishRate() != null) {
if (policies.getPublishRate() != null) {
topicPolicyPublishRate = policies.getPublishRate();
updateTopicPublishDispatcher();
}

if (policies.isInactiveTopicPoliciesSet()) {
inactiveTopicPolicies = policies.getInactiveTopicPolicies();
} else {
//topic-level policies is null , so use namespace-level or broker-level
namespacePolicies.ifPresent(nsPolicies -> {
if (nsPolicies.inactive_topic_policies != null) {
inactiveTopicPolicies = nsPolicies.inactive_topic_policies;
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
});
}
}

private Optional<Policies> getNamespacePolicies(){
try {
return Optional.ofNullable(brokerService.pulsar().getAdminClient().namespaces()
.getPolicies(TopicName.get(topic).getNamespace()));
} catch (Exception e) {
log.error("get namespace policies fail", e);
}
return Optional.empty();
}

private void initializeTopicDispatchRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
synchronized (dispatchRateLimiter) {
if (!dispatchRateLimiter.isPresent() && policies.isPresent() &&
policies.get().getDispatchRate() != null) {
if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
}
Expand Down
Loading

0 comments on commit 4a441db

Please sign in to comment.