Skip to content

Commit

Permalink
make ledger rollover check task internal (apache#8946)
Browse files Browse the repository at this point in the history
Fix apache#7195 

### Changes
1. add a schedulerTask to rollover the ledger in `ManagedLedgerImpl` instead of `BrokerService`
2. add the test.
  • Loading branch information
hangc0276 authored Jan 7, 2021
1 parent 7313b34 commit b4ef76e
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
/**
* Roll current ledger if it is full
*/
@Deprecated
void rollCurrentLedgerIfFull();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final EntryCache entryCache;

private ScheduledFuture<?> timeoutTask;
private ScheduledFuture<?> checkLedgerRollTask;

/**
* This lock is held while the ledgers list or propertiesMap is updated asynchronously on the metadata store. Since we use the store
Expand Down Expand Up @@ -382,6 +383,8 @@ public void operationFailed(MetaStoreException e) {
});

scheduleTimeoutTask();

scheduleRollOverLedgerTask();
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
Expand Down Expand Up @@ -1318,6 +1321,10 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
this.timeoutTask.cancel(false);
}

if (this.checkLedgerRollTask != null) {
this.checkLedgerRollTask.cancel(false);
}

}

private void closeAllCursors(CloseCallback callback, final Object ctx) {
Expand Down Expand Up @@ -1548,6 +1555,7 @@ synchronized void createLedgerAfterClosed() {
asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
}

@VisibleForTesting
@Override
public void rollCurrentLedgerIfFull() {
log.info("[{}] Start checking if current ledger is full", name);
Expand All @@ -1561,7 +1569,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) {
lh.getId());

if (rc == BKException.Code.OK) {
log.debug("Successfuly closed ledger {}", lh.getId());
log.debug("Successfully closed ledger {}", lh.getId());
} else {
log.warn("Error when closing ledger {}. Status={}", lh.getId(), BKException.getMessage(rc));
}
Expand Down Expand Up @@ -3467,6 +3475,15 @@ private void scheduleTimeoutTask() {
}
}

private void scheduleRollOverLedgerTask() {
if (config.getMaximumRolloverTimeMs() > 0) {
long interval = config.getMaximumRolloverTimeMs();
this.checkLedgerRollTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
rollCurrentLedgerIfFull();
}), interval, interval, TimeUnit.MILLISECONDS);
}
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2825,4 +2825,37 @@ public static void retryStrategically(Predicate<Void> predicate, int retryCount,
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}

@Test
public void testManagedLedgerRollOverIfFull() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaxEntriesPerLedger(2);
config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);

ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("test_managedLedger_rollOver", config);
ManagedCursor cursor = ledger.openCursor("c1");

int msgNum = 10;

for (int i = 0; i < msgNum; i++) {
ledger.addEntry(new byte[1024 * 1024]);
}

Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
List<Entry> entries = cursor.readEntries(msgNum);
Assert.assertEquals(msgNum, entries.size());

for (Entry entry : entries) {
cursor.markDelete(entry.getPosition());
}
entries.forEach(e -> e.release());

// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
Thread.sleep(1000);
Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 2);
Assert.assertEquals(ledger.getCurrentLedgerSize(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private final ScheduledExecutorService ledgerFullMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
Expand Down Expand Up @@ -310,8 +309,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));
this.ledgerFullMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("ledger-full-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -461,7 +458,6 @@ public void start() throws Exception {
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startLedgerFullMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
this.startCheckReplicationPolicies();
Expand Down Expand Up @@ -554,12 +550,6 @@ protected void startConsumedLedgersMonitor() {
}
}

protected void startLedgerFullMonitor() {
int interval = pulsar().getConfiguration().getManagedLedgerMaxLedgerRolloverTimeMinutes();
ledgerFullMonitor.scheduleAtFixedRate(safeRun(this::checkLedgerFull),
interval, interval, TimeUnit.MINUTES);
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -698,7 +688,6 @@ public void close() throws IOException {
inactivityMonitor.shutdown();
messageExpiryMonitor.shutdown();
compactionMonitor.shutdown();
ledgerFullMonitor.shutdown();
messagePublishBufferMonitor.shutdown();
consumedLedgersMonitor.shutdown();
backlogQuotaChecker.shutdown();
Expand Down Expand Up @@ -1446,18 +1435,6 @@ private void checkConsumedLedgers() {
});
}

private void checkLedgerFull() {
forEachTopic((t) -> {
if (t instanceof PersistentTopic) {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.rollCurrentLedgerIfFull();
}
);
}
});
}

public void checkMessageDeduplicationInfo() {
forEachTopic(Topic::checkMessageDeduplicationInfo);
}
Expand Down

0 comments on commit b4ef76e

Please sign in to comment.