Skip to content

Commit

Permalink
Support configuring DeleteInactiveTopic setting in namespace policy (a…
Browse files Browse the repository at this point in the history
…pache#7598)

### Motivation

Support configuring DeleteInactiveTopic setting in namespace policy

### Modifications

Only the two parameters `brokerDeleteInactiveTopicsMode` and `brokerDeleteInactiveTopicsMaxInactiveDurationSeconds` support namespace policy. The parameters are changed to Map structure, the key is the namespace, and the value is the parameter value.
Such as: namespace1=delete_when_no_subscriptions, namespace2=delete_when_no_subscriptions.

In addition, there is a key name called `default`. If it is set, other namespaces that do not specify parameters will use this parameter.
Such as: default=delete_when_no_subscriptions
  • Loading branch information
315157973 authored Jul 28, 2020
1 parent 48156ad commit 00e3089
Show file tree
Hide file tree
Showing 18 changed files with 566 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;

import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
import java.util.Collections;
Expand Down Expand Up @@ -90,6 +92,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.TenantOperation;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -1832,39 +1835,64 @@ protected DelayedDeliveryPolicies internalGetDelayedDelivery() {
}
}

protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
protected InactiveTopicPolicies internalGetInactiveTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.INACTIVE_TOPIC, PolicyOperation.READ);

Policies policies = getNamespacePolicies(namespaceName);
if (policies.inactive_topic_policies == null) {
return new InactiveTopicPolicies(config().getBrokerDeleteInactiveTopicsMode()
, config().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds()
, config().isBrokerDeleteInactiveTopicsEnabled());
} else {
return policies.inactive_topic_policies;
}
}

protected void internalSetInactiveTopic(InactiveTopicPolicies inactiveTopicPolicies){
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
internalSetPolicies("inactive_topic_policies", inactiveTopicPolicies);
}

protected void internalSetPolicies(String fieldName, Object value){
try {
Stat nodeStat = new Stat();
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);

policies.delayed_delivery_policies = delayedDeliveryPolicies;
Field field = Policies.class.getDeclaredField(fieldName);
field.setAccessible(true);
field.set(policies, value);

globalZk().setData(path, jsonMapper().writeValueAsBytes(policies), nodeStat.getVersion());
policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
log.info("[{}] Successfully updated delayed delivery messages configuration: namespace={}, map={}", clientAppId(),
namespaceName, jsonMapper().writeValueAsString(policies.retention_policies));
log.info("[{}] Successfully updated {} configuration: namespace={}, value={}", clientAppId(), fieldName,
namespaceName, jsonMapper().writeValueAsString(value));

} catch (KeeperException.NoNodeException e) {
log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: does not exist", clientAppId(),
namespaceName);
log.warn("[{}] Failed to update {} configuration for namespace {}: does not exist", clientAppId(),
fieldName, namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn("[{}] Failed to update delayed delivery messages configuration for namespace {}: concurrent modification",
clientAppId(), namespaceName);
log.warn("[{}] Failed to update {} configuration for namespace {}: concurrent modification",
clientAppId(), fieldName, namespaceName);
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (RestException pfe) {
throw pfe;
} catch (Exception e) {
log.error("[{}] Failed to update delayed delivery messages configuration for namespace {}", clientAppId(), namespaceName,
e);
log.error("[{}] Failed to update {} configuration for namespace {}", clientAppId(), fieldName
, namespaceName, e);
throw new RestException(e);
}
}

protected void internalSetDelayedDelivery(DelayedDeliveryPolicies delayedDeliveryPolicies) {
validateSuperUserAccess();
validatePoliciesReadOnlyAccess();
internalSetPolicies("delayed_delivery_policies", delayedDeliveryPolicies);
}

