Skip to content

Commit

Permalink
[pulsar-broker] minor fix for removing atomic-updater at managed-ledg…
Browse files Browse the repository at this point in the history
…er (apache#4146)
  • Loading branch information
rdhabalia authored and merlimat committed Apr 26, 2019
1 parent fbdf038 commit 07de52d
Showing 1 changed file with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,6 @@ enum PositionBound {
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
// last read-operation's callback to check read-timeout on it.
private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
.newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
private volatile ReadEntryCallbackWrapper lastReadCallback = null;

/**
Expand Down Expand Up @@ -1592,7 +1590,7 @@ protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntr
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
position.getEntryId(), callback, readOpCount, createdTime, ctx);
LAST_READ_CALLBACK.set(this, readCallback);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
Expand All @@ -1607,7 +1605,7 @@ protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
LAST_READ_CALLBACK.set(this, readCallback);
lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
Expand Down Expand Up @@ -3128,12 +3126,12 @@ private void checkReadTimeout() {
if (timeoutSec < 1) {
return;
}
ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
if (callback != null && callback.isTimedOut(timeoutSec)) {
log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
timeoutSec);
callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
LAST_READ_CALLBACK.set(this, null);
if (this.lastReadCallback != null && this.lastReadCallback.isTimedOut(timeoutSec)) {
log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, this.lastReadCallback.ledgerId,
this.lastReadCallback.entryId, timeoutSec);
this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
this.lastReadCallback.readOpCount);
lastReadCallback = null;
}
}

Expand Down

0 comments on commit 07de52d

Please sign in to comment.