Skip to content

Commit

Permalink
Monitor thread to check if topics need compaction (apache#1915)
Browse files Browse the repository at this point in the history
Similar to other monitors, runs every 60 seconds by default, and
checks whether any persistent topics need to be compacted.
  • Loading branch information
ivankelly authored and merlimat committed Jun 6, 2018
1 parent 881e274 commit cd28ebf
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 0 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ maxConsumersPerTopic=0
# Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0

# Interval between checks to see if topics with compaction policies need to be compacted
brokerServiceCompactionMonitorIntervalInSeconds=60

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
@FieldContext(dynamic = true)
private boolean preferLaterVersions = false;

// Interval between checks to see if topics with compaction policies need to be compacted
private int brokerServiceCompactionMonitorIntervalInSeconds = 60;

private String schemaRegistryStorageClassName = "org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory";
private Set<String> schemaRegistryCompatibilityCheckers = Sets.newHashSet(
"org.apache.pulsar.broker.service.schema.JsonSchemaCompatibilityCheck"
Expand Down Expand Up @@ -1719,4 +1722,11 @@ public int getS3ManagedLedgerOffloadReadBufferSizeInBytes() {
return this.s3ManagedLedgerOffloadReadBufferSizeInBytes;
}

public void setBrokerServiceCompactionMonitorIntervalInSeconds(int interval) {
this.brokerServiceCompactionMonitorIntervalInSeconds = interval;
}

public int getBrokerServiceCompactionMonitorIntervalInSeconds() {
return this.brokerServiceCompactionMonitorIntervalInSeconds;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;

private DistributedIdGenerator producerNameGenerator;

Expand Down Expand Up @@ -227,6 +228,9 @@ public BrokerService(PulsarService pulsar) throws Exception {
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor"));
this.messageExpiryMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
Expand Down Expand Up @@ -309,6 +313,7 @@ public void start() throws Exception {
this.startStatsUpdater();
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startBacklogQuotaChecker();
// register listener to capture zk-latency
ClientCnxnAspect.addListener(zkStatsListener);
Expand Down Expand Up @@ -350,6 +355,14 @@ void startMessageExpiryMonitor() {
TimeUnit.MINUTES);
}

void startCompactionMonitor() {
int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds();
if (interval > 0) {
compactionMonitor.scheduleAtFixedRate(safeRun(() -> checkCompaction()),
interval, interval, TimeUnit.SECONDS);
}
}

void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -387,6 +400,7 @@ public void close() throws IOException {
statsUpdater.shutdown();
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
backlogQuotaChecker.shutdown();
authenticationService.close();
pulsarStats.close();
Expand Down Expand Up @@ -838,6 +852,14 @@ public void checkMessageExpiry() {
forEachTopic(Topic::checkMessageExpiry);
}

public void checkCompaction() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
((PersistentTopic) t).checkCompaction();
}
});
}

public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
Expand Down
3 changes: 3 additions & 0 deletions site/_data/config/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ configs:
- name: messageExpiryCheckIntervalInMinutes
default: '5'
description: How frequently to proactively check and purge expired messages
- name: brokerServiceCompactionMonitorIntervalInSeconds
default: '60'
description: Interval between checks to see if topics with compaction policies need to be compacted
- name: activeConsumerFailoverDelayTimeMillis
default: '1000'
description: How long to delay rewinding cursor and dispatching messages when active consumer is changed.
Expand Down

0 comments on commit cd28ebf

Please sign in to comment.