diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 52926888b46f1..8a6f9a2c14790 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -147,7 +147,7 @@ public class ManagedCursorImpl implements ManagedCursor { private RateLimiter markDeleteLimiter; private boolean alwaysInactive = false; - + /** used temporary variables to {@link #getNumIndividualDeletedEntriesToSkip(long)} **/ private static final FastThreadLocal tempTotalEntriesToSkip = new FastThreadLocal<>(); private static final FastThreadLocal tempDeletedMessages = new FastThreadLocal<>(); @@ -167,7 +167,7 @@ class MarkDeleteEntry { public MarkDeleteEntry(PositionImpl newPosition, Map properties, MarkDeleteCallback callback, Object ctx) { - this.newPosition = PositionImpl.get(newPosition); + this.newPosition = newPosition; this.properties = properties; this.callback = callback; this.ctx = ctx; @@ -394,7 +394,7 @@ private void recoveredCursor(PositionImpl position, Map properties if (position.compareTo(ledger.getLastPosition()) > 0) { log.warn("[{}] [{}] Current position {} is ahead of last position {}", ledger.getName(), name, position, ledger.getLastPosition()); - position = PositionImpl.get(ledger.getLastPosition()); + position = ledger.getLastPosition(); } log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); @@ -474,7 +474,7 @@ public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesC } PENDING_READ_OPS_UPDATER.incrementAndGet(this); - OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(readPosition), numberOfEntriesToRead, callback, ctx); + OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx); ledger.asyncReadEntries(op); } @@ -595,7 +595,7 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac } asyncReadEntries(numberOfEntriesToRead, callback, ctx); } else { - OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(readPosition), numberOfEntriesToRead, callback, + OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback, ctx); if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) { @@ -1397,7 +1397,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { } break; } - + if (log.isDebugEnabled()) { log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", ledger.getName(), oldMarkDeletePosition, newMarkDeletePosition, skippedEntries); @@ -1406,7 +1406,7 @@ PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) { } // markDelete-position and clear out deletedMsgSet - markDeletePosition = PositionImpl.get(newMarkDeletePosition); + markDeletePosition = newMarkDeletePosition; individualDeletedMessages.removeAtMost(markDeletePosition.getLedgerId(), markDeletePosition.getEntryId()); if (readPosition.compareTo(newMarkDeletePosition) <= 0) { @@ -1846,12 +1846,12 @@ public boolean isDurable() { @Override public Position getReadPosition() { - return PositionImpl.get(readPosition); + return readPosition; } @Override public Position getMarkDeletedPosition() { - return PositionImpl.get(markDeletePosition); + return markDeletePosition; } @Override