Skip to content

Commit

Permalink
Support delete inactive topic when subscriptions caught up (apache#6077)
Browse files Browse the repository at this point in the history
### Motivation

Currently, pulsar support delete inactive topic which has no active producers and no subscriptions. This pull request is support to delete inactive topics that all subscriptions of the topic are caught up and no active producers/consumer. 

### Modifications

Expose inactive topic delete mode in broker.conf, future more we can support namespace level configuration for the inactive topic delete mode.
  • Loading branch information
codelipenghui authored Jan 19, 2020
1 parent 5577066 commit dc7abd8
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 23 deletions.
11 changes: 11 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,17 @@ brokerDeleteInactiveTopicsEnabled=true
# How often to check for inactive topics
brokerDeleteInactiveTopicsFrequencySeconds=60

# Set the inactive topic delete mode. Default is delete_when_no_subscriptions
# 'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers
# 'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)
# and no active producers/consumers
brokerDeleteInactiveTopicsMode=delete_when_no_subscriptions

# Max duration of topic inactivity in seconds, default is not present
# If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used
# Topics that are inactive for longer than this value will be deleted
brokerDeleteInactiveTopicsMaxInactiveDurationSeconds=

# How frequently to proactively check and purge expired messages
messageExpiryCheckIntervalInMinutes=5

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
Expand Down Expand Up @@ -254,6 +255,24 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "How often to check for inactive topics"
)
private int brokerDeleteInactiveTopicsFrequencySeconds = 60;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Set the inactive topic delete mode. Default is delete_when_no_subscriptions\n"
+ "'delete_when_no_subscriptions' mode only delete the topic which has no subscriptions and no active producers\n"
+ "'delete_when_subscriptions_caught_up' mode only delete the topic that all subscriptions has no backlogs(caught up)"
+ "and no active producers/consumers"
)
private InactiveTopicDeleteMode brokerDeleteInactiveTopicsMode = InactiveTopicDeleteMode.delete_when_no_subscriptions;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "Max duration of topic inactivity in seconds, default is not present\n"
+ "If not present, 'brokerDeleteInactiveTopicsFrequencySeconds' will be used\n"
+ "Topics that are inactive for longer than this value will be deleted"
)
private Integer brokerDeleteInactiveTopicsMaxInactiveDurationSeconds = null;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
Expand Down Expand Up @@ -1449,6 +1468,14 @@ public boolean isDefaultTopicTypePartitioned() {
return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
}

public int getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds() {
if (brokerDeleteInactiveTopicsMaxInactiveDurationSeconds == null) {
return brokerDeleteInactiveTopicsFrequencySeconds;
} else {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}

enum TopicType {
PARTITIONED("partitioned"),
NON_PARTITIONED("non-partitioned");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ protected void startStatsUpdater(int statsUpdateInitailDelayInSecs, int statsUpd
protected void startInactivityMonitor() {
if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) {
int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(interval)), interval, interval,
int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds();
inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval,
TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -1095,8 +1096,9 @@ public Semaphore getLookupRequestSemaphore() {
return lookupRequestSemaphore.get();
}

public void checkGC(int gcIntervalInSeconds) {
forEachTopic(topic -> topic.checkGC(gcIntervalInSeconds));
public void checkGC(int maxInactiveDurationInSec) {
forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec,
pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode()));
}

public void checkMessageExpiry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -123,7 +124,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect);

void checkGC(int gcInterval);
void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode);

void checkInactiveSubscriptions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
Expand Down Expand Up @@ -822,19 +823,19 @@ public boolean isActive() {
}

@Override
public void checkGC(int gcIntervalInSeconds) {
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (isActive()) {
lastActive = System.nanoTime();
} else {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {

if (TopicName.get(topic).isGlobal()) {
// For global namespace, close repl producers first.
// Once all repl producers are closed, we can delete the topic,
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
}

stopReplProducers().thenCompose(v -> delete(true, false, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
Expand Down Expand Up @@ -764,11 +765,11 @@ void removeSubscription(String subscriptionName) {
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false, false);
return delete(false, false, false);
}

private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean deleteSchema) {
return delete(failIfHasSubscriptions, false, deleteSchema);
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean deleteSchema) {
return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema);
}

/**
Expand All @@ -780,7 +781,7 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean d
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
return delete(false, false, true, false);
}

/**
Expand All @@ -800,6 +801,7 @@ public CompletableFuture<Void> deleteForcefully() {
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean failIfHasBacklogs,
boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -841,6 +843,12 @@ private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
return;
}
} else if (failIfHasBacklogs) {
if (hasBacklogs()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up"));
return;
}
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
Expand Down Expand Up @@ -1586,19 +1594,36 @@ public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}

public boolean isActive() {
public boolean isActive(InactiveTopicDeleteMode deleteMode) {
switch (deleteMode) {
case delete_when_no_subscriptions:
if (!subscriptions.isEmpty()) {
return true;
}
break;
case delete_when_subscriptions_caught_up:
if (hasBacklogs()) {
return true;
}
break;
}
if (TopicName.get(topic).isGlobal()) {
// No local consumers and no local producers
return !subscriptions.isEmpty() || hasLocalProducers();
// no local producers
return hasLocalProducers();
} else {
return USAGE_COUNT_UPDATER.get(this) != 0;
}
return USAGE_COUNT_UPDATER.get(this) != 0 || !subscriptions.isEmpty();
}

private boolean hasBacklogs() {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog() > 0);
}

@Override
public void checkGC(int gcIntervalInSeconds) {
if (isActive()) {
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
// Gc interval did not expire yet
return;
} else if (shouldTopicBeRetained()) {
Expand All @@ -1613,7 +1638,7 @@ public void checkGC(int gcIntervalInSeconds) {
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
}
closeReplProducersIfNoBacklog().thenRun(() -> {
if (hasRemoteProducers()) {
Expand All @@ -1625,7 +1650,7 @@ public void checkGC(int gcIntervalInSeconds) {
.completeExceptionally(new TopicBusyException("Topic has connected remote producers"));
} else {
log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic,
gcIntervalInSeconds);
maxInactiveDurationInSec);
replCloseFuture.complete(null);
}
}).exceptionally(e -> {
Expand All @@ -1639,7 +1664,8 @@ public void checkGC(int gcIntervalInSeconds) {
replCloseFuture.complete(null);
}

replCloseFuture.thenCompose(v -> delete(true, true))
replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true))
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
Expand Down Expand Up @@ -1969,7 +1995,7 @@ public synchronized OffloadProcessStatus offloadStatus() {
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ledger.getTotalSize() != 0) {
if (hasSchema || isActive(InactiveTopicDeleteMode.delete_when_no_subscriptions) || ledger.getTotalSize() != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
Expand Down
Loading

0 comments on commit dc7abd8

Please sign in to comment.