Skip to content

Commit

Permalink
PIP-68: WaitForExclusive producer access mode (apache#8992)
Browse files Browse the repository at this point in the history
* PIP-68: WaitForExclusive producer access mode

* Fixed checkstyle issues

* Fixed log level to info

* Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
  • Loading branch information
merlimat authored Dec 18, 2020
1 parent 2ef5ab5 commit 99476d3
Show file tree
Hide file tree
Showing 16 changed files with 370 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,18 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
Expand Down Expand Up @@ -111,6 +114,8 @@ public abstract class AbstractTopic implements Topic {

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
new ConcurrentLinkedQueue<>();

private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
Expand Down Expand Up @@ -337,14 +342,15 @@ public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schem
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
checkArgument(producer.getTopic() == this);

CompletableFuture<Optional<Long>> future = new CompletableFuture<>();

incrementTopicEpochIfNeeded(producer)
.thenAccept(epoch -> {
lock.readLock().lock();
incrementTopicEpochIfNeeded(producer, producerQueuedFuture)
.thenAccept(producerEpoch -> {
lock.writeLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
Expand All @@ -360,11 +366,11 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
USAGE_COUNT_UPDATER.get(this));
}

future.complete(epoch);
future.complete(producerEpoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
lock.readLock().unlock();
lock.writeLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
Expand All @@ -374,12 +380,13 @@ public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
return future;
}

protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
lock.writeLock().lock();
try {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer) {
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else {
Expand All @@ -388,7 +395,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
}

case Exclusive:
if (hasExclusiveProducer) {
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else if (!producers.isEmpty()) {
Expand All @@ -410,17 +417,52 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});

return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
}).exceptionally(ex -> {
});
}

case WaitForExclusive: {
if (hasExclusiveProducer || !producers.isEmpty()) {
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
waitingExclusiveProducers.add(Pair.of(producer, future));
producerQueuedFuture.complete(null);
return future;
} else if (producer.getTopicEpoch().isPresent()
&& producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
// If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
// to be fenced, because a new producer had been present in between.
return FutureUtil.failedFuture(new ProducerFencedException(
String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
topicEpoch.get(), producer.getTopicEpoch().get())));
} else {
// There are currently no existing producers
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(producer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});
}

// case WaitForExclusive:
// TODO: Implementation
return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
});
}
}

default:
return FutureUtil.failedFuture(
Expand Down Expand Up @@ -584,7 +626,35 @@ protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
if (newCount == 0) {
hasExclusiveProducer = false;
lock.writeLock().lock();
try {
hasExclusiveProducer = false;
Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
waitingExclusiveProducers.poll();
if (nextWaitingProducer != null) {
Producer nextProducer = nextWaitingProducer.getKey();
CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (nextProducer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(nextProducer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}

future.thenAccept(epoch -> {
topicEpoch = Optional.of(epoch);
producerFuture.complete(topicEpoch);
}).exceptionally(ex -> {
hasExclusiveProducer = false;
producerFuture.completeExceptionally(ex);
return null;
});
}
} finally {
lock.writeLock().unlock();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
Expand Down Expand Up @@ -733,6 +803,10 @@ public TopicPolicies getTopicPolicies(TopicName topicName) {
}
}

protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}

protected boolean isExceedMaximumMessageSize(int size) {
Integer maxMessageSize = null;
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public interface PulsarCommandSender {
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch);
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
SchemaVersion schemaVersion, Optional<Long> topicEpoch,
boolean isProducerReady) {
PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion, topicEpoch);
schemaVersion, topicEpoch, isProducerReady);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
command.getProducerSuccess().recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1152,17 +1152,18 @@ protected void handleProducer(final CommandProducer cmdProducer) {
});

schemaVersionFuture.thenAccept(schemaVersion -> {
CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
userProvidedProducerName, producerAccessMode, topicEpoch);

topic.addProducer(producer).thenAccept(newTopicEpoch -> {
topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
newTopicEpoch);
newTopicEpoch, true /* producer is ready now */);
return;
} else {
// The producer's future was completed before by
Expand Down Expand Up @@ -1192,6 +1193,17 @@ protected void handleProducer(final CommandProducer cmdProducer) {
}
return null;
});

producerQueuedFuture.thenRun(() -> {
// If the producer is queued waiting, we will get an immediate notification
// that we need to pass to client
if (isActive()) {
log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
commandSender.sendProducerSuccessResponse(requestId, producerName,
producer.getLastSequenceId(), producer.getSchemaVersion(),
Optional.empty(), false/* producer is not ready now */);
}
});
});
}).exceptionally(exception -> {
Throwable cause = exception.getCause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ default long getOriginalHighestSequenceId() {
* Tries to add a producer to the topic. Several validations will be performed.
*
* @param producer
* @param producerQueuedFuture
* a future that will be triggered if the producer is being queued up prior of getting established
* @return the "topic epoch" if there is one or empty
*/
CompletableFuture<Optional<Long>> addProducer(Producer producer);
CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture);

void removeProducer(Producer producer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ public NonPersistentTopicStats getStats(boolean getPreciseBacklog) {
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.waitingPublishers = getWaitingProducersCount();

subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,13 +496,14 @@ public synchronized void addFailed(ManagedLedgerException exception, Object ctx)
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
return super.addProducer(producer).thenApply(epoch -> {
public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
return super.addProducer(producer, producerQueuedFuture).thenApply(topicEpoch -> {
messageDeduplication.producerAdded(producer.getProducerName());

// Start replication producers if not already
startReplProducers();
return epoch;
return topicEpoch;
});
}

Expand Down Expand Up @@ -1642,6 +1643,7 @@ public TopicStats getStats(boolean getPreciseBacklog) {
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
stats.waitingPublishers = getWaitingProducersCount();

subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
Expand Down
Loading

0 comments on commit 99476d3

Please sign in to comment.