Skip to content

Commit

Permalink
Delete inactive subscriptions automatically (apache#1352)
Browse files Browse the repository at this point in the history
* Delete inactive subscriptions automatically

* Addressed PR comments

* Add subscriptionExpiryCheckIntervalInMinutes

* Add lastActive for ManagedCursorInfo
  • Loading branch information
yush1ga authored and merlimat committed May 1, 2018
1 parent c62b5c0 commit 762036c
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 122 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
Expand Down
7 changes: 7 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ messageExpiryCheckIntervalInMinutes=5
# How long to delay rewinding cursor and dispatching messages when active consumer is changed
activeConsumerFailoverDelayTimeMillis=1000

# How long to delete inactive subscriptions from last consuming
# When it is 0, inactive subscriptions are not deleted automatically
subscriptionExpirationTimeMinutes=0

# How frequently to proactively check and purge expired subscription
subscriptionExpiryCheckIntervalInMinutes=5

# Set the default behavior for message deduplication in the broker
# This can be overridden per-namespace. If enabled, broker will reject
# messages that were already stored in the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,19 @@ enum IndividualDeletedEntries {
*/
String getName();

/**
* Get the last active time of the cursor.
*
* @return the last active time of the cursor
*/
long getLastActive();

/**
* Update the last active time of the cursor
*
*/
void updateLastActive();

/**
* Return any properties that were associated with the last stored position.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties,
private long lastLedgerSwitchTimestamp;
private final Clock clock;

// The last active time (Unix time, milliseconds) of the cursor
private long lastActive;

enum State {
Uninitialized, // Cursor is being initialized
NoLedger, // There is no metadata ledger open for writing
Expand Down Expand Up @@ -189,6 +192,7 @@ public interface VoidCallback {
RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, FALSE);
WAITING_READ_OP_UPDATER.set(this, null);
this.clock = config.getClock();
this.lastActive = this.clock.millis();
this.lastLedgerSwitchTimestamp = this.clock.millis();

if (config.getThrottleMarkDelete() > 0.0) {
Expand Down Expand Up @@ -216,6 +220,7 @@ void recover(final VoidCallback callback) {
public void operationComplete(ManagedCursorInfo info, Stat stat) {

cursorLedgerStat = stat;
lastActive = info.getLastActive() != 0 ? info.getLastActive() : lastActive;

if (info.getCursorsLedgerId() == -1L) {
// There is no cursor ledger to read the last position from. It means the cursor has been properly
Expand Down Expand Up @@ -1280,7 +1285,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));

if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
Expand All @@ -1307,7 +1312,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
final MarkDeleteCallback callback, final Object ctx) {
checkNotNull(position);
checkArgument(position instanceof PositionImpl);

if (STATE_UPDATER.get(this) == State.Closed) {
callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
return;
Expand All @@ -1328,7 +1333,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
log.debug("[{}] Mark delete cursor {} up to position: {}", ledger.getName(), name, position);
}
PositionImpl newPosition = (PositionImpl) position;

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -1541,7 +1546,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb

for (Position pos : positions) {
PositionImpl position = (PositionImpl) checkNotNull(pos);

if (((PositionImpl) ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
if (log.isDebugEnabled()) {
log.debug(
Expand Down Expand Up @@ -1692,6 +1697,16 @@ public String getName() {
return name;
}

@Override
public long getLastActive() {
return lastActive;
}

@Override
public void updateLastActive() {
lastActive = System.currentTimeMillis();
}

@Override
public boolean isDurable() {
return true;
Expand Down Expand Up @@ -1837,7 +1852,8 @@ private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl positio
ManagedCursorInfo.Builder info = ManagedCursorInfo.newBuilder() //
.setCursorsLedgerId(cursorsLedgerId) //
.setMarkDeleteLedgerId(position.getLedgerId()) //
.setMarkDeleteEntryId(position.getEntryId()); //
.setMarkDeleteEntryId(position.getEntryId()) //
.setLastActive(lastActive); //

info.addAllProperties(buildPropertiesMap(properties));
if (persistIndividualDeletedMessageRanges) {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions managed-ledger/src/main/proto/MLDataFormats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,6 @@ message ManagedCursorInfo {
// Additional custom properties associated with
// the current cursor position
repeated LongProperty properties = 5;

optional int64 lastActive = 6;
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ public String getName() {
return name;
}

@Override
public long getLastActive() {
return System.currentTimeMillis();
}

@Override
public void updateLastActive() {
// no-op
}

public String toString() {
return String.format("%s=%s", name, position);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int messageExpiryCheckIntervalInMinutes = 5;
// How long to delay rewinding cursor and dispatching messages when active consumer is changed
private int activeConsumerFailoverDelayTimeMillis = 1000;
// How long to delete inactive subscriptions from last consuming
// When it is 0, inactive subscriptions are not deleted automatically
private long subscriptionExpirationTimeMinutes = 0;
// How frequently to proactively check and purge expired subscription
private long subscriptionExpiryCheckIntervalInMinutes = 5;

// Set the default behavior for message deduplication in the broker
// This can be overridden per-namespace. If enabled, broker will reject
Expand Down Expand Up @@ -681,6 +686,22 @@ public void setActiveConsumerFailoverDelayTimeMillis(int activeConsumerFailoverD
this.activeConsumerFailoverDelayTimeMillis = activeConsumerFailoverDelayTimeMillis;
}

public long getSubscriptionExpirationTimeMinutes() {
return subscriptionExpirationTimeMinutes;
}

public void setSubscriptionExpirationTimeMinutes(long subscriptionExpirationTimeMinutes) {
this.subscriptionExpirationTimeMinutes = subscriptionExpirationTimeMinutes;
}

public long getSubscriptionExpiryCheckIntervalInMinutes() {
return subscriptionExpiryCheckIntervalInMinutes;
}

public void setSubscriptionExpiryCheckIntervalInMinutes(long subscriptionExpiryCheckIntervalInMinutes) {
this.subscriptionExpiryCheckIntervalInMinutes = subscriptionExpiryCheckIntervalInMinutes;
}

public boolean isClientLibraryVersionCheckEnabled() {
return clientLibraryVersionCheckEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,10 @@ public List<String> getListFromBundle(@PathParam("property") String property, @P
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(fqnn, policies.bundles, bundleRange, true, true);
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
if (BrokerService.extractTopic(topicFuture).isPresent()) {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
}
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
if (nsBundle.includes(topicName)) {
topicList.add(topic.getName());
}
});
return topicList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,10 @@ public List<String> getListFromBundle(@PathParam("tenant") String tenant, @PathP
NamespaceBundle nsBundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, true, true);
try {
final List<String> topicList = Lists.newArrayList();
pulsar().getBrokerService().getTopics().forEach((name, topicFuture) -> {
if (BrokerService.extractTopic(topicFuture).isPresent()) {
TopicName topicName = TopicName.get(name);
if (nsBundle.includes(topicName)) {
topicList.add(name);
}
pulsar().getBrokerService().forEachTopic(topic -> {
TopicName topicName = TopicName.get(topic.getName());
if (nsBundle.includes(topicName)) {
topicList.add(topic.getName());
}
});
return topicList;
Expand Down
Loading

0 comments on commit 762036c

Please sign in to comment.