Skip to content

Commit

Permalink
[improve] Removed usages of SafeRun (apache#20060)
Browse files Browse the repository at this point in the history
### Motivation

With BK 4.16, we don't need to pass `SafeRunnable` instances to the `OrderedExecutor` anymore. The executor has embedded the logic of checking and logging exceptions.

Removing the SafeRun will avoid extra allocations in critical path and clutter of the code

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads (10MB)*
  - *Extended integration test for recovery after broker failure*

### Does this pull request potentially affect one of the following parts:

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

*If the box was checked, please highlight the changes*

- [ ] Dependencies (add or upgrade a dependency)
- [ ] The public API
- [ ] The schema
- [ ] The default values of configurations
- [ ] The threading model
- [ ] The binary protocol
- [ ] The REST endpoints
- [ ] The admin CLI options
- [ ] The metrics
- [ ] Anything that affects deployment

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: (merlimat#6)

<!--
After opening this PR, the build in apache/pulsar will fail and instructions will
be provided for opening a PR in the PR author's forked repository.

apache/pulsar pull requests should be first tested in your own fork since the 
apache/pulsar CI based on GitHub Actions has constrained resources and quota.
GitHub Actions provides separate quota for pull requests that are executed in 
a forked repository.

The tests will be run in the forked repository until all PR review comments have
been handled, the tests pass and the PR is approved by a reviewer.
-->
  • Loading branch information
merlimat authored Apr 11, 2023
1 parent 11751b7 commit bafecb2
Show file tree
Hide file tree
Showing 19 changed files with 150 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Collections2;
Expand Down Expand Up @@ -1358,7 +1357,7 @@ public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks
final PositionImpl newPosition = (PositionImpl) newPos;

// order trim and reset operations on a ledger
ledger.getExecutor().execute(safeRun(() -> {
ledger.getExecutor().execute(() -> {
PositionImpl actualPosition = newPosition;

if (!ledger.isValidPosition(actualPosition)
Expand All @@ -1375,7 +1374,7 @@ public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks
}

internalResetCursor(actualPosition, callback);
}));
});
}

@Override
Expand Down Expand Up @@ -2055,7 +2054,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+ "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
}
// run with executor to prevent deadlock
ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete()));
ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
return;
}

Expand All @@ -2074,7 +2073,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+ "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
}
// run with executor to prevent deadlock
ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete()));
ledger.getExecutor().execute(() -> mdEntry.triggerComplete());
return;
}

Expand Down Expand Up @@ -2611,8 +2610,8 @@ private boolean shouldPersistUnackRangesToLedger() {
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
if (state == State.Closed) {
ledger.getExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException(
new CursorAlreadyClosedException(name + " cursor already closed")))));
ledger.getExecutor().execute(() -> callback.operationFailed(new MetaStoreException(
new CursorAlreadyClosedException(name + " cursor already closed"))));
return;
}

Expand Down Expand Up @@ -2845,7 +2844,7 @@ private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
return;
}

ledger.getExecutor().execute(safeRun(() -> {
ledger.getExecutor().execute(() -> {
ledger.mbean.endCursorLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.warn("[{}] Error creating ledger for cursor {}: {}", ledger.getName(), name,
Expand All @@ -2858,7 +2857,7 @@ private CompletableFuture<LedgerHandle> doCreateNewMetadataLedger() {
log.debug("[{}] Created ledger {} for cursor {}", ledger.getName(), lh.getId(), name);
}
future.complete(lh);
}));
});
}, LedgerMetadataUtils.buildAdditionalMetadataForCursor(name));

return future;
Expand Down Expand Up @@ -3192,7 +3191,7 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)),
ledger.getScheduledExecutor().schedule(() -> asyncDeleteLedger(lh, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
Expand Down Expand Up @@ -3227,7 +3226,7 @@ private void asyncDeleteCursorLedger(int retry) {
log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)),
ledger.getScheduledExecutor().schedule(() -> asyncDeleteCursorLedger(retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -409,7 +408,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (!ledgers.isEmpty()) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
executor.execute(safeRun(() -> {
executor.execute(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc));
Expand Down Expand Up @@ -439,7 +438,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
}));
});
};

if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -522,7 +521,7 @@ public void operationFailed(MetaStoreException e) {
return;
}

executor.execute(safeRun(() -> {
executor.execute(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
Expand Down Expand Up @@ -551,7 +550,7 @@ public void operationFailed(MetaStoreException e) {

// Save it back to ensure all nodes exist
store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb);
}));
});
}, ledgerMetadata);
}

Expand Down Expand Up @@ -774,10 +773,10 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(safeRun(() -> {
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
});
}

@Override
Expand All @@ -790,10 +789,10 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.execute(safeRun(() -> {
executor.execute(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
});
}

protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
Expand Down Expand Up @@ -2374,7 +2373,7 @@ void notifyCursors() {
break;
}

executor.execute(safeRun(waitingCursor::notifyEntriesAvailable));
executor.execute(waitingCursor::notifyEntriesAvailable);
}
}

Expand All @@ -2385,7 +2384,7 @@ void notifyWaitingEntryCallBacks() {
break;
}

executor.execute(safeRun(cb::entriesAvailable));
executor.execute(cb::entriesAvailable);
}
}

Expand Down Expand Up @@ -2432,16 +2431,16 @@ private void trimConsumedLedgersInBackground() {

@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
executor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
executor.execute(() -> internalTrimConsumedLedgers(promise));
}

