Skip to content

Commit

Permalink
[pulsar-broker] schedule one add/read timeout task per ml/topic (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
rdhabalia authored and merlimat committed Apr 24, 2019
1 parent ccea624 commit 5e96a9c
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,15 @@ enum PositionBound {
final ManagedLedgerFactoryImpl factory;
protected final ManagedLedgerMBeanImpl mbean;
protected final Clock clock;

private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;

// last read-operation's callback to check read-timeout on it.
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
private volatile ReadEntryCallbackWrapper lastReadCallback = null;

/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
Expand Down Expand Up @@ -338,26 +343,6 @@ public void operationFailed(MetaStoreException e) {
scheduleTimeoutTask();
}

private void scheduleTimeoutTask() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
// disable timeout task checker if timeout <= 0
if (timeoutSec > 0) {
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(() -> {
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
&& opAddEntry.completed == FALSE;
if (isTimedOut) {
log.error("Failed to add entry for ledger {} in time-out {} sec",
(opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}, config.getAddEntryTimeoutSeconds(), config.getAddEntryTimeoutSeconds(), TimeUnit.SECONDS);
}
}

private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers);
Expand Down Expand Up @@ -1216,6 +1201,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}

}

private void closeAllCursors(CloseCallback callback, final Object ctx) {
Expand Down Expand Up @@ -1597,24 +1583,13 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
long timeout = config.getReadEntryTimeoutSeconds();
boolean checkTimeout = timeout > 0;
if (checkTimeout) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
position.getEntryId(), callback, readOpCount, ctx);
final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
// validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
// readOpCount) and fail the callback if read is not completed yet
if (readCallback.readOpCount == readOpCount
&& ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, ledger.getId(), position,
timeout);
readCallback.readEntryFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
}
}, timeout, TimeUnit.SECONDS);
readCallback.task = task;
position.getEntryId(), callback, readOpCount, createdTime, ctx);
LAST_READ_CALLBACK.set(this, readCallback);
entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
Expand All @@ -1623,24 +1598,13 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr

protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
OpReadEntry opReadEntry, Object ctx) {
long timeout = config.getReadEntryTimeoutSeconds();
boolean checkTimeout = timeout > 0;
if (checkTimeout) {
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, ctx);
final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
// validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
// readOpCount) and fail the callback if read is not completed yet
if (readCallback.readOpCount == readOpCount
&& ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
log.warn("[{}]-{} read entry timeout for {}-{} after {} sec", this.name, ledger.getId(), firstEntry,
lastEntry, timeout);
readCallback.readEntriesFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
}
}, timeout, TimeUnit.SECONDS);
readCallback.task = task;
opReadEntry, readOpCount, createdTime, ctx);
LAST_READ_CALLBACK.set(this, readCallback);
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
Expand All @@ -1658,8 +1622,8 @@ static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEn
String name;
long ledgerId;
long entryId;
ScheduledFuture<?> task;
volatile long readOpCount = -1;
volatile long createdTime = -1;
volatile Object cntx;

final Handle<ReadEntryCallbackWrapper> recyclerHandle;
Expand All @@ -1668,32 +1632,44 @@ private ReadEntryCallbackWrapper(Handle<ReadEntryCallbackWrapper> recyclerHandle
this.recyclerHandle = recyclerHandle;
}

static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback, long readOpCount, Object ctx) {
static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback,
long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
readCallback.entryId = entryId;
readCallback.readEntryCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
readCallback.createdTime = createdTime;
return readCallback;
}

static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback, long readOpCount, Object ctx) {
static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback,
long readOpCount, long createdTime, Object ctx) {
ReadEntryCallbackWrapper readCallback = RECYCLER.get();
readCallback.name = name;
readCallback.ledgerId = ledgerId;
readCallback.entryId = entryId;
readCallback.readEntriesCallback = callback;
readCallback.cntx = ctx;
readCallback.readOpCount = readOpCount;
readCallback.createdTime = createdTime;
return readCallback;
}

public boolean isTimedOut(long timeoutSec) {
return this.createdTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - this.createdTime) >= timeoutSec
&& this.readCompleted == FALSE;
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
if (checkCallbackCompleted(ctx)) {
log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
entry.release();
return;
}
Expand All @@ -1704,7 +1680,9 @@ public void readEntryComplete(Entry entry, Object ctx) {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
if (checkCallbackCompleted(ctx)) {
log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
return;
}
readEntryCallback.readEntryFailed(exception, cntx);
Expand All @@ -1714,7 +1692,9 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
if (checkCallbackCompleted(ctx)) {
log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
returnedEntries.forEach(Entry::release);
return;
}
Expand All @@ -1725,13 +1705,26 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (checkCallbackCompleted(ctx)) {
log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
return;
}
readEntriesCallback.readEntriesFailed(exception, cntx);
recycle();
}

