Skip to content

Commit

Permalink
[Feature] Introduce continuous offset for pulsar (apache#9039)
Browse files Browse the repository at this point in the history
Fixes apache#9038 
### Motivation

As described in [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata).
One of the use case for Broker entry metadata is  providing continuous message sequence-Id for messages in one topic-partition which is useful for Protocol Hanlder like KOP.

This PR enable Pulsar to support continuous offset for message based on Broker entry metadata.

### Modifications

Introduce a new field for broker entry metadta named `offset`;
Introduce a new interceptor type `ManagedLedgerInterceptor` which intercept entry in `ManagedLedger`;
Each partition will be assigned a `ManagedLedgerInterceptor` when `ManagedLedger` created;
Each Entry will be intercept for adding a  monotone increasing offset in Broker entry metadata and the offet is added by batchSize of entry;
Support find position by a given offset.
  • Loading branch information
aloyszhang authored Dec 24, 2020
1 parent 11b9359 commit d85a5e2
Show file tree
Hide file tree
Showing 21 changed files with 897 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;

/**
Expand Down Expand Up @@ -74,6 +75,18 @@ public interface ManagedLedger {
*/
Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry to the end of a managed ledger.
*
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
*/
Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry asynchronously.
*
Expand Down Expand Up @@ -102,6 +115,22 @@ public interface ManagedLedger {
*/
Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry to the end of a managed ledger.
*
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @param offset
* offset in the data array
* @param length
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
*/
Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException;

/**
* Append a new entry asynchronously.
*
Expand All @@ -119,6 +148,26 @@ public interface ManagedLedger {
*/
void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx);

/**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
* @param data
* data entry to be persisted
* @param numberOfMessages
* numberOfMessages of entry
* @param offset
* offset in the data array
* @param length
* number of bytes
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx);


/**
* Append a new entry asynchronously.
*
Expand All @@ -132,6 +181,21 @@ public interface ManagedLedger {
*/
void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);

/**
* Append a new entry asynchronously.
*
* @see #addEntry(byte[])
* @param buffer
* buffer with the data entry
* @param numberOfMessages
* numberOfMessages for data entry
* @param callback
* callback object
* @param ctx
* opaque context
*/
void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx);

/**
* Open a ManagedCursor in this ManagedLedger.
*
Expand Down Expand Up @@ -520,4 +584,14 @@ void asyncSetProperties(Map<String, String> properties, final AsyncCallbacks.Upd
* Roll current ledger if it is full
*/
void rollCurrentLedgerIfFull();

/**
* Find position by sequenceId.
* */
CompletableFuture<Position> asyncFindPosition(com.google.common.base.Predicate<Entry> predicate);

/**
* Get the ManagedLedgerInterceptor for ManagedLedger.
* */
ManagedLedgerInterceptor getManagedLedgerInterceptor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;

import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;

/**
Expand Down Expand Up @@ -75,6 +76,7 @@ public class ManagedLedgerConfig {
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private int newEntriesCheckDelayInMillis = 10;
private Clock clock = Clock.systemUTC();
private ManagedLedgerInterceptor managedLedgerInterceptor;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down Expand Up @@ -637,4 +639,11 @@ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) {
this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis;
}

public ManagedLedgerInterceptor getManagedLedgerInterceptor() {
return managedLedgerInterceptor;
}

public void setManagedLedgerInterceptor(ManagedLedgerInterceptor managedLedgerInterceptor) {
this.managedLedgerInterceptor = managedLedgerInterceptor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ public CursorNotFoundException(String msg) {
}
}

public static class ManagedLedgerInterceptException extends ManagedLedgerException {
public ManagedLedgerInterceptException(String msg) {
super(msg);
}
}

@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
Expand Down
Loading

0 comments on commit d85a5e2

Please sign in to comment.