public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
executor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise)));
executor.execute(() -> internalTrimLedgers(isTruncate, promise));
}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
scheduledExecutor.schedule(safeRun(() -> trimConsumedLedgersInBackground(isTruncate, promise)), 100,
TimeUnit.MILLISECONDS);
scheduledExecutor.schedule(() -> trimConsumedLedgersInBackground(isTruncate, promise),
100, TimeUnit.MILLISECONDS);
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
Expand All @@ -2456,7 +2455,7 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
executor.execute(safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
executor.execute(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise));
}
}

Expand All @@ -2477,7 +2476,7 @@ private void maybeOffload(long offloadThresholdInBytes, long offloadThresholdInS
}

if (!offloadMutex.tryLock()) {
scheduledExecutor.schedule(safeRun(() -> maybeOffloadInBackground(finalPromise)),
scheduledExecutor.schedule(() -> maybeOffloadInBackground(finalPromise),
100, TimeUnit.MILLISECONDS);
return;
}
Expand Down Expand Up @@ -2956,7 +2955,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) {
log.warn("[{}] Ledger was already deleted {}", name, ledgerId);
} else if (rc != BKException.Code.OK) {
log.error("[{}] Error deleting ledger {} : {}", name, ledgerId, BKException.getMessage(rc));
scheduledExecutor.schedule(safeRun(() -> asyncDeleteLedger(ledgerId, retry - 1)),
scheduledExecutor.schedule(() -> asyncDeleteLedger(ledgerId, retry - 1),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
} else {
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -3260,7 +3259,7 @@ private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation tran
if (!metadataMutex.tryLock()) {
// retry in 100 milliseconds
scheduledExecutor.schedule(
safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise), 100,
TimeUnit.MILLISECONDS);
} else { // lock acquired
CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
Expand Down Expand Up @@ -4011,9 +4010,8 @@ private void scheduleTimeoutTask() {
timeoutSec = timeoutSec <= 0
? Math.max(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds())
: timeoutSec;
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
checkTimeouts();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(
this::checkTimeouts, timeoutSec, timeoutSec, TimeUnit.SECONDS);
}
}

Expand Down Expand Up @@ -4336,7 +4334,7 @@ protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() {
checkLedgerRollTask.cancel(true);
}
this.checkLedgerRollTask = this.scheduledExecutor.schedule(
safeRun(this::rollCurrentLedgerIfFull), this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
this::rollCurrentLedgerIfFull, this.maximumRolloverTimeMs, TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.CompressionType;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.compression.CompressionCodec;
Expand Down Expand Up @@ -156,7 +155,7 @@ public void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map
.exceptionally(ex -> {
try {
executor.executeOrdered(ledgerName,
SafeRunnable.safeRun(() -> callback.operationFailed(getException(ex))));
() -> callback.operationFailed(getException(ex)));
} catch (RejectedExecutionException e) {
//executor maybe shutdown, use common pool to run callback.
CompletableFuture.runAsync(() -> callback.operationFailed(getException(ex)));
Expand Down Expand Up @@ -204,8 +203,8 @@ public void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, St
.thenAcceptAsync(newVersion -> callback.operationComplete(null, newVersion),
executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
});
}
Expand All @@ -221,8 +220,8 @@ public void getCursors(String ledgerName, MetaStoreCallback<List<String>> callba
.thenAcceptAsync(cursors -> callback.operationComplete(cursors, null), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
});
}
Expand All @@ -249,8 +248,8 @@ public void asyncGetCursorInfo(String ledgerName, String cursorName,
}
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
});
}
Expand Down Expand Up @@ -284,8 +283,8 @@ public void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedC
.thenAcceptAsync(optStat -> callback.operationComplete(null, optStat), executor
.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
});
}
Expand All @@ -303,15 +302,15 @@ public void asyncRemoveCursor(String ledgerName, String cursorName, MetaStoreCal
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> {
executor.executeOrdered(ledgerName, () -> {
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
if (actEx instanceof MetadataStoreException.NotFoundException){
log.info("[{}] [{}] cursor delete done because it did not exist.", ledgerName, cursorName);
callback.operationComplete(null, null);
return;
}
callback.operationFailed(getException(ex));
}));
});
return null;
});
}
Expand All @@ -329,8 +328,8 @@ public void removeManagedLedger(String ledgerName, MetaStoreCallback<Void> callb
callback.operationComplete(null, null);
}, executor.chooseThread(ledgerName))
.exceptionally(ex -> {
executor.executeOrdered(ledgerName, SafeRunnable.safeRun(() -> callback
.operationFailed(getException(ex))));
executor.executeOrdered(ledgerName,
() -> callback.operationFailed(getException(ex)));
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,14 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;


/**
* Handles the life-cycle of an addEntry() operation.
*
*/
@Slf4j
public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
public class OpAddEntry implements AddCallback, CloseCallback, Runnable {
protected ManagedLedgerImpl ml;
LedgerHandle ledger;
private long entryId;
Expand Down Expand Up @@ -212,7 +210,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)

// Called in executor hashed on managed ledger name, once the add operation is complete
@Override
public void safeRun() {
public void run() {
if (payloadProcessorHandle != null) {
payloadProcessorHandle.release();
}
Expand Down Expand Up @@ -328,11 +326,11 @@ void handleAddFailure(final LedgerHandle lh) {
ManagedLedgerImpl finalMl = this.ml;
finalMl.mbean.recordAddEntryError();

finalMl.getExecutor().execute(SafeRun.safeRun(() -> {
finalMl.getExecutor().execute(() -> {
// Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
// from a BK callback.
finalMl.ledgerClosed(lh);
}));
});
}

void close() {
Expand Down
Loading

0 comments on commit bafecb2

Please sign in to comment.