diff --git a/buildtools/src/main/resources/pulsar/checkstyle.xml b/buildtools/src/main/resources/pulsar/checkstyle.xml index 890a8628a7746..b46c8bfca621d 100644 --- a/buildtools/src/main/resources/pulsar/checkstyle.xml +++ b/buildtools/src/main/resources/pulsar/checkstyle.xml @@ -29,6 +29,8 @@ page at http://checkstyle.sourceforge.net/config.html --> + + @@ -73,6 +75,8 @@ page at http://checkstyle.sourceforge.net/config.html --> + + @@ -352,22 +356,6 @@ page at http://checkstyle.sourceforge.net/config.html --> - - - - - - - - - @@ -41,6 +28,9 @@ + + + @@ -48,4 +38,6 @@ + + diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index 5b878fc6d2b99..71ff228aa6242 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -18,111 +18,111 @@ */ package org.apache.bookkeeper.mledger; -import java.util.List; - import com.google.common.annotations.Beta; +import java.util.List; /** * Definition of all the callbacks used for the ManagedLedger asynchronous API. * */ @Beta +@SuppressWarnings("checkstyle:javadoctype") public interface AsyncCallbacks { - public interface OpenLedgerCallback { - public void openLedgerComplete(ManagedLedger ledger, Object ctx); + interface OpenLedgerCallback { + void openLedgerComplete(ManagedLedger ledger, Object ctx); - public void openLedgerFailed(ManagedLedgerException exception, Object ctx); + void openLedgerFailed(ManagedLedgerException exception, Object ctx); } - public interface DeleteLedgerCallback { - public void deleteLedgerComplete(Object ctx); + interface DeleteLedgerCallback { + void deleteLedgerComplete(Object ctx); - public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx); + void deleteLedgerFailed(ManagedLedgerException exception, Object ctx); } - public interface OpenCursorCallback { - public void openCursorComplete(ManagedCursor cursor, Object ctx); + interface OpenCursorCallback { + void openCursorComplete(ManagedCursor cursor, Object ctx); - public void openCursorFailed(ManagedLedgerException exception, Object ctx); + void openCursorFailed(ManagedLedgerException exception, Object ctx); } - public interface DeleteCursorCallback { - public void deleteCursorComplete(Object ctx); + interface DeleteCursorCallback { + void deleteCursorComplete(Object ctx); - public void deleteCursorFailed(ManagedLedgerException exception, Object ctx); + void deleteCursorFailed(ManagedLedgerException exception, Object ctx); } - public interface AddEntryCallback { - public void addComplete(Position position, Object ctx); + interface AddEntryCallback { + void addComplete(Position position, Object ctx); - public void addFailed(ManagedLedgerException exception, Object ctx); + void addFailed(ManagedLedgerException exception, Object ctx); } - public interface CloseCallback { - public void closeComplete(Object ctx); + interface CloseCallback { + void closeComplete(Object ctx); - public void closeFailed(ManagedLedgerException exception, Object ctx); + void closeFailed(ManagedLedgerException exception, Object ctx); } - public interface ReadEntriesCallback { - public void readEntriesComplete(List entries, Object ctx); + interface ReadEntriesCallback { + void readEntriesComplete(List entries, Object ctx); - public void readEntriesFailed(ManagedLedgerException exception, Object ctx); + void readEntriesFailed(ManagedLedgerException exception, Object ctx); } - public interface ReadEntryCallback { - public void readEntryComplete(Entry entry, Object ctx); + interface ReadEntryCallback { + void readEntryComplete(Entry entry, Object ctx); - public void readEntryFailed(ManagedLedgerException exception, Object ctx); + void readEntryFailed(ManagedLedgerException exception, Object ctx); } - public interface MarkDeleteCallback { - public void markDeleteComplete(Object ctx); + interface MarkDeleteCallback { + void markDeleteComplete(Object ctx); - public void markDeleteFailed(ManagedLedgerException exception, Object ctx); + void markDeleteFailed(ManagedLedgerException exception, Object ctx); } - public interface ClearBacklogCallback { - public void clearBacklogComplete(Object ctx); + interface ClearBacklogCallback { + void clearBacklogComplete(Object ctx); - public void clearBacklogFailed(ManagedLedgerException exception, Object ctx); + void clearBacklogFailed(ManagedLedgerException exception, Object ctx); } - public interface SkipEntriesCallback { - public void skipEntriesComplete(Object ctx); + interface SkipEntriesCallback { + void skipEntriesComplete(Object ctx); - public void skipEntriesFailed(ManagedLedgerException exception, Object ctx); + void skipEntriesFailed(ManagedLedgerException exception, Object ctx); } - public interface DeleteCallback { - public void deleteComplete(Object ctx); + interface DeleteCallback { + void deleteComplete(Object ctx); - public void deleteFailed(ManagedLedgerException exception, Object ctx); + void deleteFailed(ManagedLedgerException exception, Object ctx); } - public interface TerminateCallback { - public void terminateComplete(Position lastCommittedPosition, Object ctx); + interface TerminateCallback { + void terminateComplete(Position lastCommittedPosition, Object ctx); - public void terminateFailed(ManagedLedgerException exception, Object ctx); + void terminateFailed(ManagedLedgerException exception, Object ctx); } - public interface FindEntryCallback { - public void findEntryComplete(Position position, Object ctx); + interface FindEntryCallback { + void findEntryComplete(Position position, Object ctx); - public void findEntryFailed(ManagedLedgerException exception, Object ctx); + void findEntryFailed(ManagedLedgerException exception, Object ctx); } - public interface ResetCursorCallback { - public void resetComplete(Object ctx); + interface ResetCursorCallback { + void resetComplete(Object ctx); - public void resetFailed(ManagedLedgerException exception, Object ctx); + void resetFailed(ManagedLedgerException exception, Object ctx); } - public interface ManagedLedgerInfoCallback { - public void getInfoComplete(ManagedLedgerInfo info, Object ctx); + interface ManagedLedgerInfoCallback { + void getInfoComplete(ManagedLedgerInfo info, Object ctx); - public void getInfoFailed(ManagedLedgerException exception, Object ctx); + void getInfoFailed(ManagedLedgerException exception, Object ctx); } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java index 93633280c1e81..fc90a99114612 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/Entry.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger; import com.google.common.annotations.Beta; - import io.netty.buffer.ByteBuf; /** @@ -49,7 +48,7 @@ public interface Entry { * @return the position at which the entry was stored */ Position getPosition(); - + /** * @return ledgerId of the position */ @@ -62,7 +61,7 @@ public interface Entry { /** * Release the resources (data) allocated for this entry and recycle if all the resources are deallocated (ref-count - * of data reached to 0) + * of data reached to 0). */ boolean release(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 71160fa45c57e..db8e3699c0fd6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -18,10 +18,11 @@ */ package org.apache.bookkeeper.mledger; +import com.google.common.annotations.Beta; +import com.google.common.base.Predicate; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; @@ -30,37 +31,36 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; -import com.google.common.annotations.Beta; -import com.google.common.base.Predicate; - /** * A ManangedCursor is a persisted cursor inside a ManagedLedger. - *

- * The ManagedCursor is used to read from the ManagedLedger and to signal when the consumer is done with the messages - * that it has read before. + * + *

