diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 57c5b2b5f451a..5755f528c6ace 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1300,7 +1300,7 @@ public void asyncResetCursor(Position newPos, boolean forceReset, AsyncCallbacks final PositionImpl newPosition = (PositionImpl) newPos; // order trim and reset operations on a ledger - ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> { + ledger.getExecutor().execute(safeRun(() -> { PositionImpl actualPosition = newPosition; if (!ledger.isValidPosition(actualPosition) @@ -1997,7 +1997,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) { + "is later.", mdEntry.newPosition, persistentMarkDeletePosition); } // run with executor to prevent deadlock - ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> mdEntry.triggerComplete())); + ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete())); return; } @@ -2016,7 +2016,7 @@ void internalMarkDelete(final MarkDeleteEntry mdEntry) { + "in progress {} is later.", mdEntry.newPosition, inProgressLatest); } // run with executor to prevent deadlock - ledger.getExecutor().executeOrdered(ledger.getName(), safeRun(() -> mdEntry.triggerComplete())); + ledger.getExecutor().execute(safeRun(() -> mdEntry.triggerComplete())); return; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index a487443f72bb3..58fcff877ca6c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -56,6 +56,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -78,7 +79,6 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.common.util.Backoff; -import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -294,7 +294,7 @@ public enum PositionBound { private final OrderedScheduler scheduledExecutor; @Getter - protected final OrderedExecutor executor; + protected final Executor executor; @Getter private final ManagedLedgerFactoryImpl factory; @@ -345,7 +345,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.ledgerMetadata = LedgerMetadataUtils.buildBaseManagedLedgerMetadata(name); this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType()); this.scheduledExecutor = scheduledExecutor; - this.executor = bookKeeper.getMainWorkerPool(); + this.executor = bookKeeper.getMainWorkerPool().chooseThread(name); TOTAL_SIZE_UPDATER.set(this, 0); NUMBER_OF_ENTRIES_UPDATER.set(this, 0); ENTRIES_ADDED_COUNTER_UPDATER.set(this, 0); @@ -408,7 +408,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (!ledgers.isEmpty()) { final long id = ledgers.lastKey(); OpenCallback opencb = (rc, lh, ctx1) -> { - executor.executeOrdered(name, safeRun(() -> { + executor.execute(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened ledger {}: {}", name, id, BKException.getMessage(rc)); @@ -521,7 +521,7 @@ public void operationFailed(MetaStoreException e) { return; } - executor.executeOrdered(name, safeRun(() -> { + executor.execute(safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { callback.initializeFailed(createManagedLedgerException(rc)); @@ -773,7 +773,7 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) buffer.retain(); // Jump to specific thread to avoid contention from writers writing from different threads - executor.executeOrdered(name, safeRun(() -> { + executor.execute(safeRun(() -> { OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx); internalAsyncAddEntry(addOperation); })); @@ -789,7 +789,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback buffer.retain(); // Jump to specific thread to avoid contention from writers writing from different threads - executor.executeOrdered(name, safeRun(() -> { + executor.execute(safeRun(() -> { OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx); internalAsyncAddEntry(addOperation); })); @@ -1944,7 +1944,7 @@ CompletableFuture getLedgerHandle(long ledgerId) { } promise.complete(res); } - }, executor.chooseThread(name)); + }, executor); return promise; }); } @@ -2397,11 +2397,11 @@ private void trimConsumedLedgersInBackground() { @Override public void trimConsumedLedgersInBackground(CompletableFuture promise) { - executor.executeOrdered(name, safeRun(() -> internalTrimConsumedLedgers(promise))); + executor.execute(safeRun(() -> internalTrimConsumedLedgers(promise))); } public void trimConsumedLedgersInBackground(boolean isTruncate, CompletableFuture promise) { - executor.executeOrdered(name, safeRun(() -> internalTrimLedgers(isTruncate, promise))); + executor.execute(safeRun(() -> internalTrimLedgers(isTruncate, promise))); } private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture promise) { @@ -2421,8 +2421,7 @@ private void maybeOffloadInBackground(CompletableFuture promise) { final long offloadThresholdInSeconds = Optional.ofNullable(policies.getManagedLedgerOffloadThresholdInSeconds()).orElse(-1L); if (offloadThresholdInBytes >= 0 || offloadThresholdInSeconds >= 0) { - executor.executeOrdered(name, - safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise))); + executor.execute(safeRun(() -> maybeOffload(offloadThresholdInBytes, offloadThresholdInSeconds, promise))); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 34d94efb4942b..14135b037920a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -200,7 +200,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) handleAddFailure(lh); } else { // Trigger addComplete callback in a thread hashed on the managed ledger name - ml.getExecutor().executeOrdered(ml.getName(), this); + ml.getExecutor().execute(this); } } @@ -322,7 +322,7 @@ void handleAddFailure(final LedgerHandle lh) { ManagedLedgerImpl finalMl = this.ml; finalMl.mbean.recordAddEntryError(); - finalMl.getExecutor().executeOrdered(finalMl.getName(), SafeRun.safeRun(() -> { + finalMl.getExecutor().execute(SafeRun.safeRun(() -> { // Force the creation of a new ledger. Doing it in a background thread to avoid acquiring ML lock // from a BK callback. finalMl.ledgerClosed(lh); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index 7fc2eec66263d..86290d8bc8f5f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -156,7 +156,7 @@ void checkReadCompletion() { cursor.readOperationCompleted(); } finally { - cursor.ledger.getExecutor().executeOrdered(cursor.ledger.getName(), safeRun(() -> { + cursor.ledger.getExecutor().execute(safeRun(() -> { callback.readEntriesComplete(entries, ctx); recycle(); })); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 6831d8680be93..39e7b6b42ec0b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -65,13 +65,13 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); - executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); + executor.execute(safeRun(() -> doInitialize(callback, ctx))); } private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { // Fetch the list of existing ledgers in the source managed ledger store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) -> - executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) + executor.execute(safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) ); store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() { @Override @@ -105,7 +105,7 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) final long lastLedgerId = ledgers.lastKey(); mbean.startDataLedgerOpenOp(); - AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.execute(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened source ledger {}", name, lastLedgerId); @@ -321,7 +321,7 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe mbean.startDataLedgerOpenOp(); //open ledger in readonly mode. bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), - (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + (rc, lh, ctx1) -> executor.execute(safeRun(() -> { mbean.endDataLedgerOpenOp(); if (log.isDebugEnabled()) { log.debug("[{}] Opened new source ledger {}", name, lastLedgerId); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java index 81629efef628f..1c5563b38b120 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java @@ -97,7 +97,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole ml.getMbean().addReadEntriesSample(entries.size(), totalSize); callback.readEntriesComplete(entries, ctx); - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { + }, ml.getExecutor()).exceptionally(exception -> { callback.readEntriesFailed(createManagedLedgerException(exception), ctx); return null; }); @@ -130,7 +130,7 @@ public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks. } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())); + }, ml.getExecutor()); } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java index 8eefefa0f51f5..8b2f3e25f1cbb 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java @@ -292,8 +292,7 @@ public void attach(CompletableFuture> handle) { } } } - }, rangeEntryCache.getManagedLedger().getExecutor() - .chooseThread(rangeEntryCache.getManagedLedger().getName())).exceptionally(exception -> { + }, rangeEntryCache.getManagedLedger().getExecutor()).exceptionally(exception -> { synchronized (PendingRead.this) { for (ReadEntriesCallbackWithContext callback : callbacks) { ManagedLedgerException mlException = createManagedLedgerException(exception); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java index 95b0fbcfd4e99..28a2f00cf683c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java @@ -267,7 +267,7 @@ private void asyncReadEntry0(ReadHandle lh, PositionImpl position, final ReadEnt } finally { ledgerEntries.close(); } - }, ml.getExecutor().chooseThread(ml.getName())).exceptionally(exception -> { + }, ml.getExecutor()).exceptionally(exception -> { ml.invalidateLedgerHandle(lh); pendingReadsManager.invalidateLedger(lh.getId()); callback.readEntryFailed(createManagedLedgerException(exception), ctx); @@ -378,10 +378,9 @@ private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle l new ManagedLedgerException.TooManyRequestsException(message), ctx); return null; } - ml.getExecutor().submitOrdered(lh.getId(), () -> { + ml.getExecutor().execute(() -> { asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, originalCallback, ctx, newHandle); - return null; }); return null; } else { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index ec865018b34b0..1b02cd674c567 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -34,7 +34,6 @@ import lombok.Cleanup; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -55,13 +54,11 @@ public class EntryCacheManagerTest extends MockedBookKeeperTestCase { @Override protected void setUpTestCase() throws Exception { - OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().numThreads(1).build(); - ml1 = mock(ManagedLedgerImpl.class); when(ml1.getScheduledExecutor()).thenReturn(executor); when(ml1.getName()).thenReturn("cache1"); when(ml1.getMbean()).thenReturn(new ManagedLedgerMBeanImpl(ml1)); - when(ml1.getExecutor()).thenReturn(super.executor); + when(ml1.getExecutor()).thenReturn(executor); when(ml1.getFactory()).thenReturn(factory); when(ml1.getConfig()).thenReturn(new ManagedLedgerConfig()); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java index 04eb95b2b3ff0..6f573ff8d75c8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManagerTest.java @@ -18,10 +18,11 @@ */ package org.apache.bookkeeper.mledger.impl.cache; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -60,14 +61,14 @@ public class PendingReadsManagerTest { static final Object CTX = "foo"; static final Object CTX2 = "far"; static final long ledgerId = 123414L; - OrderedExecutor orderedExecutor; + ExecutorService orderedExecutor; PendingReadsManagerTest() { } @BeforeClass(alwaysRun = true) void before() { - orderedExecutor = OrderedExecutor.newBuilder().build(); + orderedExecutor = Executors.newSingleThreadExecutor(); } @AfterClass(alwaysRun = true) diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java index a9b257cec065b..a87c040031f9a 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java @@ -26,16 +26,14 @@ import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.ArrayList; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; -import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -83,7 +81,7 @@ public class TxnLogBufferedWriter { private final Timer timer; /** All write operation will be executed on single thread. **/ - private final ExecutorService singleThreadExecutorForWrite; + private final Executor singleThreadExecutorForWrite; /** The serializer for the object which called by {@link #asyncAddData}. **/ private final DataSerializer dataSerializer; @@ -140,7 +138,7 @@ public class TxnLogBufferedWriter { * when disabled. * @param timer Used for periodic flush. */ - public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor orderedExecutor, Timer timer, + public TxnLogBufferedWriter(ManagedLedger managedLedger, Executor executor, Timer timer, DataSerializer dataSerializer, int batchedWriteMaxRecords, int batchedWriteMaxSize, int batchedWriteMaxDelayInMillis, boolean batchEnabled, TxnLogBufferedWriterMetricsStats metrics){ @@ -157,8 +155,7 @@ public TxnLogBufferedWriter(ManagedLedger managedLedger, OrderedExecutor ordered } this.batchEnabled = batchEnabled && batchedWriteMaxRecords > 1; this.managedLedger = managedLedger; - this.singleThreadExecutorForWrite = orderedExecutor.chooseThread( - managedLedger.getName() == null ? UUID.randomUUID().toString() : managedLedger.getName()); + this.singleThreadExecutorForWrite = executor; this.dataSerializer = dataSerializer; this.batchedWriteMaxRecords = batchedWriteMaxRecords; this.batchedWriteMaxSize = batchedWriteMaxSize; diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java index 56fc9c9f2e32c..3c6b2e382e311 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterTest.java @@ -36,7 +36,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -53,6 +53,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.collections4.CollectionUtils; import org.apache.pulsar.metadata.api.MetadataStoreException; @@ -177,8 +178,6 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, bkc.failAfter(1, BKException.Code.NotEnoughBookiesException); metadataStore.setAlwaysFail(new MetadataStoreException.BadVersionException("")); } - OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder() - .numThreads(5).name("txn-threads").build(); HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1, TimeUnit.MILLISECONDS); JsonDataSerializer dataSerializer = new JsonDataSerializer(eachDataBytesLen); @@ -189,7 +188,7 @@ public void testMainProcess(int batchedWriteMaxRecords, int batchedWriteMaxSize, */ // Create TxLogBufferedWriter. TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter( - managedLedger, orderedExecutor, transactionTimer, + managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(), transactionTimer, dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, batchedWriteMaxDelayInMillis, batchEnabled, DISABLED_BUFFERED_WRITER_METRICS); // Store the param-context, param-position, param-exception of callback function and complete-count for verify. @@ -344,7 +343,6 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { managedLedger.close(); } transactionTimer.stop(); - orderedExecutor.shutdown(); /** * Assert all Byte Buf generated by DataSerializer has been released. * 1. Because ManagedLedger holds write cache, some data is not actually released until ManagedLedger is @@ -382,6 +380,7 @@ public void testFlushThresholds() throws Exception{ ManagedLedger managedLedger = Mockito.mock(ManagedLedger.class); Mockito.when(managedLedger.getName()).thenReturn(mlName); OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(5).name("txn-topic-threads").build(); + Executor topicExecutor = orderedExecutor.chooseThread(managedLedger.getName()); HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1, TimeUnit.MILLISECONDS); SumStrDataSerializer dataSerializer = new SumStrDataSerializer(); @@ -401,7 +400,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } }).when(managedLedger).asyncAddEntry(Mockito.any(ByteBuf.class), Mockito.any(), Mockito.any()); // Test threshold: writeMaxDelayInMillis (use timer). - TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, + TxnLogBufferedWriter txnLogBufferedWriter1 = new TxnLogBufferedWriter<>(managedLedger, topicExecutor, transactionTimer, dataSerializer, 32, 1024 * 4, 100, true, DISABLED_BUFFERED_WRITER_METRICS); TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class); @@ -414,7 +413,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { txnLogBufferedWriter1.close().get(); // Test threshold: batchedWriteMaxRecords. - TxnLogBufferedWriter txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, + TxnLogBufferedWriter txnLogBufferedWriter2 = new TxnLogBufferedWriter<>(managedLedger, topicExecutor, transactionTimer, dataSerializer, 32, 1024 * 4, 10000, true, DISABLED_BUFFERED_WRITER_METRICS); for (int i = 0; i < 32; i++){ @@ -425,7 +424,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { txnLogBufferedWriter2.close(); // Test threshold: batchedWriteMaxSize. - TxnLogBufferedWriter txnLogBufferedWriter3 = new TxnLogBufferedWriter<>(managedLedger, orderedExecutor, + TxnLogBufferedWriter txnLogBufferedWriter3 = new TxnLogBufferedWriter<>(managedLedger, topicExecutor, transactionTimer, dataSerializer, 1024, 64 * 4, 10000, true, DISABLED_BUFFERED_WRITER_METRICS); for (int i = 0; i < 64; i++){ @@ -453,11 +452,9 @@ public Object answer(InvocationOnMock invocation) throws Throwable { @Test public void testPendingScheduleTriggerTaskCount() throws Exception { // Create components. - OrderedExecutor orderedExecutor = Mockito.mock(OrderedExecutor.class); ArrayBlockingQueue workQueue = new ArrayBlockingQueue<>(65536 * 2); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, workQueue); - Mockito.when(orderedExecutor.chooseThread(Mockito.anyString())).thenReturn(threadPoolExecutor); HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"), 1, TimeUnit.MILLISECONDS); SumStrDataSerializer dataSerializer = new SumStrDataSerializer(); @@ -465,15 +462,14 @@ public void testPendingScheduleTriggerTaskCount() throws Exception { MockedManagedLedger mockedManagedLedger = mockManagedLedgerWithWriteCounter(mlName); // Start tests. TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter<>(mockedManagedLedger.managedLedger, - orderedExecutor, transactionTimer, dataSerializer, 2, 1024 * 4, + threadPoolExecutor, transactionTimer, dataSerializer, 2, 1024 * 4, 1, true, DISABLED_BUFFERED_WRITER_METRICS); TxnLogBufferedWriter.AddDataCallback callback = Mockito.mock(TxnLogBufferedWriter.AddDataCallback.class); // Append heavier tasks to the Ledger thread. - final ExecutorService executorService = orderedExecutor.chooseThread(mlName); AtomicInteger heavierTaskCounter = new AtomicInteger(); Thread heavierTask = new Thread(() -> { while (true) { - executorService.execute(() -> { + threadPoolExecutor.execute(() -> { try { heavierTaskCounter.incrementAndGet(); Thread.sleep(19); @@ -531,7 +527,7 @@ public void testPendingScheduleTriggerTaskCount() throws Exception { dataSerializer.cleanup(); threadPoolExecutor.shutdown(); transactionTimer.stop(); - orderedExecutor.shutdown(); + threadPoolExecutor.shutdown(); } private static class JsonDataSerializer implements TxnLogBufferedWriter.DataSerializer{ @@ -931,7 +927,6 @@ private void releaseTxnLogBufferedWriterContext(TxnLogBufferedWriterContext cont context.txnLogBufferedWriter.close().get(); context.metrics.close(); context.timer.stop(); - context.orderedExecutor.shutdown(); CollectorRegistry.defaultRegistry.clear(); } @@ -940,7 +935,6 @@ private static class TxnLogBufferedWriterContext{ TxnLogBufferedWriter txnLogBufferedWriter; MockedManagedLedger mockedManagedLedger; Timer timer; - OrderedExecutor orderedExecutor; TxnLogBufferedWriterMetricsStats metrics; } @@ -971,11 +965,12 @@ private TxnLogBufferedWriterContext createTxnBufferedWriterContextWithMetrics( MockedManagedLedger mockedManagedLedger = mockManagedLedgerWithWriteCounter(mlName); // Create Txn Buffered Writer. TxnLogBufferedWriter txnLogBufferedWriter = new TxnLogBufferedWriter( - mockedManagedLedger.managedLedger, orderedExecutor, transactionTimer, + mockedManagedLedger.managedLedger, + orderedExecutor.chooseThread(mockedManagedLedger.managedLedger.getName()), transactionTimer, dataSerializer, batchedWriteMaxRecords, batchedWriteMaxSize, batchedWriteMaxDelayInMillis, true, metricsStats); return new TxnLogBufferedWriterContext(txnLogBufferedWriter, mockedManagedLedger, transactionTimer, - orderedExecutor, metricsStats); + metricsStats); } private void verifyTheCounterMetrics(int triggeredByRecordCount, int triggeredByMaxSize, int triggeredByMaxDelay,