Skip to content

Commit

Permalink
Expose control of sequence id in Java producer API (apache#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 15, 2017
1 parent dd635ac commit d4a8c25
Show file tree
Hide file tree
Showing 24 changed files with 499 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ public abstract class AbstractReplicator {

protected volatile ProducerImpl producer;

protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration()
.setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true);
protected final int producerQueueSize;
protected final ProducerConfiguration producerConfiguration;

protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS);

protected final String replicatorPrefix;

protected static final AtomicReferenceFieldUpdater<AbstractReplicator, State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(AbstractReplicator.class, State.class, "state");
private volatile State state = State.Stopped;

protected enum State {
Stopped, Starting, Started, Stopping
}
Expand All @@ -66,6 +66,12 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
this.remoteCluster = remoteCluster;
this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster);
this.producer = null;
this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();

this.producerConfiguration = new ProducerConfiguration();
this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS);
this.producerConfiguration.setMaxPendingMessages(producerQueueSize);
this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster));
STATE_UPDATER.set(this, State.Stopped);
}

Expand All @@ -74,9 +80,13 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca
protected abstract Position getReplicatorReadPosition();

protected abstract long getNumberOfEntriesInBacklog();

protected abstract void disableReplicatorRead();


public ProducerConfiguration getProducerConfiguration() {
return producerConfiguration;
}

public String getRemoteCluster() {
return remoteCluster;
}
Expand Down Expand Up @@ -111,23 +121,22 @@ public synchronized void startProducer() {
}

log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster);
client.createProducerAsync(topicName, producerConfiguration, getReplicatorName(replicatorPrefix, localCluster))
.thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
});
client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> {
readEntries(producer);
}).exceptionally(ex -> {
if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) {
long waitTimeMs = backOff.next();
log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName,
localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0);

// BackOff before retrying
brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS);
} else {
log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName,
localCluster, remoteCluster, STATE_UPDATER.get(this), ex);
}
return null;
});

}

Expand Down Expand Up @@ -196,10 +205,6 @@ protected boolean isWritable() {
return producer != null && producer.isWritable();
}

public static void setReplicatorQueueSize(int queueSize) {
producerConfiguration.setMaxPendingMessages(queueSize);
}

public static String getRemoteCluster(String remoteCursor) {
String[] split = remoteCursor.split("\\.");
return split[split.length - 1];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ public void recordLatency(EventType eventType, long latencyMs) {
pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs);
}
};
PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize());
}

public void start() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.Topic.PublishContext;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.naming.DestinationName;
Expand Down Expand Up @@ -185,6 +186,18 @@ public void recordMessageDrop(int batchSize) {
}
}

/**
* Return the sequence id of
* @return
*/
public long getLastSequenceId() {
if (isNonPersistentTopic) {
return -1;
} else {
return ((PersistentTopic) topic).getLastPublishedSequenceId(producerName);
}
}

private static final class MessagePublishContext implements PublishContext, Runnable {
private Producer producer;
private long sequenceId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ protected void handleProducer(final CommandProducer cmdProducer) {
if (isActive()) {
if (producerFuture.complete(producer)) {
log.info("[{}] Created new producer: {}", remoteAddress, producer);
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName));
ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName,
producer.getLastSequenceId()));
return;
} else {
// The producer's future was completed before by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,8 +52,6 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St
BrokerService brokerService) {
super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService);

producerConfiguration
.setMaxPendingMessages(brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize());
producerConfiguration.setBlockIfQueueFull(false);

startProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,5 +417,10 @@ public synchronized void purgeInactiveProducers() {
}
}

public long getLastPublishedSequenceId(String producerName) {
Long sequenceId = highestSequencedPushed.get(producerName);
return sequenceId != null ? sequenceId : -1;
}

private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat
private final PersistentTopic topic;
private final ManagedCursor cursor;

private final int producerQueueSize;

private static final int MaxReadBatchSize = 100;
private int readBatchSize;

Expand Down Expand Up @@ -97,7 +97,6 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String
HAVE_PENDING_READ_UPDATER.set(this, FALSE);
PENDING_MESSAGES_UPDATER.set(this, 0);

producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize();
readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize);
producerQueueThreshold = (int) (producerQueueSize * 0.9);

Expand Down Expand Up @@ -139,14 +138,14 @@ protected Position getReplicatorReadPosition() {
protected long getNumberOfEntriesInBacklog() {
return cursor.getNumberOfEntriesInBacklog();
}

@Override
protected void disableReplicatorRead() {
// deactivate cursor after successfully close the producer
this.cursor.setInactive();
}


protected void readMoreEntries() {
int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1468,5 +1468,9 @@ public DispatchRateLimiter getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}

public long getLastPublishedSequenceId(String producerName) {
return messageDeduplication.getLastPublishedSequenceId(producerName);
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public void setup() throws Exception {
doReturn(zkDataCache).when(configCacheService).policiesCache();
doReturn(configCacheService).when(pulsar).getConfigurationCache();
doReturn(Optional.empty()).when(zkDataCache).get(anyString());

LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class);
doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any());
doReturn(zkDataCache).when(zkCache).policiesCache();
Expand Down Expand Up @@ -945,31 +945,28 @@ public void testClosingReplicationProducerTwice() throws Exception {
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();

PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster;

final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) );
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
Field conf = AbstractReplicator.class.getDeclaredField("producerConfiguration");
conf.setAccessible(true);

ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);

doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());

replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());

replicator.disconnect(false);
replicator.disconnect(false);

replicator.startProducer();

verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ public void testConcurrentReplicator() throws Exception {
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject(),
Mockito.anyString());
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject());

}

Expand Down Expand Up @@ -623,7 +622,7 @@ public Void call() throws Exception {
/**
* It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and
* it should have cleaned up from the list
*
*
* @throws Exception
*/
@Test
Expand Down Expand Up @@ -750,7 +749,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
/**
* It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and
* it should closed the producer as cursor is already closed because replicator is already deleted.
*
*
* @throws Exception
*/
@Test(timeOut = 5000)
Expand Down
Loading

0 comments on commit d4a8c25

Please sign in to comment.