The ManagedCursor is used to read from the ManagedLedger and to signal when the consumer is done with the + * messages that it has read before. */ @Beta public interface ManagedCursor { - public enum FindPositionConstraint { + @SuppressWarnings("checkstyle:javadoctype") + enum FindPositionConstraint { SearchActiveEntries, SearchAllAvailableEntries - }; + } - public enum IndividualDeletedEntries { + @SuppressWarnings("checkstyle:javadoctype") + enum IndividualDeletedEntries { Include, Exclude - }; + } /** * Get the unique cursor name. * * @return the cursor name */ - public String getName(); + String getName(); /** - * Return any properties that were associated with the last stored position + * Return any properties that were associated with the last stored position. */ - public Map getProperties(); + Map getProperties(); /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. @@ -70,7 +70,7 @@ public enum IndividualDeletedEntries { * @return the list of entries * @throws ManagedLedgerException */ - public List readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException; + List readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException; /** * Asynchronously read entries from the ManagedLedger. @@ -83,12 +83,12 @@ public enum IndividualDeletedEntries { * @param ctx * opaque context */ - public void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); + void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); /** * Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions. * - * @param N + * @param n * entry position * @param deletedEntries * skip individual deleted entries @@ -98,39 +98,39 @@ public enum IndividualDeletedEntries { * @throws InterruptedException * @throws ManagedLedgerException */ - public Entry getNthEntry(int N, IndividualDeletedEntries deletedEntries) + Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException; /** * Asynchronously get 'N'th entry from the mark delete position in the cursor without updating any cursor positions. * - * @param N + * @param n * entry position * @param deletedEntries * skip individual deleted entries * @param callback * @param ctx */ - public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, + void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, Object ctx); /** * Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller. * - * If no entries are available, the method will block until at least a new message will be persisted. + *

If no entries are available, the method will block until at least a new message will be persisted. * * @param numberOfEntriesToRead * maximum number of entries to return * @return the list of entries * @throws ManagedLedgerException */ - public List readEntriesOrWait(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException; + List readEntriesOrWait(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException; /** * Asynchronously read entries from the ManagedLedger. * - * If no entries are available, the callback will not be triggered. Instead it will be registered to wait until a - * new message will be persisted into the managed ledger + *

If no entries are available, the callback will not be triggered. Instead it will be registered to wait until + * a new message will be persisted into the managed ledger * * @see #readEntriesOrWait(int) * @param numberOfEntriesToRead @@ -140,44 +140,45 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea * @param ctx * opaque context */ - public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); + void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx); /** - * Cancel a previously scheduled asyncReadEntriesOrWait operation + * Cancel a previously scheduled asyncReadEntriesOrWait operation. * - * @see #asyncReadEntriesOrWait(int) + * @see #asyncReadEntriesOrWait(int, ReadEntriesCallback, Object) * @return true if the read operation was canceled or false if there was no pending operation */ - public boolean cancelPendingReadRequest(); + boolean cancelPendingReadRequest(); /** * Tells whether this cursor has already consumed all the available entries. - *

- * This method is not blocking. + * + *

This method is not blocking. * * @return true if there are pending entries to read, false otherwise */ - public boolean hasMoreEntries(); + boolean hasMoreEntries(); /** * Return the number of messages that this cursor still has to read. * - * This method has linear time complexity on the number of ledgers included in the managed ledger. + *

This method has linear time complexity on the number of ledgers included in the managed ledger. * * @return the number of entries */ - public long getNumberOfEntries(); + long getNumberOfEntries(); /** * Return the number of non-deleted messages on this cursor. * - * This will also include messages that have already been read from the cursor but not deleted or mark-deleted yet. + *

This will also include messages that have already been read from the cursor but not deleted or mark-deleted + * yet. * - * This method has linear time complexity on the number of ledgers included in the managed ledger. + *

This method has linear time complexity on the number of ledgers included in the managed ledger. * * @return the number of entries */ - public long getNumberOfEntriesInBacklog(); + long getNumberOfEntriesInBacklog(); /** * This signals that the reader is done with all the entries up to "position" (included). This can potentially @@ -188,7 +189,7 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea * * @throws ManagedLedgerException */ - public void markDelete(Position position) throws InterruptedException, ManagedLedgerException; + void markDelete(Position position) throws InterruptedException, ManagedLedgerException; /** * This signals that the reader is done with all the entries up to "position" (included). This can potentially @@ -201,11 +202,11 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea * * @throws ManagedLedgerException */ - public void markDelete(Position position, Map properties) + void markDelete(Position position, Map properties) throws InterruptedException, ManagedLedgerException; /** - * Asynchronous mark delete + * Asynchronous mark delete. * * @see #markDelete(Position) * @param position @@ -215,10 +216,10 @@ public void markDelete(Position position, Map properties) * @param ctx * opaque context */ - public void asyncMarkDelete(Position position, MarkDeleteCallback callback, Object ctx); + void asyncMarkDelete(Position position, MarkDeleteCallback callback, Object ctx); /** - * Asynchronous mark delete + * Asynchronous mark delete. * * @see #markDelete(Position) * @param position @@ -230,30 +231,30 @@ public void markDelete(Position position, Map properties) * @param ctx * opaque context */ - public void asyncMarkDelete(Position position, Map properties, MarkDeleteCallback callback, Object ctx); + void asyncMarkDelete(Position position, Map properties, MarkDeleteCallback callback, Object ctx); /** - * Delete a single message - *

- * Mark a single message for deletion. When all the previous messages are all deleted, then markDelete() will be + * Delete a single message. + * + *

Mark a single message for deletion. When all the previous messages are all deleted, then markDelete() will be * called internally to advance the persistent acknowledged position. - *

- * The deletion of the message is not persisted into the durable storage and cannot be recovered upon the reopening - * of the ManagedLedger + * + *

The deletion of the message is not persisted into the durable storage and cannot be recovered upon the + * reopening of the ManagedLedger * * @param position * the position of the message to be deleted */ - public void delete(Position position) throws InterruptedException, ManagedLedgerException; + void delete(Position position) throws InterruptedException, ManagedLedgerException; /** * Delete a single message asynchronously - *

- * Mark a single message for deletion. When all the previous messages are all deleted, then markDelete() will be + * + *

Mark a single message for deletion. When all the previous messages are all deleted, then markDelete() will be * called internally to advance the persistent acknowledged position. - *

- * The deletion of the message is not persisted into the durable storage and cannot be recovered upon the reopening - * of the ManagedLedger + * + *

The deletion of the message is not persisted into the durable storage and cannot be recovered upon the + * reopening of the ManagedLedger * * @param position * the position of the message to be deleted @@ -262,61 +263,61 @@ public void markDelete(Position position, Map properties) * @param ctx * opaque context */ - public void asyncDelete(Position position, DeleteCallback callback, Object ctx); + void asyncDelete(Position position, DeleteCallback callback, Object ctx); /** * Get the read position. This points to the next message to be read from the cursor. * * @return the read position */ - public Position getReadPosition(); + Position getReadPosition(); /** * Get the newest mark deleted position on this cursor. * * @return the mark deleted position */ - public Position getMarkDeletedPosition(); + Position getMarkDeletedPosition(); /** * Rewind the cursor to the mark deleted position to replay all the already read but not yet mark deleted messages. * - * The next message to be read is the one after the current mark deleted message. + *

The next message to be read is the one after the current mark deleted message. */ - public void rewind(); + void rewind(); /** * Move the cursor to a different read position. * - * If the new position happens to be before the already mark deleted position, it will be set to the mark deleted - * position instead. + *

If the new position happens to be before the already mark deleted position, it will be set to the mark + * deleted position instead. * * @param newReadPosition * the position where to move the cursor */ - public void seek(Position newReadPosition); + void seek(Position newReadPosition); /** * Clear the cursor backlog. * - * Consume all the entries for this cursor. + *

Consume all the entries for this cursor. */ - public void clearBacklog() throws InterruptedException, ManagedLedgerException; + void clearBacklog() throws InterruptedException, ManagedLedgerException; /** * Clear the cursor backlog. * - * Consume all the entries for this cursor. + *

Consume all the entries for this cursor. * * @param callback * callback object * @param ctx * opaque context */ - public void asyncClearBacklog(ClearBacklogCallback callback, Object ctx); + void asyncClearBacklog(ClearBacklogCallback callback, Object ctx); /** - * Skip n entries from the read position of this cursor + * Skip n entries from the read position of this cursor. * * @param numEntriesToSkip * number of entries to skip @@ -325,11 +326,11 @@ public void markDelete(Position position, Map properties) * @throws InterruptedException * @throws ManagedLedgerException */ - public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries) + void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException; /** - * Skip n entries from the read position of this cursor + * Skip n entries from the read position of this cursor. * * @param numEntriesToSkip * number of entries to skip @@ -340,11 +341,11 @@ public void skipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEn * @param ctx * opaque context */ - public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries, + void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries deletedEntries, final SkipEntriesCallback callback, Object ctx); /** - * Find the newest entry that matches the given predicate + * Find the newest entry that matches the given predicate. * * @param condition * predicate that reads an entry an applies a condition @@ -352,10 +353,10 @@ public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries dele * @throws InterruptedException * @throws ManagedLedgerException */ - public Position findNewestMatching(Predicate condition) throws InterruptedException, ManagedLedgerException; + Position findNewestMatching(Predicate condition) throws InterruptedException, ManagedLedgerException; /** - * Find the newest entry that matches the given predicate + * Find the newest entry that matches the given predicate. * * @param condition * predicate that reads an entry an applies a condition @@ -364,31 +365,31 @@ public void asyncSkipEntries(int numEntriesToSkip, IndividualDeletedEntries dele * @param ctx * opaque context */ - public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, + void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate condition, FindEntryCallback callback, Object ctx); /** - * reset the cursor to specified position to enable replay of messages + * reset the cursor to specified position to enable replay of messages. * * @param position * position to move the cursor to * @throws InterruptedException * @throws ManagedLedgerException */ - public void resetCursor(final Position position) throws InterruptedException, ManagedLedgerException; + void resetCursor(final Position position) throws InterruptedException, ManagedLedgerException; /** - * reset the cursor to specified position to enable replay of messages + * reset the cursor to specified position to enable replay of messages. * * @param position * position to move the cursor to * @param callback * callback object */ - public void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback); + void asyncResetCursor(final Position position, AsyncCallbacks.ResetCursorCallback callback); /** - * Read the specified set of positions from ManagedLedger + * Read the specified set of positions from ManagedLedger. * * @param positions * set of positions to read @@ -396,11 +397,11 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate * @throws InterruptedException * @throws ManagedLedgerException */ - public List replayEntries(Set positions) + List replayEntries(Set positions) throws InterruptedException, ManagedLedgerException; /** - * Read the specified set of positions from ManagedLedger + * Read the specified set of positions from ManagedLedger. * * @param positions * set of positions to read @@ -411,7 +412,8 @@ public List replayEntries(Set positions) * @return skipped positions * set of positions which are already deleted/acknowledged and skipped while replaying them */ - public Set asyncReplayEntries(Set positions, ReadEntriesCallback callback, Object ctx); + Set asyncReplayEntries( + Set positions, ReadEntriesCallback callback, Object ctx); /** * Close the cursor and releases the associated resources. @@ -419,73 +421,73 @@ public List replayEntries(Set positions) * @throws InterruptedException * @throws ManagedLedgerException */ - public void close() throws InterruptedException, ManagedLedgerException; + void close() throws InterruptedException, ManagedLedgerException; /** - * Close the cursor asynchronously and release the associated resources + * Close the cursor asynchronously and release the associated resources. * * @param callback * callback object * @param ctx * opaque context */ - public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx); + void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx); /** * Get the first position. * * @return the first position */ - public Position getFirstPosition(); + Position getFirstPosition(); /** - * Activate cursor: EntryCacheManager caches entries only for activated-cursors + * Activate cursor: EntryCacheManager caches entries only for activated-cursors. * */ - public void setActive(); + void setActive(); /** - * Deactivate cursor + * Deactivate cursor. * */ - public void setInactive(); + void setInactive(); /** * Checks if cursor is active or not. * * @return */ - public boolean isActive(); + boolean isActive(); /** - * Tells whether the cursor is durable or just kept in memory + * Tells whether the cursor is durable or just kept in memory. */ - public boolean isDurable(); + boolean isDurable(); /** - * Returns total number of entries from the first not-acked message to current dispatching position - * + * Returns total number of entries from the first not-acked message to current dispatching position. + * * @return */ long getNumberOfEntriesSinceFirstNotAckedMessage(); /** - * Returns number of mark-Delete range - * + * Returns number of mark-Delete range. + * * @return */ int getTotalNonContiguousDeletedMessagesRange(); /** - * Returns cursor throttle mark-delete rate - * + * Returns cursor throttle mark-delete rate. + * * @return */ double getThrottleMarkDelete(); /** - * Update throttle mark delete rate - * + * Update throttle mark delete rate. + * */ void setThrottleMarkDelete(double throttleMarkDelete); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0d2c41b4bd41b..e13664c27d01f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -18,8 +18,8 @@ */ package org.apache.bookkeeper.mledger; +import com.google.common.annotations.Beta; import io.netty.buffer.ByteBuf; - import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -27,12 +27,10 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; -import com.google.common.annotations.Beta; - /** * A ManagedLedger it's a superset of a BookKeeper ledger concept. - *

- * It mimics the concept of an appender log that: + * + *

It mimics the concept of an appender log that: * *

    *
  • has a unique name (chosen by clients) by which it can be created/opened/deleted
  • @@ -42,8 +40,8 @@ *
  • when all the consumers have processed all the entries contained in a Bookkeeper ledger, the ledger is * deleted
  • *
- *

- * Caveats: + * + *

Caveats: *

    *
  • A single ManagedLedger can only be open once at any time. Implementation can protect double access from the same * VM, but accesses from different machines to the same ManagedLedger need to be avoided through an external source of @@ -56,7 +54,7 @@ public interface ManagedLedger { /** * @return the unique name of this ManagedLedger */ - public String getName(); + String getName(); /** * Append a new entry to the end of a managed ledger. @@ -66,10 +64,10 @@ public interface ManagedLedger { * @return the Position at which the entry has been inserted * @throws ManagedLedgerException */ - public Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException; + Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException; /** - * Append a new entry asynchronously + * Append a new entry asynchronously. * * @see #addEntry(byte[]) * @param data @@ -80,7 +78,7 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx); + void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx); /** * Append a new entry to the end of a managed ledger. @@ -94,10 +92,10 @@ public interface ManagedLedger { * @return the Position at which the entry has been inserted * @throws ManagedLedgerException */ - public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException; + Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException; /** - * Append a new entry asynchronously + * Append a new entry asynchronously. * * @see #addEntry(byte[]) * @param data @@ -111,10 +109,10 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx); + void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx); /** - * Append a new entry asynchronously + * Append a new entry asynchronously. * * @see #addEntry(byte[]) * @param buffer @@ -124,28 +122,29 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx); + void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx); /** * Open a ManagedCursor in this ManagedLedger. - *

    - * If the cursors doesn't exist, a new one will be created and its position will be at the end of the ManagedLedger. + * + *

    If the cursors doesn't exist, a new one will be created and its position will be at the end of the + * ManagedLedger. * * @param name * the name associated with the ManagedCursor * @return the ManagedCursor * @throws ManagedLedgerException */ - public ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException; + ManagedCursor openCursor(String name) throws InterruptedException, ManagedLedgerException; /** * Creates a new cursor whose metadata is not backed by durable storage. A caller can treat the non-durable cursor * exactly like a normal cursor, with the only difference in that after restart it will not remember which entries * were deleted. Also it does not prevent data from being deleted. - *

    - * The cursor is anonymous and can be positioned on an arbitrary position. - *

    - * This method is not-blocking. + * + *

    The cursor is anonymous and can be positioned on an arbitrary position. + * + *

    This method is not-blocking. * * @param startCursorPosition * the position where the cursor should be initialized, or null to start from the current latest entry. @@ -153,7 +152,7 @@ public interface ManagedLedger { * to the specified position * @return the new NonDurableCursor */ - public ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException; + ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException; /** * Delete a ManagedCursor asynchronously. @@ -166,19 +165,20 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncDeleteCursor(String name, DeleteCursorCallback callback, Object ctx); + void asyncDeleteCursor(String name, DeleteCursorCallback callback, Object ctx); /** * Remove a ManagedCursor from this ManagedLedger. - *

    - * If the cursor doesn't exist, the operation will still succeed. + * + *

    If the cursor doesn't exist, the operation will still succeed. * * @param name * the name associated with the ManagedCursor - * @return the ManagedCursor + * * @throws ManagedLedgerException + * @throws InterruptedException */ - public void deleteCursor(String name) throws InterruptedException, ManagedLedgerException; + void deleteCursor(String name) throws InterruptedException, ManagedLedgerException; /** * Open a ManagedCursor asynchronously. @@ -191,90 +191,91 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx); + void asyncOpenCursor(String name, OpenCursorCallback callback, Object ctx); /** - * Get a list of all the cursors reading from this ManagedLedger + * Get a list of all the cursors reading from this ManagedLedger. * * @return a list of cursors */ - public Iterable getCursors(); + Iterable getCursors(); /** - * Get a list of all the active cursors reading from this ManagedLedger + * Get a list of all the active cursors reading from this ManagedLedger. * * @return a list of cursors */ - public Iterable getActiveCursors(); + Iterable getActiveCursors(); /** * Get the total number of entries for this managed ledger. - *

    - * This is defined by the number of entries in all the BookKeeper ledgers that are being maintained by this + * + *

    This is defined by the number of entries in all the BookKeeper ledgers that are being maintained by this * ManagedLedger. - *

    - * This method is non-blocking. + * + *

    This method is non-blocking. * * @return the number of entries */ - public long getNumberOfEntries(); + long getNumberOfEntries(); /** * Get the total number of active entries for this managed ledger. - *

    - * This is defined by the number of non consumed entries in all the BookKeeper ledgers that are being maintained by - * this ManagedLedger. - *

    - * This method is non-blocking. + * + *

    This is defined by the number of non consumed entries in all the BookKeeper ledgers that are being maintained + * by this ManagedLedger. + * + *

    This method is non-blocking. * * @return the number of entries */ - public long getNumberOfActiveEntries(); + long getNumberOfActiveEntries(); /** * Get the total sizes in bytes of the managed ledger, without accounting for replicas. - *

    - * This is defined by the sizes of all the BookKeeper ledgers that are being maintained by this ManagedLedger. - *

    - * This method is non-blocking. + * + *

    This is defined by the sizes of all the BookKeeper ledgers that are being maintained by this ManagedLedger. + * + *

    This method is non-blocking. * * @return total size in bytes */ - public long getTotalSize(); + long getTotalSize(); /** * Get estimated total unconsumed or backlog size in bytes for the managed ledger, without accounting for replicas. * * @return estimated total backlog size */ - public long getEstimatedBacklogSize(); + long getEstimatedBacklogSize(); /** - * Activate cursors those caught up backlog-threshold entries and deactivate slow cursors which are creating backlog + * Activate cursors those caught up backlog-threshold entries and deactivate slow cursors which are creating + * backlog. */ - public void checkBackloggedCursors(); + void checkBackloggedCursors(); - public void asyncTerminate(TerminateCallback callback, Object ctx); + void asyncTerminate(TerminateCallback callback, Object ctx); /** * Terminate the managed ledger and return the last committed entry. - *

    - * Once the managed ledger is terminated, it will not accept any more write + * + *

    Once the managed ledger is terminated, it will not accept any more write * * @return * @throws InterruptedException * @throws ManagedLedgerException */ - public Position terminate() throws InterruptedException, ManagedLedgerException; + Position terminate() throws InterruptedException, ManagedLedgerException; /** * Close the ManagedLedger. - *

    - * This will close all the underlying BookKeeper ledgers. All the ManagedCursors associated will be invalidated. + * + *

    This will close all the underlying BookKeeper ledgers. All the ManagedCursors associated will be invalidated. * * @throws ManagedLedgerException */ - public void close() throws InterruptedException, ManagedLedgerException; + void close() throws InterruptedException, ManagedLedgerException; /** * Close the ManagedLedger asynchronously. @@ -285,51 +286,51 @@ public interface ManagedLedger { * @param ctx * opaque context */ - public void asyncClose(CloseCallback callback, Object ctx); + void asyncClose(CloseCallback callback, Object ctx); /** * @return the managed ledger stats MBean */ - public ManagedLedgerMXBean getStats(); + ManagedLedgerMXBean getStats(); /** - * Delete the ManagedLedger + * Delete the ManagedLedger. * * @throws InterruptedException * @throws ManagedLedgerException */ - public void delete() throws InterruptedException, ManagedLedgerException; + void delete() throws InterruptedException, ManagedLedgerException; /** - * Async delete a ledger + * Async delete a ledger. * * @param callback * @param ctx * @throws InterruptedException * @throws ManagedLedgerException */ - public void asyncDelete(DeleteLedgerCallback callback, Object ctx); + void asyncDelete(DeleteLedgerCallback callback, Object ctx); /** - * Get the slowest consumer + * Get the slowest consumer. * * @return the slowest consumer */ - public ManagedCursor getSlowestConsumer(); + ManagedCursor getSlowestConsumer(); /** - * Returns whether the managed ledger was terminated + * Returns whether the managed ledger was terminated. */ - public boolean isTerminated(); + boolean isTerminated(); /** - * Returns managed-ledger config + * Returns managed-ledger config. */ ManagedLedgerConfig getConfig(); /** - * Updates managed-ledger config - * + * Updates managed-ledger config. + * * @param config */ void setConfig(ManagedLedgerConfig config); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index fd81ba1881776..bbac80a0311c9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -20,16 +20,14 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.annotations.Beta; +import com.google.common.base.Charsets; import java.util.Arrays; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.client.BookKeeper.DigestType; -import com.google.common.annotations.Beta; -import com.google.common.base.Charsets; - /** - * Configuration class for a ManagedLedger + * Configuration class for a ManagedLedger. */ @Beta public class ManagedLedgerConfig { @@ -98,7 +96,7 @@ public int getMinimumRolloverTimeMs() { /** * Set the minimum rollover time for ledgers in this managed ledger. * - * If this time is > 0, a ledger will not be rolled over more frequently than the specified time, even if it has + *

    If this time is > 0, a ledger will not be rolled over more frequently than the specified time, even if it has * reached the maximum number of entries or maximum size. This parameter can be used to reduce the amount of * rollovers on managed ledger with high write throughput. * @@ -114,7 +112,7 @@ public void setMinimumRolloverTime(int minimumRolloverTime, TimeUnit unit) { } /** - * @return the maximum rollover time + * @return the maximum rollover time. */ public long getMaximumRolloverTimeMs() { return maximumRolloverTimeMs; @@ -123,8 +121,8 @@ public long getMaximumRolloverTimeMs() { /** * Set the maximum rollover time for ledgers in this managed ledger. * - * If the ledger is not rolled over until this time, even if it has not reached the number of entry or size limit, - * this setting will trigger rollover. This parameter can be used for topics with low request rate to force + *

    If the ledger is not rolled over until this time, even if it has not reached the number of entry or size + * limit, this setting will trigger rollover. This parameter can be used for topics with low request rate to force * rollover, so recovery failure does not have to go far back. * * @param maximumRolloverTime @@ -376,8 +374,6 @@ public long getRetentionSizeInMB() { /** * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets * corrupted at bookkeeper and managed-cursor is stuck at that ledger. - * - * @param autoSkipNonRecoverableData */ public boolean isAutoSkipNonRecoverableData() { return autoSkipNonRecoverableData; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 9ad8284c73ac7..c6ed6aff1ece1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -20,8 +20,8 @@ import com.google.common.annotations.Beta; -@SuppressWarnings("serial") @Beta +@SuppressWarnings({"serial", "checkstyle:javadoctype"}) public class ManagedLedgerException extends Exception { public ManagedLedgerException(String msg) { super(msg); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index c19e50cea2b61..c562bfb2c328e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -18,11 +18,10 @@ */ package org.apache.bookkeeper.mledger; +import com.google.common.annotations.Beta; import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; -import com.google.common.annotations.Beta; - /** * A factory to open/create managed ledgers and delete them. * @@ -39,7 +38,7 @@ public interface ManagedLedgerFactory { * @return the managed ledger * @throws ManagedLedgerException */ - public ManagedLedger open(String name) throws InterruptedException, ManagedLedgerException; + ManagedLedger open(String name) throws InterruptedException, ManagedLedgerException; /** * Open a managed ledger. If the managed ledger does not exist, a new one will be automatically created. @@ -51,7 +50,7 @@ public interface ManagedLedgerFactory { * @return the managed ledger * @throws ManagedLedgerException */ - public ManagedLedger open(String name, ManagedLedgerConfig config) + ManagedLedger open(String name, ManagedLedgerConfig config) throws InterruptedException, ManagedLedgerException; /** @@ -65,7 +64,7 @@ public ManagedLedger open(String name, ManagedLedgerConfig config) * @param ctx * opaque context */ - public void asyncOpen(String name, OpenLedgerCallback callback, Object ctx); + void asyncOpen(String name, OpenLedgerCallback callback, Object ctx); /** * Asynchronous open method. @@ -80,22 +79,18 @@ public ManagedLedger open(String name, ManagedLedgerConfig config) * @param ctx * opaque context */ - public void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, Object ctx); + void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback, Object ctx); /** - * Get the current metadata info for a managed ledger + * Get the current metadata info for a managed ledger. * * @param name * the unique name that identifies the managed ledger - * @param callback - * callback object - * @param ctx - * opaque context */ - public ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException; + ManagedLedgerInfo getManagedLedgerInfo(String name) throws InterruptedException, ManagedLedgerException; /** - * Asynchronously get the current metadata info for a managed ledger + * Asynchronously get the current metadata info for a managed ledger. * * @param name * the unique name that identifies the managed ledger @@ -104,13 +99,13 @@ public ManagedLedger open(String name, ManagedLedgerConfig config) * @param ctx * opaque context */ - public void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback callback, Object ctx); + void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback callback, Object ctx); /** - * Releases all the resources maintained by the ManagedLedgerFactory + * Releases all the resources maintained by the ManagedLedgerFactory. * * @throws ManagedLedgerException */ - public void shutdown() throws InterruptedException, ManagedLedgerException; + void shutdown() throws InterruptedException, ManagedLedgerException; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java index f534657c6bece..6f1f41d4bd874 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryConfig.java @@ -18,6 +18,9 @@ */ package org.apache.bookkeeper.mledger; +/** + * Configuration for a {@link ManagedLedgerFactory}. + */ public class ManagedLedgerFactoryConfig { private static final long MB = 1024 * 1024; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java index cd8b6ca2abee2..cd775933f0a71 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java @@ -19,46 +19,46 @@ package org.apache.bookkeeper.mledger; /** - * JMX Bean interface for ManagedLedgerFactory stats + * JMX Bean interface for ManagedLedgerFactory stats. */ public interface ManagedLedgerFactoryMXBean { /** - * Get the number of currently opened managed ledgers on the factory + * Get the number of currently opened managed ledgers on the factory. */ int getNumberOfManagedLedgers(); /** - * Get the size in byte used to store the entries payloads + * Get the size in byte used to store the entries payloads. */ long getCacheUsedSize(); /** - * Get the configured maximum cache size + * Get the configured maximum cache size. */ long getCacheMaxSize(); /** - * Get the number of cache hits per second + * Get the number of cache hits per second. */ double getCacheHitsRate(); /** - * Get the number of cache misses per second + * Get the number of cache misses per second. */ double getCacheMissesRate(); /** - * Get the amount of data is retrieved from the cache in byte/s + * Get the amount of data is retrieved from the cache in byte/s. */ double getCacheHitsThroughput(); /** - * Get the amount of data is retrieved from the bookkeeper in byte/s + * Get the amount of data is retrieved from the bookkeeper in byte/s. */ double getCacheMissesThroughput(); /** - * Get the number of cache evictions during the last minute + * Get the number of cache evictions during the last minute. */ long getNumberOfCacheEvictions(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java index e6e9f15c619ec..ff3f6b505a57a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerInfo.java @@ -21,8 +21,9 @@ import java.util.List; import java.util.Map; +@SuppressWarnings("checkstyle:javadoctype") public class ManagedLedgerInfo { - /** Z-Node version */ + /** Z-Node version. */ public int version; public String creationDate; public String modificationDate; @@ -40,7 +41,7 @@ public static class LedgerInfo { } public static class CursorInfo { - /** Z-Node version */ + /** Z-Node version. */ public int version; public String creationDate; public String modificationDate; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java index a5f533d3e7450..9de5eaeae9ab2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerMXBean.java @@ -21,6 +21,9 @@ import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.bookkeeper.mledger.util.StatsBuckets; +/** + * Management Bean for a {@link ManagedLedger}. + */ public interface ManagedLedgerMXBean { /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java index c9982312a70c9..33a16cb83926e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCache.java @@ -36,9 +36,9 @@ public interface EntryCache extends Comparable { /** * Insert an entry in the cache. - *

    - * If the overall limit have been reached, this will triggered the eviction of other entries, possibly from other - * EntryCache instances + * + *

    If the overall limit have been reached, this will triggered the eviction of other entries, possibly from + * other EntryCache instances * * @param entry * the entry to be cached @@ -55,7 +55,7 @@ public interface EntryCache extends Comparable { void invalidateEntries(PositionImpl lastPosition); /** - * Remove from the cache all the entries belonging to a specific ledger + * Remove from the cache all the entries belonging to a specific ledger. * * @param ledgerId * the ledger id @@ -63,7 +63,7 @@ public interface EntryCache extends Comparable { void invalidateAllEntries(long ledgerId); /** - * Remove all the entries from the cache + * Remove all the entries from the cache. */ void clear(); @@ -79,7 +79,7 @@ public interface EntryCache extends Comparable { /** * Read entries from the cache or from bookkeeper. * - * Get the entry data either from cache or bookkeeper and mixes up the results in a single list. + *

    Get the entry data either from cache or bookkeeper and mixes up the results in a single list. * * @param lh * the ledger handle @@ -100,7 +100,7 @@ void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean is /** * Read entry at given position from the cache or from bookkeeper. * - * Get the entry data either from cache or bookkeeper and mixes up the results in a single list. + *

    Get the entry data either from cache or bookkeeper and mixes up the results in a single list. * * @param lh * the ledger handle @@ -114,7 +114,7 @@ void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boolean is void asyncReadEntry(LedgerHandle lh, PositionImpl position, ReadEntryCallback callback, Object ctx); /** - * Get the total size in bytes of all the entries stored in this cache + * Get the total size in bytes of all the entries stored in this cache. * * @return the size of the entry cache */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java index 418cb097b6788..fc9e790656b62 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheDefaultEvictionPolicy.java @@ -21,22 +21,19 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.util.Collections.reverseOrder; -import java.util.Collections; +import com.google.common.collect.Lists; import java.util.List; - import org.apache.bookkeeper.mledger.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - /** * Default eviction policy. * - * This policy consider only the bigger caches for doing eviction. + *

    This policy consider only the bigger caches for doing eviction. * - * The PercentOfSizeToConsiderForEviction parameter should always be bigger than the cacheEvictionWatermak, otherwise - * the eviction cycle will free less memory than what was required. + *

    The PercentOfSizeToConsiderForEviction parameter should always be bigger than the cacheEvictionWatermak, + * otherwisethe eviction cycle will free less memory than what was required. */ public class EntryCacheDefaultEvictionPolicy implements EntryCacheEvictionPolicy { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java index af0a8786b0ef3..341c5c328ade8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheEvictionPolicy.java @@ -21,12 +21,12 @@ import java.util.List; /** - * Cache eviction policy abstraction interface + * Cache eviction policy abstraction interface. * */ public interface EntryCacheEvictionPolicy { /** - * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches + * Perform the cache eviction of at least sizeToFree bytes on the supplied list of caches. * * @param caches * the list of caches to consider diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index 469a0e9492d08..8aec56c50279c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -20,32 +20,29 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; import java.util.Collection; import java.util.List; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.mledger.util.RangeCache; import org.apache.bookkeeper.mledger.util.RangeCache.Weighter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.primitives.Longs; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; - /** - * Cache data payload for entries of all ledgers + * Cache data payload for entries of all ledgers. */ public class EntryCacheImpl implements EntryCache { @@ -55,17 +52,12 @@ public class EntryCacheImpl implements EntryCache { private static final double MB = 1024 * 1024; - private static final Weighter entryWeighter = new Weighter() { - @Override - public long getSize(EntryImpl entry) { - return entry.getLength(); - } - }; + private static final Weighter entryWeighter = EntryImpl::getLength; public EntryCacheImpl(EntryCacheManager manager, ManagedLedgerImpl ml) { this.manager = manager; this.ml = ml; - this.entries = new RangeCache(entryWeighter); + this.entries = new RangeCache<>(entryWeighter); if (log.isDebugEnabled()) { log.debug("[{}] Initialized managed-ledger entry cache", ml.getName()); @@ -77,7 +69,7 @@ public String getName() { return ml.getName(); } - public final static PooledByteBufAllocator allocator = new PooledByteBufAllocator( // + public final static PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator( true, // preferDirect 0, // nHeapArenas, 1, // nDirectArena @@ -110,7 +102,7 @@ public boolean insert(EntryImpl entry) { int size = entry.getLength(); ByteBuf cachedData = null; try { - cachedData = allocator.directBuffer(size, size); + cachedData = ALLOCATOR.directBuffer(size, size); } catch (Throwable t) { log.warn("[{}] Failed to allocate buffer for entry cache: {}", ml.getName(), t.getMessage(), t); return false; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index 7c536b76f9e27..7faa18c8e7fc9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -21,13 +21,16 @@ import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.primitives.Longs; +import io.netty.buffer.ByteBuf; import java.util.Enumeration; import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerEntry; @@ -35,17 +38,11 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.primitives.Longs; - -import io.netty.buffer.ByteBuf; - +@SuppressWarnings("checkstyle:javadoctype") public class EntryCacheManager { private final long maxSize; @@ -242,6 +239,6 @@ public int compareTo(EntryCache other) { public static Entry create(long ledgerId, long entryId, ByteBuf data) { return EntryImpl.create(ledgerId, entryId, data); } - + private static final Logger log = LoggerFactory.getLogger(EntryCacheManager.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java index 5e053d4c9e03e..7371ebcb8ad36 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java @@ -18,17 +18,15 @@ */ package org.apache.bookkeeper.mledger.impl; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.mledger.Entry; - import com.google.common.collect.ComparisonChain; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.AbstractReferenceCounted; import io.netty.util.Recycler; import io.netty.util.Recycler.Handle; import io.netty.util.ReferenceCounted; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.mledger.Entry; final class EntryImpl extends AbstractReferenceCounted implements Entry, Comparable, ReferenceCounted { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index d924fedc15306..f7bdde40f39e8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -20,31 +20,29 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.Lists; import java.util.ArrayList; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.StampedLock; - import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.util.Pair; -import com.google.common.collect.Lists; - /** * Contains all the cursors for a ManagedLedger. * - * The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep. + *

    The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep. * - * This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry in - * the linked-list. The list is a sorted double linked-list of cursors. + *

    This data structure maintains a list and a map of cursors. The map is used to relate a cursor name with an entry + * in the linked-list. The list is a sorted double linked-list of cursors. * - * When a cursor is markDeleted, this list is updated and the cursor is moved in its new position. + *

    When a cursor is markDeleted, this list is updated and the cursor is moved in its new position. * - * To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only care - * about ledgers to be deleted. + *

    To minimize the moving around, the order is maintained using the ledgerId, but not the entryId, since we only + * care about ledgers to be deleted. * */ class ManagedCursorContainer implements Iterable { @@ -230,7 +228,7 @@ public void remove() { // ////////////////////// /** - * Push the item up towards the the root of the tree (lowest reading position) + * Push the item up towards the the root of the tree (lowest reading position). */ private void siftUp(Item item) { Item parent = getParent(item); @@ -241,7 +239,7 @@ private void siftUp(Item item) { } /** - * Push the item down towards the bottom of the tree (highest reading position) + * Push the item down towards the bottom of the tree (highest reading position). */ private void siftDown(final Item item) { while (true) { @@ -270,7 +268,7 @@ private void siftDown(final Item item) { } /** - * Swap two items in the heap + * Swap two items in the heap. */ private void swap(Item item1, Item item2) { int idx1 = item1.idx; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 22cdf3d0f4f0e..39ac5c599efb5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -20,8 +20,23 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.base.MoreObjects; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeRangeSet; +import com.google.common.util.concurrent.RateLimiter; +import com.google.protobuf.InvalidProtocolBufferException; import java.util.ArrayDeque; import java.util.Collections; import java.util.List; @@ -36,7 +51,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.BKException; @@ -69,22 +83,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.MoreObjects; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Range; -import com.google.common.collect.RangeSet; -import com.google.common.collect.Sets; -import com.google.common.collect.TreeRangeSet; -import com.google.common.util.concurrent.RateLimiter; -import com.google.protobuf.InvalidProtocolBufferException; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; -import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; -import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; - +@SuppressWarnings("checkstyle:javadoctype") public class ManagedCursorImpl implements ManagedCursor { protected final BookKeeper bookkeeper; @@ -96,19 +95,19 @@ public class ManagedCursorImpl implements ManagedCursor { protected volatile PositionImpl readPosition; private volatile MarkDeleteEntry lastMarkDeleteEntry; - protected static final AtomicReferenceFieldUpdater WAITING_READ_OP_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); + protected static final AtomicReferenceFieldUpdater WAITING_READ_OP_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp"); @SuppressWarnings("unused") private volatile OpReadEntry waitingReadOp = null; private static final int FALSE = 0; private static final int TRUE = 1; - private static final AtomicIntegerFieldUpdater RESET_CURSOR_IN_PROGRESS_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(ManagedCursorImpl.class, "resetCursorInProgress"); + private static final AtomicIntegerFieldUpdater RESET_CURSOR_IN_PROGRESS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress"); @SuppressWarnings("unused") private volatile int resetCursorInProgress = FALSE; - private static final AtomicIntegerFieldUpdater PENDING_READ_OPS_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(ManagedCursorImpl.class, "pendingReadOps"); + private static final AtomicIntegerFieldUpdater PENDING_READ_OPS_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps"); @SuppressWarnings("unused") private volatile int pendingReadOps = 0; @@ -148,8 +147,8 @@ public MarkDeleteEntry(PositionImpl newPosition, Map properties, } private final ArrayDeque pendingMarkDeleteOps = new ArrayDeque<>(); - private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater - .newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); + private static final AtomicIntegerFieldUpdater PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = + AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount"); @SuppressWarnings("unused") private volatile int pendingMarkDeletedSubmittedCount = 0; private long lastLedgerSwitchTimestamp; @@ -162,10 +161,11 @@ enum State { Closed // The managed cursor has been closed } - private static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater - .newUpdater(ManagedCursorImpl.class, State.class, "state"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state"); private volatile State state = null; + @SuppressWarnings("checkstyle:javadoctype") public interface VoidCallback { void operationComplete(); @@ -199,7 +199,7 @@ public Map getProperties() { /** * Performs the initial recovery, reading the mark-deleted position from the ledger and then calling initialize to - * have a new opened ledger + * have a new opened ledger. */ void recover(final VoidCallback callback) { // Read the meta-data ledgerId from the store @@ -328,7 +328,8 @@ private void recoverIndividualDeletedMessages(List i } } - private void recoveredCursor(PositionImpl position, Map properties, LedgerHandle recoveredFromCursorLedger) { + private void recoveredCursor(PositionImpl position, Map properties, + LedgerHandle recoveredFromCursorLedger) { // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), // we need to move to the next existing ledger if (!ledger.ledgerExists(position.getLedgerId())) { @@ -400,8 +401,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); - if (result.exception != null) + if (result.exception != null) { throw result.exception; + } return result.entries; } @@ -421,7 +423,7 @@ public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesC } @Override - public Entry getNthEntry(int N, IndividualDeletedEntries deletedEntries) + public Entry getNthEntry(int n, IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); @@ -432,7 +434,7 @@ class Result { final Result result = new Result(); - asyncGetNthEntry(N, deletedEntries, new ReadEntryCallback() { + asyncGetNthEntry(n, deletedEntries, new ReadEntryCallback() { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { @@ -449,16 +451,17 @@ public void readEntryComplete(Entry entry, Object ctx) { counter.await(); - if (result.exception != null) + if (result.exception != null) { throw result.exception; + } return result.entry; } @Override - public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, + public void asyncGetNthEntry(int n, IndividualDeletedEntries deletedEntries, ReadEntryCallback callback, Object ctx) { - checkArgument(N > 0); + checkArgument(n > 0); if (STATE_UPDATER.get(this) == State.Closed) { callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx); return; @@ -468,12 +471,12 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea PositionImpl endPosition = ledger.getLastPosition(); if (startPosition.compareTo(endPosition) <= 0) { long numOfEntries = getNumberOfEntries(Range.closed(startPosition, endPosition)); - if (numOfEntries >= N) { + if (numOfEntries >= n) { long deletedMessages = 0; if (deletedEntries == IndividualDeletedEntries.Exclude) { - deletedMessages = getNumIndividualDeletedEntriesToSkip(N); + deletedMessages = getNumIndividualDeletedEntriesToSkip(n); } - PositionImpl positionAfterN = ledger.getPositionAfterN(markDeletePosition, N + deletedMessages, + PositionImpl positionAfterN = ledger.getPositionAfterN(markDeletePosition, n + deletedMessages, PositionBound.startExcluded); ledger.asyncReadEntry(positionAfterN, callback, ctx); } else { @@ -514,8 +517,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); - if (result.exception != null) + if (result.exception != null) { throw result.exception; + } return result.entries; } @@ -687,8 +691,9 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) { }, null); counter.await(); - if (result.exception != null) + if (result.exception != null) { throw result.exception; + } return result.position; } @@ -917,8 +922,9 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { counter.await(); - if (result.exception != null) + if (result.exception != null) { throw result.exception; + } return result.entries; } @@ -1017,7 +1023,8 @@ private long getNumberOfEntries(Range range) { } if (log.isDebugEnabled()) { - log.debug("[{}] Found {} entries - deleted: {}", ledger.getName(), allEntries - deletedEntries, deletedEntries); + log.debug("[{}] Found {} entries - deleted: {}", + ledger.getName(), allEntries - deletedEntries, deletedEntries); } return allEntries - deletedEntries; } @@ -1504,8 +1511,8 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba try { if (log.isDebugEnabled()) { - log.debug( - "[{}] [{}] Deleting single message at {}. Current status: {} - md-position: {} - previous-position: {}", + log.debug("[{}] [{}] Deleting single message at {}. " + + "Current status: {} - md-position: {} - previous-position: {}", ledger.getName(), name, pos, individualDeletedMessages, markDeletePosition, previousPosition); } @@ -1834,7 +1841,7 @@ public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object } /** - * Internal version of seek that doesn't do the validation check + * Internal version of seek that doesn't do the validation check. * * @param newReadPositionInt */ @@ -2168,7 +2175,7 @@ void readOperationCompleted() { void asyncDeleteLedger(final LedgerHandle lh) { asyncDeleteLedger(lh, DEFAULT_LEDGER_DELETE_RETRIES); } - + private void asyncDeleteLedger(final LedgerHandle lh, int retry) { if (lh == null || retry <= 0) { if (lh != null) { @@ -2199,7 +2206,7 @@ private void asyncDeleteLedger(final LedgerHandle lh, int retry) { void asyncDeleteCursorLedger() { asyncDeleteCursorLedger(DEFAULT_LEDGER_DELETE_RETRIES); } - + private void asyncDeleteCursorLedger(int retry) { STATE_UPDATER.set(this, State.Closed); @@ -2229,7 +2236,7 @@ private void asyncDeleteCursorLedger(int retry) { } /** - * return BK error codes that are considered not likely to be recoverable + * return BK error codes that are considered not likely to be recoverable. */ private static boolean isBkErrorNotRecoverable(int rc) { switch (rc) { @@ -2245,7 +2252,7 @@ private static boolean isBkErrorNotRecoverable(int rc) { } /** - * If we fail to recover the cursor ledger, we want to still open the ML and rollback + * If we fail to recover the cursor ledger, we want to still open the ML and rollback. * * @param info */ @@ -2293,7 +2300,7 @@ public String getIndividuallyDeletedMessages() { /** * Checks given position is part of deleted-range and returns next position of upper-end as all the messages are - * deleted up to that point + * deleted up to that point. * * @param position * @return next available position @@ -2309,7 +2316,7 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) { public Position getNextLedgerPosition(long currentLedgerId) { Long nextExistingLedger = ledger.getNextValidLedger(currentLedgerId); - return nextExistingLedger!=null ? PositionImpl.get(nextExistingLedger, 0) : null; + return nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, 0) : null; } public boolean isIndividuallyDeletedEntriesEmpty() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index d461f9af38553..cf9058eeb41fc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -19,7 +19,11 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkNotNull; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; +import com.google.common.base.Predicates; +import com.google.common.collect.Maps; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,7 +35,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; @@ -67,12 +70,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Predicates; -import com.google.common.collect.Maps; - -import io.netty.util.concurrent.DefaultThreadFactory; -import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; - public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final MetaStore store; private final BookKeeper bookKeeper; @@ -158,7 +155,7 @@ private synchronized void refreshStats() { } /** - * Helper for getting stats + * Helper for getting stats. * * @return */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java index caedefdbda64a..b97fda682ff24 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java @@ -19,10 +19,10 @@ package org.apache.bookkeeper.mledger.impl; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean; import org.apache.bookkeeper.mledger.util.Rate; +@SuppressWarnings("checkstyle:javadoctype") public class ManagedLedgerFactoryMBeanImpl implements ManagedLedgerFactoryMXBean { private final ManagedLedgerFactoryImpl factory; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 68a7645115221..80a0bbed8b402 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -23,6 +23,14 @@ import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.BoundType; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; +import com.google.common.collect.Range; +import com.google.common.util.concurrent.RateLimiter; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -39,7 +47,6 @@ import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; @@ -83,16 +90,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.BoundType; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -import com.google.common.collect.Range; -import com.google.common.util.concurrent.RateLimiter; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -1114,8 +1111,8 @@ public void operationFailed(MetaStoreException e) { if (e instanceof BadVersionException) { synchronized (ManagedLedgerImpl.this) { log.error( - "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger", - name); + "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger", + name); STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced); clearPendingAddEntries(e); return; @@ -1504,7 +1501,7 @@ private boolean isLedgerRetentionOverSizeQuota() { } /** - * Checks whether there are ledger that have been fully consumed and deletes them + * Checks whether there are ledger that have been fully consumed and deletes them. * * @throws Exception */ @@ -1555,9 +1552,10 @@ void internalTrimConsumedLedgers() { if (log.isDebugEnabled()) { log.debug( - "[{}] Checking ledger {} -- time-old: {} sec -- expired: {} -- over-quota: {} -- current-ledger: {}", - name, ls.getLedgerId(), (System.currentTimeMillis() - ls.getTimestamp()) / 1000.0, expired, - overRetentionQuota, currentLedger.getId()); + "[{}] Checking ledger {} -- time-old: {} sec -- " + + "expired: {} -- over-quota: {} -- current-ledger: {}", + name, ls.getLedgerId(), (System.currentTimeMillis() - ls.getTimestamp()) / 1000.0, expired, + overRetentionQuota, currentLedger.getId()); } if (ls.getLedgerId() == currentLedger.getId() || (!expired && !overRetentionQuota)) { if (log.isDebugEnabled()) { @@ -1729,6 +1727,7 @@ private void asyncDeleteLedger(long ledgerId, long retry) { }, null); } + @SuppressWarnings("checkstyle:fallthrough") private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { List ledgers = Lists.newArrayList(ManagedLedgerImpl.this.ledgers.values()); AtomicInteger ledgersToDelete = new AtomicInteger(ledgers.size()); @@ -1788,7 +1787,7 @@ public void operationFailed(MetaStoreException e) { } /** - * Get the number of entries between a contiguous range of two positions + * Get the number of entries between a contiguous range of two positions. * * @param range * the position range @@ -1831,7 +1830,7 @@ long getNumberOfEntries(Range range) { } /** - * Get the entry position at a given distance from a given position + * Get the entry position at a given distance from a given position. * * @param startPosition * starting position @@ -2010,7 +2009,7 @@ PositionImpl getMarkDeletePositionOfSlowestConsumer() { } /** - * Get the last position written in the managed ledger, alongside with the associated counter + * Get the last position written in the managed ledger, alongside with the associated counter. */ Pair getLastPositionAndCounter() { PositionImpl pos; @@ -2103,7 +2102,7 @@ private ManagedLedgerInfo getManagedLedgerInfo() { } /** - * Throws an exception if the managed ledger has been previously fenced + * Throws an exception if the managed ledger has been previously fenced. * * @throws ManagedLedgerException */ @@ -2139,10 +2138,10 @@ public void setConfig(ManagedLedgerConfig config) { this.cursors.forEach(c -> c.setThrottleMarkDelete(config.getThrottleMarkDelete())); } - static interface ManagedLedgerInitializeLedgerCallback { - public void initializeComplete(); + interface ManagedLedgerInitializeLedgerCallback { + void initializeComplete(); - public void initializeFailed(ManagedLedgerException e); + void initializeFailed(ManagedLedgerException e); } // Expose internal values for debugging purposes @@ -2191,7 +2190,7 @@ public long getCacheSize() { } /** - * return BK error codes that are considered not likely to be recoverable + * return BK error codes that are considered not likely to be recoverable. */ private static boolean isBkErrorNotRecoverable(int rc) { switch (rc) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java index 9358a4d16bd14..38c67f49f8085 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java @@ -20,12 +20,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; - import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; +import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; import org.apache.bookkeeper.mledger.util.Rate; import org.apache.bookkeeper.mledger.util.StatsBuckets; -import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats; public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java index 686e796f87ec4..ee22c5ea0e295 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java @@ -18,6 +18,9 @@ */ package org.apache.bookkeeper.mledger.impl; +import com.google.common.collect.BoundType; +import com.google.common.collect.Range; +import com.google.protobuf.InvalidProtocolBufferException; import java.util.Enumeration; import java.util.Iterator; import java.util.List; @@ -25,7 +28,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -40,10 +42,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.BoundType; -import com.google.common.collect.Range; -import com.google.protobuf.InvalidProtocolBufferException; - /** */ public class ManagedLedgerOfflineBacklog { @@ -160,7 +158,8 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore. BKException.getMessage(rc)); } if (rc == BKException.Code.OK) { - MLDataFormats.ManagedLedgerInfo.LedgerInfo info = MLDataFormats.ManagedLedgerInfo.LedgerInfo + MLDataFormats.ManagedLedgerInfo.LedgerInfo info = + MLDataFormats.ManagedLedgerInfo.LedgerInfo .newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1) .setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build(); ledgers.put(id, info); @@ -300,8 +299,8 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration seq, positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry()); } catch (InvalidProtocolBufferException e) { log.warn( - "[{}] Error reading position from metadata ledger {} for cursor {}: {}", - managedLedgerName, ledgerId, cursorName, e); + "[{}] Error reading position from metadata ledger {} for cursor {}: {}", + managedLedgerName, ledgerId, cursorName, e); offlineTopicStats.addCursorDetails(cursorName, errorInReadingCursor, lh.getId()); return; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 1cc78d22a31f4..27e4acf45cf90 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger.impl; import java.util.List; - import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -29,28 +28,30 @@ */ public interface MetaStore { - public static interface Stat { + @SuppressWarnings("checkstyle:javadoctype") + interface Stat { int getVersion(); long getCreationTimestamp(); long getModificationTimestamp(); } - public static interface UpdateLedgersIdsCallback { + @SuppressWarnings("checkstyle:javadoctype") + interface UpdateLedgersIdsCallback { void updateLedgersIdsComplete(MetaStoreException status, Stat stat); } - public static interface MetaStoreCallback { + @SuppressWarnings("checkstyle:javadoctype") + interface MetaStoreCallback { void operationComplete(T result, Stat stat); void operationFailed(MetaStoreException e); } /** - * Get the metadata used by the ManagedLedger + * Get the metadata used by the ManagedLedger. * * @param ledgerName * the name of the ManagedLedger - * @return a version object and a list of LedgerStats * @throws MetaStoreException */ void getManagedLedgerInfo(String ledgerName, MetaStoreCallback callback); @@ -59,15 +60,12 @@ public static interface MetaStoreCallback { * * @param ledgerName * the name of the ManagedLedger - * - * @param ManagedLedgerInfo - * the metadata object to be persisted + * @param mlInfo + * managed ledger info * @param version * version object associated with current state * @param callback * callback object - * @param ctx - * opaque context object */ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat version, MetaStoreCallback callback); @@ -77,15 +75,13 @@ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat vers * * @param ledgerName * the name of the ManagedLedger - * @return a list of the consumer Ids - * @throws MetaStoreException */ void getCursors(String ledgerName, MetaStoreCallback> callback); /** * Get the ledger id associated with a cursor. * - * This ledger id will contains the mark-deleted position for the cursor. + *

    This ledger id will contains the mark-deleted position for the cursor. * * @param ledgerName * @param cursorName @@ -94,12 +90,13 @@ void asyncUpdateLedgerIds(String ledgerName, ManagedLedgerInfo mlInfo, Stat vers void asyncGetCursorInfo(String ledgerName, String cursorName, MetaStoreCallback callback); /** - * Update the persisted position of a cursor + * Update the persisted position of a cursor. * * @param ledgerName * the name of the ManagedLedger * @param cursorName - * @param ledgerId + * @param info + * @param version * @param callback * the callback * @throws MetaStoreException @@ -108,7 +105,7 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn MetaStoreCallback callback); /** - * Drop the persistent state of a consumer from the metadata store + * Drop the persistent state of a consumer from the metadata store. * * @param ledgerName * the name of the ManagedLedger @@ -130,7 +127,7 @@ void asyncUpdateCursorInfo(String ledgerName, String cursorName, ManagedCursorIn void removeManagedLedger(String ledgerName, MetaStoreCallback callback); /** - * Get a list of all the managed ledgers in the system + * Get a list of all the managed ledgers in the system. * * @return an Iterable of the names of the managed ledgers * @throws MetaStoreException diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 005b34a97a62d..ae3e1282757d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -20,10 +20,13 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.base.Charsets; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.TextFormat; +import com.google.protobuf.TextFormat.ParseException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; - import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -40,11 +43,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Charsets; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.TextFormat; -import com.google.protobuf.TextFormat.ParseException; - +@SuppressWarnings("checkstyle:javadoctype") public class MetaStoreImplZookeeper implements MetaStore { private static final Charset Encoding = Charsets.UTF_8; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java index 9de01f4694dbe..6b6c36997cec8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java @@ -18,8 +18,9 @@ */ package org.apache.bookkeeper.mledger.impl; +import com.google.common.base.MoreObjects; +import com.google.common.collect.Range; import java.util.Map; - import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -29,9 +30,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.MoreObjects; -import com.google.common.collect.Range; - public class NonDurableCursorImpl extends ManagedCursorImpl { NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName, diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index af28c11f00127..b1f1d56a57264 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -20,9 +20,11 @@ import static com.google.common.base.Preconditions.checkArgument; +import io.netty.buffer.ByteBuf; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.BKException; @@ -34,12 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.buffer.ByteBuf; -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; - /** - * Handles the life-cycle of an addEntry() operation + * Handles the life-cycle of an addEntry() operation. * */ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { @@ -55,8 +53,8 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { ByteBuf data; private int dataLength; - private static final AtomicReferenceFieldUpdater callbackUpdater = AtomicReferenceFieldUpdater - .newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback"); + private static final AtomicReferenceFieldUpdater callbackUpdater = + AtomicReferenceFieldUpdater.newUpdater(OpAddEntry.class, AddEntryCallback.class, "callback"); public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) { OpAddEntry op = RECYCLER.get(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 5de1a104b44fe..60ecacfd8dce5 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -18,17 +18,15 @@ */ package org.apache.bookkeeper.mledger.impl; -import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; +import com.google.common.base.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; +import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.PositionBound; -import com.google.common.base.Predicate; -/** - */ -public class OpFindNewest implements ReadEntryCallback { +class OpFindNewest implements ReadEntryCallback { private final ManagedCursorImpl cursor; private final PositionImpl startPosition; private final FindEntryCallback callback; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index aa6cda20508ab..31d1c79b380f2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -20,8 +20,10 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import com.google.common.collect.Lists; +import io.netty.util.Recycler; +import io.netty.util.Recycler.Handle; import java.util.List; - import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -31,12 +33,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - -import io.netty.util.Recycler; -import io.netty.util.Recycler.Handle; - -public class OpReadEntry implements ReadEntriesCallback { +class OpReadEntry implements ReadEntriesCallback { ManagedCursorImpl cursor; PositionImpl readPosition; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java index 851883aceecee..a0607a875dc89 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java @@ -20,13 +20,12 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.base.Objects; +import com.google.common.collect.ComparisonChain; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; -import com.google.common.base.Objects; -import com.google.common.collect.ComparisonChain; - public class PositionImpl implements Position, Comparable { private final long ledgerId; @@ -77,7 +76,7 @@ public PositionImpl getNext() { } /** - * String representation of virtual cursor - LedgerId:EntryId + * String representation of virtual cursor - LedgerId:EntryId. */ @Override public String toString() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java new file mode 100644 index 0000000000000..9699362cb3496 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; \ No newline at end of file diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/package-info.java new file mode 100644 index 0000000000000..975a9581e41c0 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger; \ No newline at end of file diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/package-info.java new file mode 100644 index 0000000000000..1e9054901a1e5 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/proto/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.proto; \ No newline at end of file diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/CallbackMutex.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/CallbackMutex.java index f9e86be3a5cc4..9e934bfa461a6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/CallbackMutex.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/CallbackMutex.java @@ -19,14 +19,13 @@ package org.apache.bookkeeper.mledger.util; import java.util.concurrent.Semaphore; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Mutex object that can be acquired from a thread and released from a different thread. * - * This is meant to be acquired when calling an asynchronous method and released in its callback which is probably + *

    This is meant to be acquired when calling an asynchronous method and released in its callback which is probably * executed in a different thread. */ public class CallbackMutex { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java index 54e2c6ed9a6ed..11801bb342036 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Futures.java @@ -21,11 +21,17 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; - import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +/** + * Conveniences to use with {@link CompletableFuture}. + */ public class Futures { + + /** + * Adapts a {@link CloseCallback} to a {@link CompletableFuture}. + */ public static class CloseFuture extends CompletableFuture implements CloseCallback { @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java index 8f04d30e8227b..10787b020d927 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Pair.java @@ -20,15 +20,23 @@ import com.google.common.base.Objects; -public class Pair { - public final A first; - public final B second; +/** + * A generic container for two values. + * + * @param + * the first value type + * @param + * the second value type + */ +public class Pair { + public final FirstT first; + public final SecondT second; - public static Pair create(X x, Y y) { - return new Pair(x, y); + public static Pair create(FirstT x, SecondT y) { + return new Pair<>(x, y); } - public Pair(A first, B second) { + public Pair(FirstT first, SecondT second) { this.first = first; this.second = second; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java index 128c4dd25d310..16c2d1d491c9a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/RangeCache.java @@ -20,6 +20,8 @@ import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.collect.Lists; +import io.netty.util.ReferenceCounted; import java.util.Collection; import java.util.List; import java.util.Map; @@ -27,10 +29,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.collect.Lists; - -import io.netty.util.ReferenceCounted; - /** * Special type of cache where get() and delete() operations can be done over a range of keys. * @@ -46,14 +44,14 @@ public class RangeCache, Value extends ReferenceCoun private final Weighter weighter; // Weighter object used to extract the size from values /** - * Construct a new RangeLruCache with default Weighter + * Construct a new RangeLruCache with default Weighter. */ public RangeCache() { this(new DefaultWeighter()); } /** - * Construct a new RangeLruCache + * Construct a new RangeLruCache. * * @param weighter * a custom weighter to compute the size of each stored value @@ -65,7 +63,7 @@ public RangeCache(Weighter weighter) { } /** - * Insert + * Insert. * * @param key * @param value @@ -188,7 +186,7 @@ public long getSize() { } /** - * Remove all the entries from the cache + * Remove all the entries from the cache. * * @return the old size */ @@ -210,16 +208,16 @@ public synchronized long clear() { } /** - * Interface of a object that is able to the extract the "weight" (size/cost/space) of the cached values + * Interface of a object that is able to the extract the "weight" (size/cost/space) of the cached values. * - * @param + * @param */ - public static interface Weighter { - long getSize(Value value); + public interface Weighter { + long getSize(ValueT value); } /** - * Default cache weighter, every value is assumed the same cost + * Default cache weighter, every value is assumed the same cost. * * @param */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Rate.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Rate.java index 97ade7dd0cc4d..2b8e018399d93 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Rate.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/Rate.java @@ -19,6 +19,7 @@ package org.apache.bookkeeper.mledger.util; import static com.google.common.base.Preconditions.checkArgument; + import java.util.concurrent.atomic.LongAdder; /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java index f61e6d378c946..fef9574640bb1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/SafeRun.java @@ -19,9 +19,11 @@ package org.apache.bookkeeper.mledger.util; import java.util.function.Consumer; - import org.apache.bookkeeper.util.SafeRunnable; +/** + * Static builders for {@link SafeRunnable}s. + */ public class SafeRun { public static SafeRunnable safeRun(Runnable runnable) { return new SafeRunnable() { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/package-info.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/package-info.java new file mode 100644 index 0000000000000..e905eaeebe080 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/util/package-info.java @@ -0,0 +1,19 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.util; \ No newline at end of file diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java index 979126cf85880..b14d984bd99fd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/BookKeeperTestClient.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.client; import java.io.IOException; - import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java index 08b2f5bb5f8b4..0da3369c0527d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockBookKeeper.java @@ -18,6 +18,8 @@ */ package org.apache.bookkeeper.client; +import io.netty.channel.oio.OioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.Arrays; import java.util.Map; import java.util.Set; @@ -27,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -36,9 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.util.concurrent.DefaultThreadFactory; - /** * Test BookKeeperClient which allows access to members we don't wish to expose in the public API. */ diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java index f34b80928b74d..4b39cfd44b7b5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerEntry.java @@ -18,10 +18,9 @@ */ package org.apache.bookkeeper.client; -import java.io.InputStream; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.io.InputStream; public class MockLedgerEntry extends LedgerEntry { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java index f137cd18fadd9..7aa3a63a79ec1 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/client/MockLedgerHandle.java @@ -18,6 +18,9 @@ */ package org.apache.bookkeeper.client; +import com.google.common.collect.Lists; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import java.security.GeneralSecurityException; import java.util.ArrayDeque; import java.util.ArrayList; @@ -25,7 +28,6 @@ import java.util.Enumeration; import java.util.Queue; import java.util.concurrent.RejectedExecutionException; - import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; @@ -33,11 +35,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - public class MockLedgerHandle extends LedgerHandle { final ArrayList entries = Lists.newArrayList(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java index 64bd8cf1e37e7..c1b4894c73636 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/SimpleBookKeeperTest.java @@ -18,9 +18,9 @@ */ package org.apache.bookkeeper.mledger; +import com.google.common.base.Charsets; import java.nio.charset.Charset; import java.util.Enumeration; - import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; @@ -29,8 +29,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.base.Charsets; - public class SimpleBookKeeperTest extends MockedBookKeeperTestCase { private static final String SECRET = "secret"; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java index e9c999bcb12c2..107abcf7583b6 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheManagerTest.java @@ -27,9 +27,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.mledger.Entry; -import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.testng.annotations.BeforeClass; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java index 117bb0bc612df..1be2692a6b8fd 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/EntryCacheTest.java @@ -20,19 +20,14 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.testng.Assert.assertEquals; +import io.netty.buffer.Unpooled; import java.lang.reflect.Method; import java.util.List; import java.util.Vector; import java.util.concurrent.CountDownLatch; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerEntry; @@ -48,8 +43,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import io.netty.buffer.Unpooled; - @Test public class EntryCacheTest extends MockedBookKeeperTestCase { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java index df61a7dd0fa48..62f4045e8c601 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorConcurrencyTest.java @@ -19,10 +19,9 @@ package org.apache.bookkeeper.mledger.impl; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNull; +import static org.testng.Assert.*; +import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -30,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; @@ -46,8 +44,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.collect.Lists; - public class ManagedCursorConcurrencyTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(ManagedCursorConcurrencyTest.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 7d39c3728e7a9..5ad25f5af9439 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -18,16 +18,15 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; +import com.google.common.base.Predicate; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; - import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; @@ -41,10 +40,6 @@ import org.apache.bookkeeper.mledger.Position; import org.testng.annotations.Test; -import com.google.common.base.Predicate; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - @Test public class ManagedCursorContainerTest { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java index 1e2400d8371d7..7febfdd0babab 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorPropertiesTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.Map; import java.util.TreeMap; - import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 61e9b3da785bb..d3ed63695dfa9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -18,13 +18,11 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Arrays; @@ -43,10 +41,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; - import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; @@ -71,10 +68,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - public class ManagedCursorTest extends MockedBookKeeperTestCase { private static final Charset Encoding = Charsets.UTF_8; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java index a945759105d41..59d35d69bab1e 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java @@ -24,6 +24,8 @@ import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import com.google.common.base.Charsets; +import com.google.common.collect.Lists; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -31,17 +33,16 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.BookKeeperTestClient; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback; -import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; import org.apache.bookkeeper.mledger.Position; @@ -49,9 +50,6 @@ import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; import org.testng.annotations.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Lists; - public class ManagedLedgerBkTest extends BookKeeperClusterTestCase { public ManagedLedgerBkTest() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index 0ae6d3a50ce32..5004d28c8f541 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -18,14 +18,11 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java index 59a55e58102fc..c3e75c2e78d2b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanTest.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; - import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java index 23b9ca2ea4f43..f663de522141d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerSingleBookieTest.java @@ -20,10 +20,10 @@ import static org.testng.Assert.assertEquals; +import com.google.common.base.Charsets; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; - import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -31,8 +31,6 @@ import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.testng.annotations.Test; -import com.google.common.base.Charsets; - public class ManagedLedgerSingleBookieTest extends MockedBookKeeperTestCase { private static final Charset Encoding = Charsets.UTF_8; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java index d5f14acace145..052c9fb5b68cf 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTerminationTest.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.List; - import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 2dcf8cd0f142f..460e5227db809 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -18,13 +18,13 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; +import com.google.common.base.Charsets; +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; - import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -77,13 +76,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Sets; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.buffer.Unpooled; - public class ManagedLedgerTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java index 7173a9bbb208b..a5f6a3a60f42a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.Stat; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java index 718256e6f018d..afc751c68eaf7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java @@ -18,17 +18,16 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; +import static org.testng.Assert.*; +import com.google.common.base.Charsets; +import com.google.common.collect.Iterables; import java.nio.charset.Charset; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; @@ -45,9 +44,6 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.base.Charsets; -import com.google.common.collect.Iterables; - public class NonDurableCursorTest extends MockedBookKeeperTestCase { private static final Charset Encoding = Charsets.UTF_8; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/PositionTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/PositionTest.java index 7922b6d0fde89..fe2fcdaf18fa9 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/PositionTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/PositionTest.java @@ -21,7 +21,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.testng.annotations.Test; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java index 5386c7187fcb5..599458dd26012 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/CallbackMutexTest.java @@ -19,7 +19,6 @@ package org.apache.bookkeeper.mledger.util; import java.util.concurrent.atomic.AtomicBoolean; - import org.testng.Assert; import org.testng.annotations.Test; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java index 561a1947608be..61f2e9f3cd3e2 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/util/RangeCacheTest.java @@ -21,12 +21,10 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; -import org.testng.annotations.Test; - import com.google.common.collect.Lists; - import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import org.testng.annotations.Test; @Test public class RangeCacheTest { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index 08ba1f913eec1..685dd92fa2dba 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.client.BookKeeperTestClient; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index 040fb27ca2fb7..7bdf2a1d5fe36 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -21,7 +21,6 @@ import java.lang.reflect.Method; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; - import org.apache.bookkeeper.client.MockBookKeeper; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java index 7360e531b4bf1..044e22dc93b67 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/ZooKeeperUtil.java @@ -27,7 +27,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; - import org.apache.bookkeeper.util.ZkUtils; import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase; import org.apache.commons.io.FileUtils; diff --git a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java index 9f1708204bbda..e868091e3ed3e 100644 --- a/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java +++ b/managed-ledger/src/test/java/org/apache/zookeeper/MockZooKeeper.java @@ -18,6 +18,13 @@ */ package org.apache.zookeeper; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimaps; +import com.google.common.collect.SetMultimap; +import com.google.common.collect.Sets; +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Constructor; import java.util.List; import java.util.Set; @@ -26,7 +33,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; - import org.apache.bookkeeper.mledger.util.Pair; import org.apache.zookeeper.AsyncCallback.Children2Callback; import org.apache.zookeeper.AsyncCallback.ChildrenCallback; @@ -41,15 +47,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimaps; -import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; - -import io.netty.util.concurrent.DefaultThreadFactory; - @SuppressWarnings({ "deprecation", "restriction", "rawtypes" }) public class MockZooKeeper extends ZooKeeper { private TreeMap> tree; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java index 998145bee6f45..a9700f73fefea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java @@ -41,7 +41,7 @@ public static AllocatorStats generate(String allocatorName) { if ("default".equals(allocatorName)) { allocator = PooledByteBufAllocator.DEFAULT; } else if ("ml-cache".equals(allocatorName)) { - allocator = EntryCacheImpl.allocator; + allocator = EntryCacheImpl.ALLOCATOR; } else { throw new IllegalArgumentException("Invalid allocator name : " + allocatorName); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java index e16dfd1dc71a9..f172ea3f4b407 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java @@ -57,7 +57,7 @@ public synchronized List generate() { m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput()); m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput()); - PooledByteBufAllocator allocator = EntryCacheImpl.allocator; + PooledByteBufAllocator allocator = EntryCacheImpl.ALLOCATOR; long activeAllocations = 0; long activeAllocationsTiny = 0; long activeAllocationsSmall = 0;