Skip to content

Commit

Permalink
Cleanup managed-ledger module (apache#6631)
Browse files Browse the repository at this point in the history
- unresolved reference in JavaDoc
- unnecessary qualifiers for interfaces
- simple lambda can be replaced with method reference
- missing override annotation
  • Loading branch information
yjshen authored Mar 30, 2020
1 parent 347d385 commit 45ee784
Show file tree
Hide file tree
Showing 19 changed files with 75 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.LimitedPrivate;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ void markDelete(Position position, Map<String, Long> properties)
* The deletion of the messages is not persisted into the durable storage and cannot be recovered upon the reopening
* of the ManagedLedger
*
* @param positions
* @param position
* the positions of the messages to be deleted
* @param callback
* callback object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ public interface ManagedLedger {
*
* @param name
* the name associated with the ManagedCursor
* @param initializeOnLatest
* the flag tell the method wthether it should intialize the cursor at latest position or not.
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
Expand All @@ -157,7 +155,7 @@ public interface ManagedLedger {
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
public ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;
ManagedCursor openCursor(String name, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException;

/**
* Open a ManagedCursor in this ManagedLedger.
Expand All @@ -175,7 +173,7 @@ public interface ManagedLedger {
* @return the ManagedCursor
* @throws ManagedLedgerException
*/
public ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;
ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException;

/**
* Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor
Expand Down Expand Up @@ -248,7 +246,7 @@ public interface ManagedLedger {
* @param ctx
* opaque context
*/
public void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);
void asyncOpenCursor(String name, InitialPosition initialPosition, OpenCursorCallback callback, Object ctx);

/**
* Open a ManagedCursor asynchronously.
Expand All @@ -264,7 +262,7 @@ public interface ManagedLedger {
* @param ctx
* opaque context
*/
public void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);
void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties, OpenCursorCallback callback, Object ctx);

/**
* Get a list of all the cursors reading from this ManagedLedger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
import org.apache.bookkeeper.client.api.DigestType;

import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;

/**
* Configuration class for a ManagedLedger.
*/
Expand Down Expand Up @@ -521,7 +524,7 @@ public long getReadEntryTimeoutSeconds() {
* Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
* readTimeoutSeconds <= 0)
*
* @param readTimeoutSeconds
* @param readEntryTimeoutSeconds
* @return
*/
public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSeconds) {
Expand Down Expand Up @@ -556,7 +559,7 @@ public Class<? extends EnsemblePlacementPolicy> getBookKeeperEnsemblePlacementPo
/**
* Returns EnsemblePlacementPolicy configured for the Managed-ledger.
*
* @param bookKeeperEnsemblePlacementPolicy
* @param bookKeeperEnsemblePlacementPolicyClassName
*/
public void setBookKeeperEnsemblePlacementPolicyClassName(
Class<? extends EnsemblePlacementPolicy> bookKeeperEnsemblePlacementPolicyClassName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ ReadOnlyCursor openReadOnlyCursor(String managedLedgerName, Position startPositi
/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger
*
* @param name
* @param managedLedgerName
* @param startPosition
* set the cursor on that particular position. If setting to `PositionImpl.earliest` it will be
* positioned on the first available entry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,12 @@
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -154,7 +151,7 @@ public long getMaxSize() {
}

public void clear() {
caches.values().forEach(cache -> cache.clear());
caches.values().forEach(EntryCache::clear);
}

protected class EntryCacheDisabled implements EntryCache {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static class Item {
private final ArrayList<Item> heap = Lists.newArrayList();

// Maps a cursor to its position in the heap
private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<String, Item>();
private final ConcurrentMap<String, Item> cursors = new ConcurrentSkipListMap<>();

private final StampedLock rwLock = new StampedLock();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ public class ManagedCursorImpl implements ManagedCursor {
// Stat of the cursor z-node
private volatile Stat cursorLedgerStat;

private static final LongPairConsumer<PositionImpl> positionRangeConverter = (key, value) -> new PositionImpl(key,
value);
private static final LongPairConsumer<PositionImpl> positionRangeConverter = PositionImpl::new;
private static final LongPairConsumer<PositionImplRecyclable> recyclePositionRangeConverter = (key, value) -> {
PositionImplRecyclable position = PositionImplRecyclable.create();
position.ledgerId = key;
Expand Down Expand Up @@ -226,7 +225,7 @@ public interface VoidCallback {
this.ledger = ledger;
this.name = cursorName;
this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled()
? new ConcurrentOpenLongPairRangeSet<PositionImpl>(4096, positionRangeConverter)
? new ConcurrentOpenLongPairRangeSet<>(4096, positionRangeConverter)
: new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter);
this.digestType = BookKeeper.DigestType.fromApiDigestType(config.getDigestType());
STATE_UPDATER.set(this, State.Uninitialized);
Expand Down Expand Up @@ -1963,10 +1962,10 @@ public void closeFailed(ManagedLedgerException exception, Object ctx) {
* that can be persist in zk-metastore. If current unack-range is higher than configured threshold then broker
* persists mark-delete into cursor-ledger else into zk-metastore.
*
* @param cursorsLedgerId
* @param position
* @param properties
* @param callback
* @param ctx
*/
void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {
Expand Down Expand Up @@ -2029,10 +2028,8 @@ private boolean shouldPersistUnackRangesToLedger() {
private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
if (state == State.Closed) {
ledger.getExecutor().execute(safeRun(() -> {
callback.operationFailed(new MetaStoreException(
new ManagedLedgerException.CursorAlreadyClosedException(name + " cursor already closed")));
}));
ledger.getExecutor().execute(safeRun(() -> callback.operationFailed(new MetaStoreException(
new CursorAlreadyClosedException(name + " cursor already closed")))));
return;
}

Expand Down Expand Up @@ -2484,9 +2481,8 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) {
log.warn("[{}] Failed to delete ledger {}: {}", ledger.getName(), lh.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> {
asyncDeleteLedger(lh, retry - 1);
}), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteLedger(lh, retry - 1)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
return;
} else {
Expand Down Expand Up @@ -2520,9 +2516,8 @@ private void asyncDeleteCursorLedger(int retry) {
log.warn("[{}][{}] Failed to delete ledger {}: {}", ledger.getName(), name, cursorLedger.getId(),
BKException.getMessage(rc));
if (!isNoSuchLedgerExistsException(rc)) {
ledger.getScheduledExecutor().schedule(safeRun(() -> {
asyncDeleteCursorLedger(retry - 1);
}), DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
ledger.getScheduledExecutor().schedule(safeRun(() -> asyncDeleteCursorLedger(retry - 1)),
DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC, TimeUnit.SECONDS);
}
}
}, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private ManagedLedgerFactoryImpl(BookkeeperFactoryForCustomEnsemblePlacementPoli
this.config = config;
this.mbean = new ManagedLedgerFactoryMBeanImpl(this);
this.entryCacheManager = new EntryCacheManager(this);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(() -> refreshStats(), 0, StatsPeriodSeconds, TimeUnit.SECONDS);
this.statsTask = scheduledExecutor.scheduleAtFixedRate(this::refreshStats, 0, StatsPeriodSeconds, TimeUnit.SECONDS);


this.cacheEvictionTimeThresholdNanos = TimeUnit.MILLISECONDS
Expand Down Expand Up @@ -341,9 +341,7 @@ public void initializeFailed(ManagedLedgerException e) {
}
}, null);
return future;
}).thenAccept(ml -> {
callback.openLedgerComplete(ml, ctx);
}).exceptionally(exception -> {
}).thenAccept(ml -> callback.openLedgerComplete(ml, ctx)).exceptionally(exception -> {
callback.openLedgerFailed((ManagedLedgerException) exception.getCause(), ctx);
return null;
});
Expand Down Expand Up @@ -392,9 +390,7 @@ public void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosi
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor, orderedExecutor, managedLedgerName);

roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition).thenAccept(roCursor -> {
callback.openReadOnlyCursorComplete(roCursor, ctx);
}).exceptionally(ex -> {
roManagedLedger.initializeAndCreateCursor((PositionImpl) startPosition).thenAccept(roCursor -> callback.openReadOnlyCursorComplete(roCursor, ctx)).exceptionally(ex -> {
Throwable t = ex;
if (t instanceof CompletionException) {
t = ex.getCause();
Expand Down Expand Up @@ -640,7 +636,7 @@ public BookKeeper getBookKeeper() {
* Factory to create Bookkeeper-client for a given ensemblePlacementPolicy
*
*/
public static interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
public interface BookkeeperFactoryForCustomEnsemblePlacementPolicy {
default BookKeeper get() {
return get(null);
}
Expand Down
Loading

0 comments on commit 45ee784

Please sign in to comment.