Skip to content

Commit

Permalink
[ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEnt…
Browse files Browse the repository at this point in the history
…ry (apache#12606)

* [ML] Avoid passing OpAddEntry across a thread boundary

* Retain buffer in current thread

(cherry picked from commit 6af747f)
  • Loading branch information
lhotari committed Dec 9, 2021
1 parent 7bd69f9 commit 8873a2b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,14 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
// retain buffer in this thread
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
}

@Override
Expand All @@ -711,10 +715,14 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
// retain buffer in this thread
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
Expand Down Expand Up @@ -1508,9 +1516,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
// release the extra retain
ReferenceCountUtil.release(existsOp.data);
existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,28 @@ enum State {
CLOSED
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
op.data = data.retain();
op.data = data;
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());

if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread (timeout task thread).. so do nothing
return;
Expand Down Expand Up @@ -254,7 +254,7 @@ private void updateLatency() {

/**
* Checks if add-operation is completed
*
*
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
Expand All @@ -275,7 +275,7 @@ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {

/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
*
*
* @param lh
*/
void handleAddFailure(final LedgerHandle lh) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2774,7 +2774,7 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {

List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
Expand Down

0 comments on commit 8873a2b

Please sign in to comment.