public void readFailed(ManagedLedgerException exception, Object ctx) {
if (readEntryCallback != null) {
readEntryFailed(exception, ctx);
} else if (readEntriesCallback != null) {
readEntriesFailed(exception, ctx);
} else {
// it should not happen .. recycle if none of the callback exists..
recycle();
}
}

private boolean checkCallbackCompleted(Object ctx) {
// if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
// assigned to different request
Expand All @@ -1742,13 +1735,7 @@ private boolean checkCallbackCompleted(Object ctx) {

private void recycle() {
readOpCount = -1;
if (task != null && !task.isDone() && !task.isCancelled()) {
try {
task.cancel(false);
} catch (Throwable th) {
log.debug("[{}]Failed to cancle task for read-callback for {}-{}", name, ledgerId, entryId);
}
}
createdTime = -1;
readEntryCallback = null;
readEntriesCallback = null;
ledgerId = -1;
Expand Down Expand Up @@ -3104,6 +3091,49 @@ protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object c
return false;
}

private void scheduleTimeoutTask() {
// disable timeout task checker if timeout <= 0
if (config.getAddEntryTimeoutSeconds() > 0 || config.getReadEntryTimeoutSeconds() > 0) {
long timeoutSec = Math.min(config.getAddEntryTimeoutSeconds(), config.getReadEntryTimeoutSeconds());
this.timeoutTask = this.scheduledExecutor.scheduleAtFixedRate(safeRun(() -> {
checkAddTimeout();
checkReadTimeout();
}), timeoutSec, timeoutSec, TimeUnit.SECONDS);
}
}

private void checkAddTimeout() {
long timeoutSec = config.getAddEntryTimeoutSeconds();
if (timeoutSec < 1) {
return;
}
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
boolean isTimedOut = opAddEntry.lastInitTime != -1
&& TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
&& opAddEntry.completed == FALSE;
if (isTimedOut) {
log.error("Failed to add entry for ledger {} in time-out {} sec",
(opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
opAddEntry.handleAddFailure(opAddEntry.ledger);
}
}
}

private void checkReadTimeout() {
long timeoutSec = config.getReadEntryTimeoutSeconds();
if (timeoutSec < 1) {
return;
}
ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
if (callback != null && callback.isTimedOut(timeoutSec)) {
log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
timeoutSec);
callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
LAST_READ_CALLBACK.set(this, null);
}
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand All @@ -34,8 +33,6 @@
import org.apache.bookkeeper.mledger.Position;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;

class OpReadEntry implements ReadEntriesCallback {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2277,8 +2277,7 @@ public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
BookKeeper bk = mock(BookKeeper.class);
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
CountDownLatch latch1 = new CountDownLatch(1);

String ctxStr = "timeoutCtx";
CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
ReadHandle ledgerHandle = mock(ReadHandle.class);
doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
Expand All @@ -2289,47 +2288,48 @@ public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
responseException1.set(null);
latch1.countDown();
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
assertEquals(ctxStr, (String) ctx);
responseException1.set(exception);
latch1.countDown();
}
}, null);
}, ctxStr);
ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {

}
}, Collections.emptyMap());
latch1.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
retryStrategically((test) -> {
return responseException1.get() != null;
}, 5, 1000);
assertNotNull(responseException1.get());
assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));

// (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
PositionImpl readPositionRef = PositionImpl.earliest;
ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {

@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
latch2.countDown();
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
assertEquals(ctxStr, (String) ctx);
responseException2.set(exception);
latch2.countDown();
}

}, null);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
false, opReadEntry, null);
latch2.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
false, opReadEntry, ctxStr);
retryStrategically((test) -> {
return responseException2.get() != null;
}, 5, 1000);
assertNotNull(responseException2.get());
assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));

Expand Down Expand Up @@ -2405,4 +2405,14 @@ private void setFieldValue(Class clazz, Object classObj, String fieldName, Objec
field.setAccessible(true);
field.set(classObj, fieldValue);
}

public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis)
throws Exception {
for (int i = 0; i < retryCount; i++) {
if (predicate.test(null) || i == (retryCount - 1)) {
break;
}
Thread.sleep(intSleepTimeInMillis + (intSleepTimeInMillis * i));
}
}
}

0 comments on commit 5e96a9c

Please sign in to comment.