Skip to content

Commit

Permalink
Add streaming dispatcher. (apache#9056)
Browse files Browse the repository at this point in the history
Related to  apache#3804

### Motivation

Trying to streamline the dispatcher's read requests to manager ledger instead of micro batch.

### Modifications

Created a StreamingEntryReader that can streamline read request to managed ledger.
Created StreamingDispatcher interface with necessary method to interact with StreamingEntryReader.
Created PersistentStreamingDispatcherSingleActive/MultipleConsumer that make use of StreamingEntryReader to read entries from managed ledger.
Add config to use streaming dispatcher.
  • Loading branch information
MarvinCai authored Feb 2, 2021
1 parent 9af8577 commit 8cfaf48
Show file tree
Hide file tree
Showing 29 changed files with 1,905 additions and 237 deletions.
5 changes: 5 additions & 0 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,17 @@ function broker_group_2() {
-DtestForkCount=1 \
-DtestReuseFork=true

$MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="**/*StreamingDispatcher*Test.java" \
-DtestForkCount=1 \
-DtestReuseFork=true

$MVN_TEST_COMMAND -pl pulsar-broker -Dinclude="org/apache/pulsar/broker/zookeeper/**/*.java,
org/apache/pulsar/broker/loadbalance/**/*.java,
org/apache/pulsar/broker/service/**/*.java" \
-Dexclude="**/ReplicatorTest.java,
**/MessagePublishBufferThrottleTest.java,
**/TopicOwnerTest.java,
**/*StreamingDispatcher*Test.java,
**/AntiAffinityNamespaceGroupTest.java"
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger;

/**
* Contains callback that can be registered with {@link ManagedLedger} to wait for new entries to be available.
*/
public interface WaitingEntryCallBack {

/**
* The callback {@link ManagedLedger} will trigger when new entries are available.
*/
void entriesAvailable();
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.WaitingEntryCallBack;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
Expand Down Expand Up @@ -170,6 +171,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Cursors that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<ManagedCursorImpl> waitingCursors;

// Objects that are waiting to be notified when new entries are persisted
final ConcurrentLinkedQueue<WaitingEntryCallBack> waitingEntryCallBacks;

// This map is used for concurrent open cursor requests, where the 2nd request will attach a listener to the
// uninitialized cursor future from the 1st request
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;
Expand Down Expand Up @@ -290,6 +294,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper
this.mbean = new ManagedLedgerMBeanImpl(this);
this.entryCache = factory.getEntryCacheManager().getEntryCache(this);
this.waitingCursors = Queues.newConcurrentLinkedQueue();
this.waitingEntryCallBacks = Queues.newConcurrentLinkedQueue();
this.uninitializedCursors = Maps.newHashMap();
this.clock = config.getClock();

Expand Down Expand Up @@ -2109,6 +2114,21 @@ void notifyCursors() {
}
}

void notifyWaitingEntryCallBacks() {
while (true) {
final WaitingEntryCallBack cb = waitingEntryCallBacks.poll();
if (cb == null) {
break;
}

executor.execute(safeRun(cb::entriesAvailable));
}
}

public void addWaitingEntryCallBack(WaitingEntryCallBack cb) {
this.waitingEntryCallBacks.add(cb);
}

private void trimConsumedLedgersInBackground() {
trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
}
Expand Down Expand Up @@ -3086,7 +3106,7 @@ public PositionImpl getPreviousPosition(PositionImpl position) {
* the position to validate
* @return true if the position is valid, false otherwise
*/
boolean isValidPosition(PositionImpl position) {
public boolean isValidPosition(PositionImpl position) {
PositionImpl last = lastConfirmedEntry;
if (log.isDebugEnabled()) {
log.debug("IsValid position: {} -- last: {}", position, last);
Expand Down Expand Up @@ -3130,7 +3150,9 @@ public PositionImpl getNextValidPosition(final PositionImpl position) {
next = getNextValidPositionInternal(position);
} catch (NullPointerException e) {
next = lastConfirmedEntry.getNext();
log.error("[{}] Can't find next valid position, fail back to the next position of the last position.", name, e);
if (log.isDebugEnabled()) {
log.debug("[{}] Can't find next valid position, fall back to the next position of the last position.", name, e);
}
}
return next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ public void safeRun() {
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
ReferenceCountUtil.release(data);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
this.recycle();
} else {
ReferenceCountUtil.release(data);
Expand All @@ -232,6 +233,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
if (cb != null) {
cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
ml.notifyCursors();
ml.notifyWaitingEntryCallBacks();
this.recycle();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean preciseDispatcherFlowControl = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Whether to use streaming read dispatcher. Currently is in preview and can be changed " +
"in subsequent release."
)
private boolean streamingDispatch = false;

@FieldContext(
dynamic = true,
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public SubType getType() {

public abstract boolean isConsumerAvailable(Consumer consumer);

protected void cancelPendingRead() {}

/**
* <pre>
* Broker gives more priority while dispatching messages. Here, broker follows descending priorities. (eg:
Expand Down
Loading

0 comments on commit 8cfaf48

Please sign in to comment.