Skip to content

Commit

Permalink
[improve][broker] Pin executor to managed ledger instance to cache th…
Browse files Browse the repository at this point in the history
…e string hashing (apache#18078)

Co-authored-by: Jiwe Guo <[email protected]>
Co-authored-by: houxiaoyu <[email protected]>
  • Loading branch information
3 people authored Nov 30, 2022
1 parent 58ad3d0 commit fe45a9c
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1300,7 +1300,7 @@ public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks
final PositionImpl newPosition = (PositionImpl) newPos;

// order trim and reset operations on a ledger
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> {
ledger.getExecutor().execute(safeRun(() -> {
PositionImpl actualPosition = newPosition;

if (!ledger.isValidPosition(actualPosition)
Expand Down Expand Up @@ -1997,7 +1997,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+ "is later.", mdEntry.newPosition, persistentMarkDeletePosition);
}
// run with executor to prevent deadlock
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> mdEntry.triggerComplete()));
ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete()));
return;
}

Expand All @@ -2016,7 +2016,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) {
+ "in progress {} is later.", mdEntry.newPosition, inProgressLatest);
}
// run with executor to prevent deadlock
ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> mdEntry.triggerComplete()));
ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete()));
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -78,7 +79,6 @@
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.common.util.Retries;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -294,7 +294,7 @@ public enum PositionBound {
private final OrderedScheduler scheduledExecutor;

@Getter
protected final OrderedExecutor executor;
protected final Executor executor;

@Getter
private final ManagedLedgerFactoryImpl factory;
Expand Down Expand Up @@ -345,7 +345,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
this.scheduledExecutor = scheduledExecutor;
this.executor = bookKeeper.getMainWorkerPool();
this.executor = bookKeeper.getMainWorkerPool().chooseThread(name);
TOTAL_SIZE_UPDATER.set(this, 0);
NUMBER_OF_ENTRIES_UPDATER.set(this, 0);
ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0);
Expand Down Expand Up @@ -408,7 +408,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
if (!ledgers.isEmpty()) {
final long id = ledgers.lastKey();
OpenCallback opencb = (rc, lh, ctx1) -> {
executor.executeOrdered(name, safeRun(() -> {
executor.execute(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc));
Expand Down Expand Up @@ -521,7 +521,7 @@ public void operationFailed(MetaStoreException e) {
return;
}

executor.executeOrdered(name, safeRun(() -> {
executor.execute(safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(createManagedLedgerException(rc));
Expand Down Expand Up @@ -773,7 +773,7 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> {
executor.execute(safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
Expand All @@ -789,7 +789,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> {
executor.execute(safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
Expand Down Expand Up @@ -1944,7 +1944,7 @@ CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
}
promise.complete(res);
}
}, executor.chooseThread(name));
}, executor);
return promise;
});
}
Expand Down Expand Up @@ -2397,11 +2397,11 @@ private void trimConsumedLedgersInBackground() {

@Override
public void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise)));
executor.execute(safeRun(() -> internalTrimConsumedLedgers(promise)));
}

public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture<?> promise) {
executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise)));
executor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise)));
}

