Skip to content

Commit

Permalink
Add control of single topic gc in inactive state. (apache#7093)
Browse files Browse the repository at this point in the history
Motivation
The brokerDeleteInactiveTopicsEnabled parameter is effect to all Topics, but in some cases like topic created by protocol handler, we need keep some topics not be deleted even if they're in inactive state.

Modifications
Add deleteWhileInactive parameter in AbstractTopic, and when create a new topic, we can call topic.setDeleteWhileInactive(false) to keep the topic not be deleted by GC.
  • Loading branch information
zhanghaou authored Jun 4, 2020
1 parent d009c11 commit 88babcc
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public abstract class AbstractTopic implements Topic {

protected volatile boolean isFenced;

// When set to false, this inactive topic can not be deleted
protected boolean deleteWhileInactive;

// Timestamp of when this topic was last seen active
protected volatile long lastActive;

Expand Down Expand Up @@ -95,6 +98,8 @@ public AbstractTopic(String topic, BrokerService brokerService) {
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.deleteWhileInactive =
brokerService.pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled();
this.lastActive = System.nanoTime();
Policies policies = null;
try {
Expand Down Expand Up @@ -460,5 +465,13 @@ public long getBytesOutCounter() {
return getStats(false).bytesOutCounter;
}

public boolean isDeleteWhileInactive() {
return deleteWhileInactive;
}

public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.deleteWhileInactive = deleteWhileInactive;
}

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,10 @@ public boolean isActive() {

@Override
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (!deleteWhileInactive) {
// This topic is not included in GC
return;
}
if (isActive()) {
lastActive = System.nanoTime();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,17 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;

import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -38,8 +39,9 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import java.util.function.BiFunction;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand Down Expand Up @@ -125,15 +127,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback {

// Managed ledger associated with the topic
Expand Down Expand Up @@ -1647,6 +1640,10 @@ private boolean hasBacklogs() {

@Override
public void checkGC(int maxInactiveDurationInSec, InactiveTopicDeleteMode deleteMode) {
if (!deleteWhileInactive) {
// This topic is not included in GC
return;
}
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
Expand Down

0 comments on commit 88babcc

Please sign in to comment.