Skip to content

Commit

Permalink
Add epoch for connection handler to handle create producer timeout. (a…
Browse files Browse the repository at this point in the history
…pache#5571)

Fixes apache#5535

Motivation
Currently, if user create producer timeout, the connection handler of producer will reconnect to the broker later, but if in broker already done the previous create producer request, the reconnection will failed with "producer with name xxx is already connected".

So this PR will introduce epoch for connection handler and add a field named isGeneratedName for producer to handle above problem.

This PR only handle the generated producer name scenario, so many users occur errors such like
apache#5535, so we need to fix the generated producer name scenario first.

For the scenario of user specified producer name, we can discuss later and find a simple approach to handle it, i left my idea here: using producer id and producer name as the identity of producer, producer name used for EO producer and producer id can used by the producer reconnect, but this approach depends on globally unique producer id generator.

Modifications
If the producer with generated producer name and epoch of the producer is bigger than the exists producer, the new producer will overwrite the old producer, so the reconnect producer will create succeed.

Verifying this change
Add unit tests to simulate producer timeout and reconnection
  • Loading branch information
codelipenghui authored and jiazhai committed Nov 19, 2019
1 parent 8d1f793 commit 75c7229
Show file tree
Hide file tree
Showing 16 changed files with 435 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand All @@ -38,8 +40,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,7 +53,7 @@ public abstract class AbstractTopic implements Topic {
protected final String topic;

// Producers currently connected to this topic
protected final ConcurrentOpenHashSet<Producer> producers;
protected final ConcurrentHashMap<String, Producer> producers;

protected final BrokerService brokerService;

Expand Down Expand Up @@ -86,7 +86,7 @@ public abstract class AbstractTopic implements Topic {
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentOpenHashSet<>(16, 1);
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.lastActive = System.nanoTime();
Expand Down Expand Up @@ -123,7 +123,7 @@ protected boolean isProducersExceeded() {

protected boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.forEach(producer -> {
producers.values().forEach(producer -> {
if (!producer.isRemote()) {
foundLocal.set(true);
}
Expand All @@ -138,7 +138,7 @@ public String toString() {
}

@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
public Map<String, Producer> getProducers() {
return producers;
}

Expand Down Expand Up @@ -258,19 +258,66 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
@Override
public void resetPublishCountAndEnableReadIfRequired() {
if (this.publishRateLimiter.resetPublishCount()) {
enableProduerRead();
enableProducerRead();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
protected void enableProduerRead() {
protected void enableProducerRead() {
if (producers != null) {
producers.forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}

protected void checkTopicFenced() throws BrokerServiceException {
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
}
}

protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}

Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
}
}

private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
throws BrokerServiceException {
boolean canOverwrite = false;
if (oldProducer.equals(newProducer) && !oldProducer.isUserProvidedProducerName()
&& !newProducer.isUserProvidedProducerName() && newProducer.getEpoch() > oldProducer.getEpoch()) {
oldProducer.close(false);
canOverwrite = true;
}
if (canOverwrite) {
if(!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
// Met concurrent update, throw exception here so that client can try reconnect later.
throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
+ "' replace concurrency error");
} else {
handleProducerRemoved(oldProducer);
}
} else {
throw new BrokerServiceException.NamingException(
"Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
}
}

protected abstract void handleProducerRemoved(Producer producer);

@Override
public boolean isPublishRateExceeded() {
return this.publishRateLimiter.isPublishRateExceeded();
Expand Down Expand Up @@ -304,7 +351,7 @@ private void updatePublishDispatcher(Policies policies) {
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.publishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProduerRead();
enableProducerRead();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -184,9 +185,9 @@ private void dropBacklog(PersistentTopic persistentTopic, BacklogQuota quota) {
*/
private void disconnectProducers(PersistentTopic persistentTopic) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
ConcurrentOpenHashSet<Producer> producers = persistentTopic.getProducers();
Map<String, Producer> producers = persistentTopic.getProducers();

producers.forEach(producer -> {
producers.values().forEach(producer -> {
log.info("Producer [{}] has exceeded backlog quota on topic [{}]. Disconnecting producer",
producer.getProducerName(), persistentTopic.getName());
futures.add(producer.disconnect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public class Producer {
private final Topic topic;
private final ServerCnx cnx;
private final String producerName;
private final long epoch;
private final boolean userProvidedProducerName;
private final long producerId;
private final String appId;
private Rate msgIn;
Expand All @@ -86,11 +88,14 @@ public class Producer {
private final SchemaVersion schemaVersion;

public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName, String appId,
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion) {
boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch,
boolean userProvidedProducerName) {
this.topic = topic;
this.cnx = cnx;
this.producerId = producerId;
this.producerName = checkNotNull(producerName);
this.userProvidedProducerName = userProvidedProducerName;
this.epoch = epoch;
this.closeFuture = new CompletableFuture<>();
this.appId = appId;
this.authenticationData = cnx.authenticationData;
Expand Down Expand Up @@ -244,7 +249,7 @@ private void publishOperationCompleted() {
if (newPendingPublishAcks == 0 && !closeFuture.isDone()) {
synchronized (this) {
if (isClosed && !closeFuture.isDone()) {
closeNow();
closeNow(true);
}
}
}
Expand Down Expand Up @@ -470,7 +475,7 @@ public String toString() {
*
* @return completable future indicate completion of close
*/
public synchronized CompletableFuture<Void> close() {
public synchronized CompletableFuture<Void> close(boolean removeFromTopic) {
if (log.isDebugEnabled()) {
log.debug("Closing producer {} -- isClosed={}", this, isClosed);
}
Expand All @@ -482,14 +487,16 @@ public synchronized CompletableFuture<Void> close() {
cnx.isActive(), pendingPublishAcks);
}
if (!cnx.isActive() || pendingPublishAcks == 0) {
closeNow();
closeNow(removeFromTopic);
}
}
return closeFuture;
}

void closeNow() {
topic.removeProducer(this);
void closeNow(boolean removeFromTopic) {
if (removeFromTopic) {
topic.removeProducer(this);
}
cnx.removedProducer(this);

if (log.isDebugEnabled()) {
Expand All @@ -509,7 +516,7 @@ public CompletableFuture<Void> disconnect() {
log.info("Disconnecting producer: {}", this);
cnx.ctx().executor().execute(() -> {
cnx.closeProducer(this);
closeNow();
closeNow(true);
});
}
return closeFuture;
Expand Down Expand Up @@ -543,6 +550,14 @@ public boolean isNonPersistentTopic() {
return isNonPersistentTopic;
}

public long getEpoch() {
return epoch;
}

public boolean isUserProvidedProducerName() {
return userProvidedProducerName;
}

@VisibleForTesting
long getPendingPublishAcks() {
return pendingPublishAcks;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
producers.values().forEach((producerFuture) -> {
if (producerFuture.isDone() && !producerFuture.isCompletedExceptionally()) {
Producer producer = producerFuture.getNow(null);
producer.closeNow();
producer.closeNow(true);
}
});

Expand Down Expand Up @@ -817,6 +817,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
// Use producer name provided by client if present
final String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName()
: service.generateUniqueProducerName();
final long epoch = cmdProducer.getEpoch();
final boolean userProvidedProducerName = cmdProducer.getUserProvidedProducerName();
final boolean isEncrypted = cmdProducer.getEncrypted();
final Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
final SchemaData schema = cmdProducer.hasSchema() ? getSchema(cmdProducer.getSchema()) : null;
Expand Down Expand Up @@ -938,7 +940,7 @@ protected void handleProducer(final CommandProducer cmdProducer) {

schemaVersionFuture.thenAccept(schemaVersion -> {
Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName, authRole,
isEncrypted, metadata, schemaVersion);
isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName);

try {
topic.addProducer(producer);
Expand All @@ -952,12 +954,12 @@ protected void handleProducer(final CommandProducer cmdProducer) {
} else {
// The producer's future was completed before by
// a close command
producer.closeNow();
producer.closeNow(true);
log.info("[{}] Cleared producer created after timeout on client side {}",
remoteAddress, producer);
}
} else {
producer.closeNow();
producer.closeNow(true);
log.info("[{}] Cleared producer created after connection was closed: {}",
remoteAddress, producer);
producerFuture.completeExceptionally(
Expand Down Expand Up @@ -1220,7 +1222,7 @@ protected void handleCloseProducer(CommandCloseProducer closeProducer) {
Producer producer = producerFuture.getNow(null);
log.info("[{}][{}] Closing producer on cnx {}", producer.getTopic(), producer.getProducerName(), remoteAddress);

producer.close().thenAccept(v -> {
producer.close(true).thenAccept(v -> {
log.info("[{}][{}] Closed producer on cnx {}", producer.getTopic(), producer.getProducerName(),
remoteAddress);
ctx.writeAndFlush(Commands.newSuccess(requestId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;

Expand Down Expand Up @@ -116,7 +115,7 @@ CompletableFuture<Subscription> createSubscription(String subscriptionName, Init

CompletableFuture<Void> delete();

ConcurrentOpenHashSet<Producer> getProducers();
Map<String, Producer> getProducers();

String getName();

Expand Down
Loading

0 comments on commit 75c7229

Please sign in to comment.