Skip to content

Commit

Permalink
Add user-defined properties to cursor position (apache#744)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 8, 2017
1 parent 9c75f8f commit 8c0d399
Show file tree
Hide file tree
Showing 11 changed files with 1,515 additions and 226 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.bookkeeper.mledger;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
Expand Down Expand Up @@ -56,6 +57,11 @@ public enum IndividualDeletedEntries {
*/
public String getName();

/**
* Return any properties that were associated with the last stored position
*/
public Map<String, Long> getProperties();

/**
* Read entries from the ManagedLedger, up to the specified number. The returned list can be smaller.
*
Expand Down Expand Up @@ -179,10 +185,25 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea
*
* @param position
* the last position that have been successfully consumed
*
* @throws ManagedLedgerException
*/
public void markDelete(Position position) throws InterruptedException, ManagedLedgerException;

/**
* This signals that the reader is done with all the entries up to "position" (included). This can potentially
* trigger a ledger deletion, if all the other cursors are done too with the underlying ledger.
*
* @param position
* the last position that have been successfully consumed
* @param properties
* additional user-defined properties that can be associated with a particular cursor position
*
* @throws ManagedLedgerException
*/
public void markDelete(Position position, Map<String, Long> properties)
throws InterruptedException, ManagedLedgerException;

/**
* Asynchronous mark delete
*
Expand All @@ -196,6 +217,21 @@ public void asyncGetNthEntry(int N, IndividualDeletedEntries deletedEntries, Rea
*/
public void asyncMarkDelete(Position position, MarkDeleteCallback callback, Object ctx);

/**
* Asynchronous mark delete
*
* @see #markDelete(Position)
* @param position
* the last position that have been successfully consumed
* @param properties
* additional user-defined properties that can be associated with a particular cursor position
* @param callback
* callback object
* @param ctx
* opaque context
*/
public void asyncMarkDelete(Position position, Map<String, Long> properties, MarkDeleteCallback callback, Object ctx);

/**
* Delete a single message
* <p>
Expand Down Expand Up @@ -372,7 +408,7 @@ public List<Entry> replayEntries(Set<? extends Position> positions)
* callback object returning the list of entries
* @param ctx
* opaque context
* @return skipped positions
* @return skipped positions
* set of positions which are already deleted/acknowledged and skipped while replaying them
*/
public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, ReadEntriesCallback callback, Object ctx);
Expand Down Expand Up @@ -404,19 +440,19 @@ public List<Entry> replayEntries(Set<? extends Position> positions)

/**
* Activate cursor: EntryCacheManager caches entries only for activated-cursors
*
*
*/
public void setActive();

/**
* Deactivate cursor
*
*
*/
public void setInactive();

/**
* Checks if cursor is active or not.
*
*
* @return
*/
public boolean isActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static class CursorInfo {
// Last snapshot of the mark-delete position
public PositionInfo markDelete;
public List<MessageRangeInfo> individualDeletedMessages;
public Map<String, Long> properties;
}

public static class PositionInfo {
Expand Down
Loading

0 comments on commit 8c0d399

Please sign in to comment.