private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> promise) {
Expand All @@ -2421,8 +2421,7 @@ private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
final long offloadThresholdInSeconds =
Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L);
if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) {
executor.executeOrdered(name,
safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
executor.execute(safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
handleAddFailure(lh);
} else {
// Trigger addComplete callback in a thread hashed on the managed ledger name
ml.getExecutor().executeOrdered(ml.getName(), this);
ml.getExecutor().execute(this);
}
}

Expand Down Expand Up @@ -322,7 +322,7 @@ void handleAddFailure(final LedgerHandle lh) {
ManagedLedgerImpl finalMl = this.ml;
finalMl.mbean.recordAddEntryError();

finalMl.getExecutor().executeOrdered(finalMl.getName(), SafeRun.safeRun(() -> {
finalMl.getExecutor().execute(SafeRun.safeRun(() -> {
// Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock
// from a BK callback.
finalMl.ledgerClosed(lh);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void checkReadCompletion() {
cursor.readOperationCompleted();

} finally {
cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> {
cursor.ledger.getExecutor().execute(safeRun(() -> {
callback.readEntriesComplete(entries, ctx);
recycle();
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book
@Override
synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName);
executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx)));
executor.execute(safeRun(() -> doInitialize(callback, ctx)));
}

private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) {
// Fetch the list of existing ledgers in the source managed ledger
store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) ->
executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
executor.execute(safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat)))
);
store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() {
@Override
Expand Down Expand Up @@ -105,7 +105,7 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat)

final long lastLedgerId = ledgers.lastKey();
mbean.startDataLedgerOpenOp();
AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> {
AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.execute(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened source ledger {}", name, lastLedgerId);
Expand Down Expand Up @@ -321,7 +321,7 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe
mbean.startDataLedgerOpenOp();
//open ledger in readonly mode.
bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(),
(rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> {
(rc, lh, ctx1) -> executor.execute(safeRun(() -> {
mbean.endDataLedgerOpenOp();
if (log.isDebugEnabled()) {
log.debug("[{}] Opened new source ledger {}", name, lastLedgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
ml.getMbean().addReadEntriesSample(entries.size(), totalSize);

callback.readEntriesComplete(entries, ctx);
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
}, ml.getExecutor()).exceptionally(exception -> {
callback.readEntriesFailed(createManagedLedgerException(exception), ctx);
return null;
});
Expand Down Expand Up @@ -130,7 +130,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName()));
}, ml.getExecutor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,8 +292,7 @@ public void attach(CompletableFuture<List<EntryImpl>> handle) {
}
}
}
}, rangeEntryCache.getManagedLedger().getExecutor()
.chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> {
}, rangeEntryCache.getManagedLedger().getExecutor()).exceptionally(exception -> {
synchronized (PendingRead.this) {
for (ReadEntriesCallbackWithContext callback : callbacks) {
ManagedLedgerException mlException = createManagedLedgerException(exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt
} finally {
ledgerEntries.close();
}
}, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> {
}, ml.getExecutor()).exceptionally(exception -> {
ml.invalidateLedgerHandle(lh);
pendingReadsManager.invalidateLedger(lh.getId());
callback.readEntryFailed(createManagedLedgerException(exception), ctx);
Expand Down Expand Up @@ -378,10 +378,9 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l
new ManagedLedgerException.TooManyRequestsException(message), ctx);
return null;
}
ml.getExecutor().submitOrdered(lh.getId(), () -> {
ml.getExecutor().execute(() -> {
asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry,
originalCallback, ctx, newHandle);
return null;
});
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -55,13 +54,11 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase {

@Override
protected void setUpTestCase() throws Exception {
OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build();

ml1 = mock(ManagedLedgerImpl.class);
when(ml1.getScheduledExecutor()).thenReturn(executor);
when(ml1.getName()).thenReturn("cache1");
when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1));
when(ml1.getExecutor()).thenReturn(super.executor);
when(ml1.getExecutor()).thenReturn(executor);
when(ml1.getFactory()).thenReturn(factory);
when(ml1.getConfig()).thenReturn(new ManagedLedgerConfig());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand Down Expand Up @@ -60,14 +61,14 @@ public class PendingReadsManagerTest {
static final Object CTX = "foo";
static final Object CTX2 = "far";
static final long ledgerId = 123414L;
OrderedExecutor orderedExecutor;
ExecutorService orderedExecutor;

PendingReadsManagerTest() {
}

@BeforeClass(alwaysRun = true)
void before() {
orderedExecutor = OrderedExecutor.newBuilder().build();
orderedExecutor = Executors.newSingleThreadExecutor();
}

@AfterClass(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,14 @@
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand Down Expand Up @@ -83,7 +81,7 @@ public class TxnLogBufferedWriter<T> {
private final Timer timer;

/** All write operation will be executed on single thread. **/
private final ExecutorService singleThreadExecutorForWrite;
private final Executor singleThreadExecutorForWrite;

/** The serializer for the object which called by {@link #asyncAddData}. **/
private final DataSerializer<T> dataSerializer;
Expand Down Expand Up @@ -140,7 +138,7 @@ public class TxnLogBufferedWriter<T> {
* when disabled.
* @param timer Used for periodic flush.
*/
public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor, Timer timer,
public TxnLogBufferedWriter(ManagedLedger managedLedger, Executor executor, Timer timer,
DataSerializer<T> dataSerializer,
int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis,
boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){
Expand All @@ -157,8 +155,7 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered
}
this.batchEnabled = batchEnabled && batchedWriteMaxRecords > 1;
this.managedLedger = managedLedger;
this.singleThreadExecutorForWrite = orderedExecutor.chooseThread(
managedLedger.getName() == null ? UUID.randomUUID().toString() : managedLedger.getName());
this.singleThreadExecutorForWrite = executor;
this.dataSerializer = dataSerializer;
this.batchedWriteMaxRecords = batchedWriteMaxRecords;
this.batchedWriteMaxSize = batchedWriteMaxSize;
Expand Down
Loading

0 comments on commit fe45a9c

Please sign in to comment.