From d85a5e23c408eb3243c434909588324455aa94a5 Mon Sep 17 00:00:00 2001 From: Aloys Date: Thu, 24 Dec 2020 20:52:40 +0800 Subject: [PATCH] [Feature] Introduce continuous offset for pulsar (#9039) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #9038 ### Motivation As described in [PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata). One of the use cases for Broker entry metadata is providing a continuous message sequence-Id for messages in one topic-partition, which is useful for Protocol Handlers like KOP. This PR enables Pulsar to support continuous offsets for messages based on Broker entry metadata. ### Modifications Introduce a new field for broker entry metadata named `offset`; Introduce a new interceptor type `ManagedLedgerInterceptor` which intercepts entries in `ManagedLedger`; Each partition will be assigned a `ManagedLedgerInterceptor` when the `ManagedLedger` is created; Each entry will be intercepted to add a monotonically increasing offset to its Broker entry metadata, and the offset is advanced by the batchSize of the entry; Support finding a position by a given offset. 
--- .../bookkeeper/mledger/ManagedLedger.java | 74 ++++++ .../mledger/ManagedLedgerConfig.java | 9 + .../mledger/ManagedLedgerException.java | 6 + .../mledger/impl/ManagedLedgerImpl.java | 150 +++++++++++- .../bookkeeper/mledger/impl/OpAddEntry.java | 43 +++- .../bookkeeper/mledger/impl/OpFindNewest.java | 28 ++- .../interceptor/ManagedLedgerInterceptor.java | 60 +++++ .../ManagedLedgerInterceptorImpl.java | 113 +++++++++ .../pulsar/broker/service/BrokerService.java | 17 ++ .../pulsar/broker/service/Producer.java | 5 + .../apache/pulsar/broker/service/Topic.java | 4 + .../service/persistent/PersistentTopic.java | 26 +-- .../MangedLedgerInterceptorImplTest.java | 220 ++++++++++++++++++ .../service/PersistentMessageFinderTest.java | 9 +- .../pulsar/common/api/proto/PulsarApi.java | 57 +++++ ...endBrokerTimestampMetadataInterceptor.java | 8 + .../AppendIndexMetadataInterceptor.java | 54 +++++ .../BrokerEntryMetadataInterceptor.java | 2 + .../pulsar/common/protocol/Commands.java | 32 +++ pulsar-common/src/main/proto/PulsarApi.proto | 1 + .../common/protocol/CommandUtilsTests.java | 16 +- 21 files changed, 897 insertions(+), 37 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 4b06b5ee9299a..6274f977051ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ 
b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -32,6 +32,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; /** @@ -74,6 +75,18 @@ public interface ManagedLedger { */ Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException; + /** + * Append a new entry to the end of a managed ledger. + * + * @param data + * data entry to be persisted + * @param numberOfMessages + * numberOfMessages of entry + * @return the Position at which the entry has been inserted + * @throws ManagedLedgerException + */ + Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException; + /** * Append a new entry asynchronously. * @@ -102,6 +115,22 @@ public interface ManagedLedger { */ Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException; + /** + * Append a new entry to the end of a managed ledger. + * + * @param data + * data entry to be persisted + * @param numberOfMessages + * numberOfMessages of entry + * @param offset + * offset in the data array + * @param length + * number of bytes + * @return the Position at which the entry has been inserted + * @throws ManagedLedgerException + */ + Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException; + /** * Append a new entry asynchronously. * @@ -119,6 +148,26 @@ public interface ManagedLedger { */ void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx); + /** + * Append a new entry asynchronously. 
+ * + * @see #addEntry(byte[]) + * @param data + * data entry to be persisted + * @param numberOfMessages + * numberOfMessages of entry + * @param offset + * offset in the data array + * @param length + * number of bytes + * @param callback + * callback object + * @param ctx + * opaque context + */ + void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx); + + /** * Append a new entry asynchronously. * @@ -132,6 +181,21 @@ public interface ManagedLedger { */ void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx); + /** + * Append a new entry asynchronously. + * + * @see #addEntry(byte[]) + * @param buffer + * buffer with the data entry + * @param numberOfMessages + * numberOfMessages for data entry + * @param callback + * callback object + * @param ctx + * opaque context + */ + void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx); + /** * Open a ManagedCursor in this ManagedLedger. * @@ -520,4 +584,14 @@ void asyncSetProperties(Map properties, final AsyncCallbacks.Upd * Roll current ledger if it is full */ void rollCurrentLedgerIfFull(); + + /** + * Find position by sequenceId. + * */ + CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate); + + /** + * Get the ManagedLedgerInterceptor for ManagedLedger. 
+ * */ + ManagedLedgerInterceptor getManagedLedgerInterceptor(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index b1f25122abbe5..7f982b15e88d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; /** @@ -75,6 +76,7 @@ public class ManagedLedgerConfig { private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE; private int newEntriesCheckDelayInMillis = 10; private Clock clock = Clock.systemUTC(); + private ManagedLedgerInterceptor managedLedgerInterceptor; public boolean isCreateIfMissing() { return createIfMissing; @@ -637,4 +639,11 @@ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) { this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis; } + public ManagedLedgerInterceptor getManagedLedgerInterceptor() { + return managedLedgerInterceptor; + } + + public void setManagedLedgerInterceptor(ManagedLedgerInterceptor managedLedgerInterceptor) { + this.managedLedgerInterceptor = managedLedgerInterceptor; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 14202cb824f24..0f56b993912c3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -157,6 +157,12 @@ public 
CursorNotFoundException(String msg) { } } + public static class ManagedLedgerInterceptException extends ManagedLedgerException { + public ManagedLedgerInterceptException(String msg) { + super(msg); + } + } + @Override public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ba50d2ba4270c..562a3318f90b3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -64,6 +64,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; + +import io.netty.util.ReferenceCountUtil; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -77,6 +79,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -98,6 +101,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException; import 
org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; @@ -108,6 +112,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -195,6 +200,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { volatile PositionImpl lastConfirmedEntry; + private ManagedLedgerInterceptor managedLedgerInterceptor; + protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; @@ -283,6 +290,9 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = Maps.newHashMap(); + if (config.getManagedLedgerInterceptor() != null) { + this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); + } } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { @@ -310,6 +320,9 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { propertiesMap.put(property.getKey(), property.getValue()); } } + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); + } // Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0) { @@ -325,6 +338,9 @@ public void 
operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) .setTimestamp(clock.millis()).build(); ledgers.put(id, info); + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh); + } initializeBookKeeper(callback); } else if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger not found: {}", name, ledgers.lastKey()); @@ -550,6 +566,11 @@ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedger return addEntry(data, 0, data.length); } + @Override + public Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException { + return addEntry(data, numberOfMessages, 0, data.length); + } + @Override public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); @@ -585,6 +606,41 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { return result.position; } + @Override + public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException { + final CountDownLatch counter = new CountDownLatch(1); + // Result list will contain the status exception and the resulting + // position + class Result { + ManagedLedgerException status = null; + Position position = null; + } + final Result result = new Result(); + + asyncAddEntry(data, numberOfMessages, offset, length, new AddEntryCallback() { + @Override + public void addComplete(Position position, Object ctx) { + result.position = position; + counter.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + result.status = exception; + counter.countDown(); + } + }, null); + + counter.await(); + + if (result.status != null) { + log.error("[{}] Error adding entry", name, result.status); + throw 
result.status; + } + + return result.position; + } + @Override public void asyncAddEntry(final byte[] data, final AddEntryCallback callback, final Object ctx) { asyncAddEntry(data, 0, data.length, callback, ctx); @@ -597,6 +653,13 @@ public void asyncAddEntry(final byte[] data, int offset, int length, final AddEn asyncAddEntry(buffer, callback, ctx); } + @Override + public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length, final AddEntryCallback callback, + final Object ctx) { + ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length); + asyncAddEntry(buffer, numberOfMessages, callback, ctx); + } + @Override public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { @@ -609,6 +672,18 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); } + @Override + public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); + } + + OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx); + + // Jump to specific thread to avoid contention from writers writing from different threads + executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); + } + private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { pendingAddEntries.add(addOperation); final State state = STATE_UPDATER.get(this); @@ -672,8 +747,27 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { addOperation.setCloseWhenDone(true); STATE_UPDATER.set(this, State.ClosingLedger); } + // interceptor entry before add to bookie + if (beforeAddEntry(addOperation)) { + addOperation.initiate(); + } + } + } - addOperation.initiate(); + private boolean 
beforeAddEntry(OpAddEntry addOperation) { + // if no interceptor, just return true to make sure addOperation will be initiate() + if (managedLedgerInterceptor == null) { + return true; + } + try { + managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); + return true; + } catch (Exception e) { + addOperation.failed( + new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed.")); + ReferenceCountUtil.release(addOperation.data); + log.error("[{}] Failed to intercept adding an entry to bookie.", name, e); + return false; } } @@ -1357,10 +1451,12 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { // If op is used by another ledger handle, we need to close it and create a new one if (existsOp.ledger != null) { existsOp.close(); - existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx); + existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx); } existsOp.setLedger(currentLedger); - pendingAddEntries.add(existsOp); + if (beforeAddEntry(existsOp)) { + pendingAddEntries.add(existsOp); + } } } while (existsOp != null && --pendingSize > 0); @@ -1470,6 +1566,51 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) { } } + @Override + public CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate) { + + CompletableFuture future = new CompletableFuture(); + Long firstLedgerId = ledgers.firstKey(); + final PositionImpl startPosition = firstLedgerId == null ? 
null : new PositionImpl(firstLedgerId, 0); + if (startPosition == null) { + future.complete(null); + return future; + } + AsyncCallbacks.FindEntryCallback findEntryCallback = new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + final Position finalPosition; + if (position == null) { + finalPosition = startPosition; + if (finalPosition == null) { + log.warn("[{}] Unable to find position for predicate {}.", name, predicate); + future.complete(null); + return; + } + log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name, predicate, startPosition); + } else { + finalPosition = getNextValidPosition((PositionImpl) position); + } + future.complete((PositionImpl) finalPosition); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + log.warn("[{}] Unable to find position for predicate {}.", name, predicate); + future.complete(null); + } + }; + long max = getNumberOfEntries() - 1; + OpFindNewest op = new OpFindNewest(this, startPosition, predicate, max, findEntryCallback, null); + op.find(); + return future; + } + + @Override + public ManagedLedgerInterceptor getManagedLedgerInterceptor() { + return managedLedgerInterceptor; + } + void clearPendingAddEntries(ManagedLedgerException e) { while (!pendingAddEntries.isEmpty()) { OpAddEntry op = pendingAddEntries.poll(); @@ -3085,6 +3226,9 @@ private ManagedLedgerInfo getManagedLedgerInfo() { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId())); } + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap); + } for (Map.Entry property : propertiesMap.entrySet()) { mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder() .setKey(property.getKey()).setValue(property.getValue())); diff --git 
a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 08d188fc66dac..fa5228c5f73f3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -44,10 +44,11 @@ * Handles the life-cycle of an addEntry() operation. * */ -class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { +public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { protected ManagedLedgerImpl ml; LedgerHandle ledger; private long entryId; + private int numberOfMessages; @SuppressWarnings("unused") private static final AtomicReferenceFieldUpdater callbackUpdater = @@ -95,6 +96,27 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall return op; } + public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) { + OpAddEntry op = RECYCLER.get(); + op.ml = ml; + op.ledger = null; + op.numberOfMessages = numberOfMessages; + op.data = data.retain(); + op.dataLength = data.readableBytes(); + op.callback = callback; + op.ctx = ctx; + op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); + op.closeWhenDone = false; + op.entryId = -1; + op.startTime = System.nanoTime(); + op.state = State.OPEN; + ml.mbean.addAddEntrySample(op.dataLength); + if (log.isDebugEnabled()) { + log.debug("Created new OpAddEntry {}", op); + } + return op; + } + public void setLedger(LedgerHandle ledger) { this.ledger = ledger; } @@ -272,7 +294,23 @@ void close() { public State getState() { return state; } - + + public ByteBuf getData() { + return data; + } + + public int getNumberOfMessages() { + return numberOfMessages; + } + + public void setNumberOfMessages(int numberOfMessages) { + this.numberOfMessages = numberOfMessages; + } + + 
public void setData(ByteBuf data) { + this.data = data; + } + private final Handle recyclerHandle; private OpAddEntry(Handle recyclerHandle) { @@ -290,6 +328,7 @@ public void recycle() { ml = null; ledger = null; data = null; + numberOfMessages = 0; dataLength = -1; callback = null; ctx = null; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 861d2472e05d6..cbecedd1640c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -31,6 +31,7 @@ class OpFindNewest implements ReadEntryCallback { private final ManagedCursorImpl cursor; + private final ManagedLedgerImpl ledger; private final PositionImpl startPosition; private final FindEntryCallback callback; private final Predicate condition; @@ -49,6 +50,23 @@ enum State { public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate condition, long numberOfEntries, FindEntryCallback callback, Object ctx) { this.cursor = cursor; + this.ledger = cursor.ledger; + this.startPosition = startPosition; + this.callback = callback; + this.condition = condition; + this.ctx = ctx; + + this.min = 0; + this.max = numberOfEntries; + + this.searchPosition = startPosition; + this.state = State.checkFirst; + } + + public OpFindNewest(ManagedLedgerImpl ledger, PositionImpl startPosition, Predicate condition, + long numberOfEntries, FindEntryCallback callback, Object ctx) { + this.cursor = null; + this.ledger = ledger; this.startPosition = startPosition; this.callback = callback; this.condition = condition; @@ -77,7 +95,7 @@ public void readEntryComplete(Entry entry, Object ctx) { // check last entry state = State.checkLast; - searchPosition = cursor.ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded); + searchPosition = 
ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded); find(); } break; @@ -88,7 +106,7 @@ public void readEntryComplete(Entry entry, Object ctx) { } else { // start binary search state = State.searching; - searchPosition = cursor.ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); + searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); find(); } break; @@ -106,7 +124,7 @@ public void readEntryComplete(Entry entry, Object ctx) { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); return; } - searchPosition = cursor.ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); + searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); find(); } } @@ -117,8 +135,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } public void find() { - if (cursor.hasMoreEntries(searchPosition)) { - cursor.ledger.asyncReadEntry(searchPosition, this, null); + if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { + ledger.asyncReadEntry(searchPosition, this, null); } else { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java new file mode 100644 index 0000000000000..f5d9f12d87291 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.interceptor; + +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; +import org.apache.bookkeeper.mledger.impl.OpAddEntry; + +import java.util.Map; + +/** + * Interceptor for ManagedLedger. + * */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable +public interface ManagedLedgerInterceptor { + + /** + * Intercept an OpAddEntry and return an OpAddEntry. + * @param op an OpAddEntry to be intercepted. + * @param batchSize + * @return an OpAddEntry. + */ + OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize); + + /** + * Intercept when ManagedLedger is initialized. + * @param propertiesMap map of properties. + */ + void onManagedLedgerPropertiesInitialize(Map propertiesMap); + + /** + * Intercept when ManagedLedger is initialized. + * @param name name of ManagedLedger + * @param ledgerHandle a LedgerHandle. + */ + void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle); + + /** + * @param propertiesMap map of properties. 
+ */ + void onUpdateManagedLedgerInfo(Map propertiesMap); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java new file mode 100644 index 0000000000000..0e59d2f3fd7c0 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.intercept; + +import java.util.Map; +import java.util.Set; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.OpAddEntry; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; +import org.apache.pulsar.common.protocol.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class); + private static final String INDEX = "index"; + + + private final Set brokerEntryMetadataInterceptors; + + + public ManagedLedgerInterceptorImpl(Set brokerEntryMetadataInterceptors) { + this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors; + } + + public long getIndex() { + long index = -1; + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + index = ((AppendIndexMetadataInterceptor) interceptor).getIndex(); + } + } + return index; + } + + @Override + public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) { + if (op == null || batchSize <= 0) { + return op; + } + op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, batchSize)); + return op; + } + + @Override + public void onManagedLedgerPropertiesInitialize(Map propertiesMap) { + if (propertiesMap == null || propertiesMap.size() == 0) { + return; + } + + if (propertiesMap.containsKey(INDEX)) { + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if 
(interceptor instanceof AppendIndexMetadataInterceptor) { + ((AppendIndexMetadataInterceptor) interceptor) + .recoveryIndexGenerator(Long.parseLong(propertiesMap.get(INDEX))); + } + } + } + } + + @Override + public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { + try { + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + LedgerEntries ledgerEntries = + lh.read(lh.getLastAddConfirmed() - 1, lh.getLastAddConfirmed()); + for (LedgerEntry entry : ledgerEntries) { + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = + Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + ((AppendIndexMetadataInterceptor) interceptor) + .recoveryIndexGenerator(brokerEntryMetadata.getIndex()); + } + } + + } + } + } catch (org.apache.bookkeeper.client.api.BKException | InterruptedException e) { + log.error("[{}] Read last entry error.", name, e); + } + } + + @Override + public void onUpdateManagedLedgerInfo(Map propertiesMap) { + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + propertiesMap.put(INDEX, String.valueOf(((AppendIndexMetadataInterceptor) interceptor).getIndex())); + } + } + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6cfdf38b75ca3..792c67652dc80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import 
org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.StringUtils; @@ -98,6 +99,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -126,6 +128,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1092,6 +1095,20 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { + if (isBrokerEntryMetadataEnabled()) { + // init managedLedger interceptor + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { + // per-topic AppendIndexMetadataInterceptor; NOTE(review): mutates the set being iterated, verify + brokerEntryMetadataInterceptors.remove(interceptor); + brokerEntryMetadataInterceptors.add(new AppendIndexMetadataInterceptor()); + } + } + ManagedLedgerInterceptor mlInterceptor = + new ManagedLedgerInterceptorImpl(brokerEntryMetadataInterceptors); +
managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor); + } + managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index c69f9bade52a5..2bca2320a552d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -463,6 +463,11 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long return callback; } + @Override + public long getNumberOfMessages() { + return batchSize; + } + private final Handle recyclerHandle; private MessagePublishContext(Handle recyclerHandle) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index af88ce8407e07..c940195d9b8dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -90,6 +90,10 @@ default void setOriginalHighestSequenceId(long originalHighestSequenceId) { default long getOriginalHighestSequenceId() { return -1L; } + + default long getNumberOfMessages() { + return 1L; + } } void publishMessage(ByteBuf headersAndPayload, PublishContext callback); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 676f98cb8b41d..49e9042d960ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,10 +357,7 @@ public void 
publishMessage(ByteBuf headersAndPayload, PublishContext publishCont messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { case NotDup: - // intercept headersAndPayload and add entry metadata - if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) { - ledger.asyncAddEntry(headersAndPayload, this, publishContext); - } + asyncAddEntry(headersAndPayload, publishContext); break; case Dup: // Immediately acknowledge duplicated message @@ -374,22 +371,13 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont } } - private boolean appendBrokerEntryMetadata(ByteBuf headersAndPayload, PublishContext publishContext) { - // just return true if BrokerEntryMetadata is not enabled - if (!brokerService.isBrokerEntryMetadataEnabled()) { - return true; - } - - try { - headersAndPayload = Commands.addBrokerEntryMetadata(headersAndPayload, - brokerService.getBrokerEntryMetadataInterceptors()); - } catch (Exception e) { - decrementPendingWriteOpsAndCheck(); - publishContext.completed(new BrokerServiceException.AddEntryMetadataException(e), -1, -1); - log.error("[{}] Failed to add broker entry metadata.", topic, e); - return false; + private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) { + if (brokerService.isBrokerEntryMetadataEnabled()) { + ledger.asyncAddEntry(headersAndPayload, + (int) publishContext.getNumberOfMessages(), this, publishContext); + } else { + ledger.asyncAddEntry(headersAndPayload, this, publishContext); } - return true; } public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java new file mode 100644 index 0000000000000..2c86b9814a860 --- /dev/null +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; +import org.apache.pulsar.common.protocol.Commands; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.List; +import 
java.util.Set; + +import static com.sun.org.apache.xml.internal.serialize.OutputFormat.Defaults.Encoding; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; + +public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase { + private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class); + + + @Test + public void testAddBrokerEntryMetadata() throws Exception { + final int MOCK_BATCH_SIZE = 2; + int numberOfEntries = 10; + final String ledgerAndCursorName = "topicEntryMetadataSequenceId"; + + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + config.setManagedLedgerInterceptor(interceptor); + + ManagedLedger ledger = factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + for ( int i = 0 ; i < numberOfEntries; i ++) { + ledger.addEntry(("message" + i).getBytes(), MOCK_BATCH_SIZE); + } + + + assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex()); + List entryList = cursor.readEntries(numberOfEntries); + for (int i = 0 ; i < numberOfEntries; i ++) { + PulsarApi.BrokerEntryMetadata metadata = + Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer()); + assertNotNull(metadata); + assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1); + } + + cursor.close();; + ledger.close(); + factory.shutdown(); + } + + + @Test(timeOut = 20000) + public void testRecoveryIndex() throws Exception { + final int MOCK_BATCH_SIZE = 2; + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + 
config.setManagedLedgerInterceptor(interceptor); + ManagedLedger ledger = factory.open("my_recovery_index_test_ledger", config); + + ledger.addEntry("dummy-entry-1".getBytes(Encoding), MOCK_BATCH_SIZE); + + ManagedCursor cursor = ledger.openCursor("c1"); + + ledger.addEntry("dummy-entry-2".getBytes(Encoding), MOCK_BATCH_SIZE); + + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1); + + ledger.close(); + + log.info("Closing ledger and reopening"); + + // Reopen the same managed-ledger through a new factory + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ledger = factory2.open("my_recovery_index_test_ledger", config); + + cursor = ledger.openCursor("c1"); + + assertEquals(ledger.getNumberOfEntries(), 2); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1); + + + List entries = cursor.readEntries(100); + assertEquals(entries.size(), 1); + entries.forEach(e -> e.release()); + + cursor.close(); + ledger.close(); + factory2.shutdown(); + } + + @Test + public void testFindPositionByIndex() throws Exception { + final int MOCK_BATCH_SIZE = 2; + final int maxEntriesPerLedger = 5; + int maxSequenceIdPerLedger = MOCK_BATCH_SIZE * maxEntriesPerLedger; + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setManagedLedgerInterceptor(interceptor); + managedLedgerConfig.setMaxEntriesPerLedger(5); + + ManagedLedger ledger = factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + ManagedCursor cursor = ledger.openCursor("c1"); + + long firstLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + firstLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + +
assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 9); + + + PositionImpl position = null; + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); + assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + + // add maxEntriesPerLedger more entries; they are expected to land in a new ledger + long secondLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + secondLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 19); + assertNotEquals(firstLedgerId, secondLedgerId); + + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); + assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + + // close the ledger before reopening it + ledger.close(); + // Reopen the same managed-ledger through a new factory + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + + long thirdLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + thirdLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 29); + assertNotEquals(secondLedgerId, thirdLedgerId); + + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); +
assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + cursor.close(); + ledger.close(); + factory2.shutdown(); + } + + public static Set getBrokerEntryMetadataInterceptors() { + Set interceptorNames = new HashSet<>(); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); + return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, + Thread.currentThread().getContextClassLoader()); + } + + class IndexSearchPredicate implements com.google.common.base.Predicate { + + long indexToSearch = -1; + public IndexSearchPredicate(long indexToSearch) { + this.indexToSearch = indexToSearch; + } + + @Override + public boolean apply(@Nullable Entry entry) { + try { + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); + return brokerEntryMetadata.getIndex() < indexToSearch; + } catch (Exception e) { + log.error("Error deserialize message for message position find", e); + } finally { + entry.release(); + } + return false; + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 174080212cfc9..c0c78d2a92857 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -108,7 +108,7 @@ public static ByteBuf createMessageByteBufWrittenToLedger(String msg) throws Exc public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception { ByteBuf msgWithEntryMeta = - Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors()); + 
Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors(), 1); byte[] byteMessage = msgWithEntryMeta.nioBuffer().array(); msgWithEntryMeta.release(); return byteMessage; @@ -321,9 +321,10 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception } public static Set getBrokerEntryMetadataInterceptors() { - - return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors( - Sets.newHashSet("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"), + Set interceptorNames = new HashSet<>(); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); + return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } /** diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index aa9f2c1d4e6a0..f19ba6339f4b7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -7074,6 +7074,10 @@ public interface BrokerEntryMetadataOrBuilder // optional uint64 broker_timestamp = 1; boolean hasBrokerTimestamp(); long getBrokerTimestamp(); + + // optional uint64 index = 2; + boolean hasIndex(); + long getIndex(); } public static final class BrokerEntryMetadata extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -7120,8 +7124,19 @@ public long getBrokerTimestamp() { return brokerTimestamp_; } + // optional uint64 index = 2; + public static final int INDEX_FIELD_NUMBER = 2; + private long index_; + public boolean hasIndex() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getIndex() { + return index_; + } + private 
void initFields() { brokerTimestamp_ = 0L; + index_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7143,6 +7158,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeUInt64(1, brokerTimestamp_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, index_); + } } private int memoizedSerializedSize = -1; @@ -7155,6 +7173,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(1, brokerTimestamp_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(2, index_); + } memoizedSerializedSize = size; return size; } @@ -7270,6 +7292,8 @@ public Builder clear() { super.clear(); brokerTimestamp_ = 0L; bitField0_ = (bitField0_ & ~0x00000001); + index_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -7307,6 +7331,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntryMetadata buildPar to_bitField0_ |= 0x00000001; } result.brokerTimestamp_ = brokerTimestamp_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.index_ = index_; result.bitField0_ = to_bitField0_; return result; } @@ -7316,6 +7344,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntr if (other.hasBrokerTimestamp()) { setBrokerTimestamp(other.getBrokerTimestamp()); } + if (other.hasIndex()) { + setIndex(other.getIndex()); + } return this; } @@ -7350,6 +7381,11 @@ public Builder mergeFrom( brokerTimestamp_ = input.readUInt64(); break; } + case 16: { + bitField0_ |= 0x00000002; + index_ = input.readUInt64(); + break; + } } } } @@ -7377,6 +7413,27 @@ public Builder clearBrokerTimestamp() { return this; } + // optional uint64 index = 2; + private long index_ ; + public 
boolean hasIndex() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getIndex() { + return index_; + } + public Builder setIndex(long value) { + bitField0_ |= 0x00000002; + index_ = value; + + return this; + } + public Builder clearIndex() { + bitField0_ = (bitField0_ & ~0x00000002); + index_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.BrokerEntryMetadata) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java index 6043f26f61c9c..78cdfc845e7c2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java @@ -30,4 +30,12 @@ public class AppendBrokerTimestampMetadataInterceptor implements BrokerEntryMeta public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) { return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis()); } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( + PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize) { + // do nothing, just return brokerMetadata + return brokerMetadata; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java new file mode 100644 index 0000000000000..dba3d9aec49e3 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.intercept; + +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.concurrent.atomic.AtomicLong; + +public class AppendIndexMetadataInterceptor implements BrokerEntryMetadataInterceptor{ + private final AtomicLong indexGenerator; + + public AppendIndexMetadataInterceptor() { + this.indexGenerator = new AtomicLong(-1); + } + + public void recoveryIndexGenerator(long index) { + if (indexGenerator.get() < index) { + indexGenerator.set(index); + } + } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) { + // do nothing, just return brokerMetadata + return brokerMetadata; + } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( + PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize) { + return brokerMetadata.setIndex(indexGenerator.addAndGet(batchSize)); + } + + public long getIndex() { + return indexGenerator.get(); + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java index 42dfc8d30ed0b..6dcb794b19265 100644 --- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java @@ -26,4 +26,6 @@ */ public interface BrokerEntryMetadataInterceptor { PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata); + PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7ec913244a5ee..7dfb13104a767 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -1962,6 +1963,37 @@ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, return compositeByteBuf; } + public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, + Set brokerInterceptors, + int batchSize) { + // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE | BROKER_ENTRY_METADATA | + // | 2 bytes | 4 bytes | BROKER_ENTRY_METADATA_SIZE bytes | + + PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder = PulsarApi.BrokerEntryMetadata.newBuilder(); + for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { + interceptor.intercept(brokerMetadataBuilder); + interceptor.interceptWithBatchSize(brokerMetadataBuilder, batchSize); + } + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = brokerMetadataBuilder.build(); + int brokerMetaSize = brokerEntryMetadata.getSerializedSize(); + 
ByteBuf brokerMeta = + PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6); + brokerMeta.writeShort(Commands.magicBrokerEntryMetadata); + brokerMeta.writeInt(brokerMetaSize); + ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(brokerMeta); + try { + brokerEntryMetadata.writeTo(outStream); + } catch (IOException e) { + // This is in-memory serialization, should not fail + throw new RuntimeException(e); + } + outStream.recycle(); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload); + return compositeByteBuf; + } + public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) { int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); if (headerAndPayloadWithBrokerEntryMetadata.readShort() == magicBrokerEntryMetadata) { diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b99882864f3d8..ace18c8cba962 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -183,6 +183,7 @@ message SingleMessageMetadata { // metadata added for entry from broker message BrokerEntryMetadata { optional uint64 broker_timestamp = 1; + optional uint64 index = 2; } enum ServerError { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java index df5bf76d7525b..2bd4df09d0516 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java @@ -145,14 +145,19 @@ public void testByteBufComposite() throws Exception { @Test public void testAddBrokerEntryMetadata() throws Exception { + int MOCK_BATCH_SIZE = 10; String data = "test-message"; 
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); PulsarApi.BrokerEntryMetadata brokerMetadata = - PulsarApi.BrokerEntryMetadata.newBuilder().setBrokerTimestamp(System.currentTimeMillis()).build(); + PulsarApi.BrokerEntryMetadata + .newBuilder() + .setBrokerTimestamp(System.currentTimeMillis()) + .setIndex(MOCK_BATCH_SIZE - 1) + .build(); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); assertEquals(brokerMetadata.getSerializedSize() + data.length() + 6, dataWithBrokerEntryMetadata.readableBytes()); @@ -167,7 +172,7 @@ public void testSkipBrokerEntryMetadata() throws Exception { ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), 11); Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); @@ -179,15 +184,17 @@ public void testSkipBrokerEntryMetadata() throws Exception { @Test public void testParseBrokerEntryMetadata() throws Exception { + int MOCK_BATCH_SIZE = 10; String data = "test-message"; ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); PulsarApi.BrokerEntryMetadata brokerMetadata = 
Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis()); + assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; @@ -198,6 +205,7 @@ public void testParseBrokerEntryMetadata() throws Exception { public Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); }