protected void internalSetNamespaceAntiAffinityGroup(String antiAffinityGroup) {
validateNamespacePolicyOperation(namespaceName, PolicyName.ANTI_AFFINITY, PolicyOperation.WRITE);
checkNotNull(antiAffinityGroup, "AntiAffinityGroup should not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;

import org.apache.pulsar.common.policies.data.TenantOperation;
import org.slf4j.Logger;
Expand Down Expand Up @@ -857,6 +858,41 @@ public void setDelayedDeliveryPolicies(@PathParam("tenant") String tenant,
internalSetDelayedDelivery(deliveryPolicies);
}

@GET
@Path("/{tenant}/{namespace}/inactiveTopicPolicies")
@ApiOperation(value = "Get inactive topic policies config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
@ApiResponse(code = 409, message = "Concurrent modification"), })
public InactiveTopicPolicies getInactiveTopicPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetInactiveTopic();
}

@DELETE
@Path("/{tenant}/{namespace}/inactiveTopicPolicies")
@ApiOperation(value = "Remove inactive topic policies from a namespace.")
@ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist"),
@ApiResponse(code = 409, message = "Concurrent modification")})
public void removeInactiveTopicPolicies(@PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
internalSetInactiveTopic( null);
}

@POST
@Path("/{tenant}/{namespace}/inactiveTopicPolicies")
@ApiOperation(value = "Set inactive topic policies config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"), })
public void setInactiveTopicPolicies(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@ApiParam(value = "Inactive topic policies for the specified namespace") InactiveTopicPolicies inactiveTopicPolicies) {
validateNamespaceName(tenant, namespace);
internalSetInactiveTopic(inactiveTopicPolicies);
}

@GET
@Path("/{tenant}/{namespace}/maxProducersPerTopic")
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
Expand Down Expand Up @@ -63,8 +65,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile boolean isFenced;

// When set to false, this inactive topic can not be deleted
protected boolean deleteWhileInactive;
// Inactive topic policies
protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();

// Timestamp of when this topic was last seen active
protected volatile long lastActive;
Expand Down Expand Up @@ -98,8 +100,9 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.deleteWhileInactive =
brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled());
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
Expand Down Expand Up @@ -132,12 +135,14 @@ protected boolean isProducersExceeded() {
return false;
}

@Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}

@Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
Expand Down Expand Up @@ -466,12 +471,23 @@ public long getBytesOutCounter() {
}

public boolean isDeleteWhileInactive() {
return deleteWhileInactive;
return this.inactiveTopicPolicies.isDeleteWhileInactive();
}

public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.deleteWhileInactive = deleteWhileInactive;
this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
return inactiveTopicPolicies;
}

public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
, int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpd
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), interval, interval,
TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1244,9 +1243,8 @@ public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int maxInactiveDurationInSec) {
forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
public void checkGC() {
forEachTopic(Topic::checkGC);
}

public void checkMessageExpiry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);
void checkGC();

void checkInactiveSubscriptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
Expand Down Expand Up @@ -68,7 +69,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
Expand Down Expand Up @@ -145,6 +145,9 @@ public NonPersistentTopic(String topic, BrokerService brokerService) {
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
if (policies.inactive_topic_policies != null) {
inactiveTopicPolicies = policies.inactive_topic_policies;
}
setSchemaCompatibilityStrategy(policies);

schemaValidationEnforced = policies.schema_validation_enforced;
Expand Down Expand Up @@ -420,7 +423,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean c

/**
* Close this topic - close all producers and subscriptions associated with this topic
*
*
* @param closeWithoutWaitingClientDisconnect
* don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
Expand Down Expand Up @@ -626,6 +629,7 @@ public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}

@Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
boolean hydratePublishers) {
Expand Down Expand Up @@ -755,6 +759,7 @@ public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats
topicStatsStream.endObject();
}

@Override
public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {

NonPersistentTopicStats stats = new NonPersistentTopicStats();
Expand Down Expand Up @@ -808,6 +813,7 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
return stats;
}

@Override
public PersistentTopicInternalStats getInternalStats() {

PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
Expand All @@ -829,11 +835,12 @@ public boolean isActive() {
}

@Override
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (!deleteWhileInactive) {
public void checkGC() {
if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive()) {
lastActive = System.nanoTime();
} else {
Expand Down Expand Up @@ -895,6 +902,14 @@ public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));

if (data.inactive_topic_policies != null) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
return checkReplicationAndRetryOnFailure();
}

Expand Down
Loading

0 comments on commit 00e3089

Please sign in to comment.