Skip to content

Commit

Permalink
PIP 68: Exclusive Producer (apache#8685)
Browse files Browse the repository at this point in the history
* PIP 68: Exclusive Producer

* Added missing enums cases in C++

* Addressed comments

* Moved constant to top of file

* Fix mistake in previous update

* Added handling for topic deletion
  • Loading branch information
merlimat authored Dec 3, 2020
1 parent f9ad058 commit d12486b
Show file tree
Hide file tree
Showing 34 changed files with 999 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,30 @@
*/
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
Expand Down Expand Up @@ -101,6 +109,13 @@ public abstract class AbstractTopic implements Topic {
protected CompletableFuture<TransactionBuffer> transactionBuffer;
protected ReentrantLock transactionBufferLock = new ReentrantLock();

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;

private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
private volatile long usageCount = 0;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -316,6 +331,106 @@ public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schem
.checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy);
}

@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
checkArgument(producer.getTopic() == this);

CompletableFuture<Optional<Long>> future = new CompletableFuture<>();

incrementTopicEpochIfNeeded(producer)
.thenAccept(epoch -> {
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);

USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}

future.complete(epoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
lock.readLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});

return future;
}

protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
lock.writeLock().lock();
try {
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else {
// Normal producer getting added, we don't need a new epoch
return CompletableFuture.completedFuture(topicEpoch);
}

case Exclusive:
if (hasExclusiveProducer) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
} else if (!producers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
} else if (producer.getTopicEpoch().isPresent()
&& producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
// If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
// to be fenced, because a new producer had been present in between.
return FutureUtil.failedFuture(new ProducerFencedException(
String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
topicEpoch.get(), producer.getTopicEpoch().get())));
} else {
// There are currently no existing producers
hasExclusiveProducer = true;

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(producer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
return future.thenApply(epoch -> {
topicEpoch = Optional.of(epoch);
return topicEpoch;
}).exceptionally(ex -> {
hasExclusiveProducer = false;
return null;
});
}

// case WaitForExclusive:
// TODO: Implementation

default:
return FutureUtil.failedFuture(
new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode()));
}

} finally {
lock.writeLock().unlock();
}
}

protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch);

protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch);

@Override
public void recordAddLatency(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
Expand Down Expand Up @@ -450,7 +565,44 @@ private boolean isUserProvidedProducerName(Producer producer){
return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix);
}

protected abstract void handleProducerRemoved(Producer producer);

@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);

if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
}
}

protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
if (newCount == 0) {
hasExclusiveProducer = false;
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
lastActive = System.nanoTime();
}

public void handleConsumerAdded(String subscriptionName, String consumerName) {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName,
consumerName, USAGE_COUNT_UPDATER.get(this));
}
}

public void decrementUsageCount() {
USAGE_COUNT_UPDATER.decrementAndGet(this);
}

public long currentUsageCount() {
return usageCount;
}

@Override
public boolean isPublishRateExceeded() {
Expand Down Expand Up @@ -536,6 +688,8 @@ public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}

protected abstract boolean isTerminated();

private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);

public InactiveTopicPolicies getInactiveTopicPolicies() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ public ProducerBusyException(String msg) {
}
}

public static class ProducerFencedException extends BrokerServiceException {
public ProducerFencedException(String msg) {
super(msg);
}
}

public static class ServiceUnitNotReadyException extends BrokerServiceException {
public ServiceUnitNotReadyException(String msg) {
super(msg);
Expand Down Expand Up @@ -217,6 +223,8 @@ private static PulsarApi.ServerError getClientErrorCode(Throwable t, boolean che
return ServerError.InvalidTxnStatus;
} else if (t instanceof NotAllowedException) {
return ServerError.NotAllowedError;
} else if (t instanceof ProducerFencedException) {
return ServerError.ProducerFenced;
} else if (t instanceof TransactionConflictException) {
return ServerError.TransactionConflict;
} else if (t instanceof CoordinatorException.TransactionNotFoundException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand All @@ -42,6 +43,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.ProducerAccessMode;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
Expand Down Expand Up @@ -83,13 +85,18 @@ public class Producer {
private final boolean isNonPersistentTopic;
private final boolean isEncrypted;

private final ProducerAccessMode accessMode;
private Optional<Long> topicEpoch;

private final Map<String, String> metadata;

private final SchemaVersion schemaVersion;

public Producer(Topic topic, TransportCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
boolean userProvidedProducerName,
ProducerAccessMode accessMode,
Optional<Long> topicEpoch) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
Expand Down Expand Up @@ -117,13 +124,16 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN
stats.setProducerName(producerName);
stats.producerId = producerId;
stats.metadata = this.metadata;
stats.accessMode = Commands.convertProducerAccessMode(accessMode);

this.isRemote = producerName
.startsWith(cnx.getBrokerService().pulsar().getConfiguration().getReplicatorPrefix());
this.remoteCluster = isRemote ? producerName.split("\\.")[2].split(REPL_PRODUCER_NAME_DELIMITER)[0] : null;

this.isEncrypted = isEncrypted;
this.schemaVersion = schemaVersion;
this.accessMode = accessMode;
this.topicEpoch = topicEpoch;
}

@Override
Expand Down Expand Up @@ -635,6 +645,14 @@ public SchemaVersion getSchemaVersion() {
return schemaVersion;
}

public ProducerAccessMode getAccessMode() {
return accessMode;
}

public Optional<Long> getTopicEpoch() {
return topicEpoch;
}

private static final Logger log = LoggerFactory.getLogger(Producer.class);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
package org.apache.pulsar.broker.service;

import io.netty.util.concurrent.Future;

import java.util.List;
import java.util.Optional;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
Expand All @@ -38,7 +41,7 @@ public interface PulsarCommandSender {
void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);

void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion);
SchemaVersion schemaVersion, Optional<Long> topicEpoch);

void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
long entryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
Expand Down Expand Up @@ -97,9 +101,9 @@ public void sendProducerSuccessResponse(long requestId, String producerName, Sch

@Override
public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
SchemaVersion schemaVersion) {
SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
schemaVersion);
schemaVersion, topicEpoch);
safeIntercept(command, cnx);
ByteBuf outBuf = Commands.serializeWithSize(command);
command.getProducerSuccess().recycle();
Expand Down
Loading

0 comments on commit d12486b

Please sign in to comment.