diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java index 76ce20b5c9088..8aa18305ec680 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractReplicator.java @@ -42,17 +42,17 @@ public abstract class AbstractReplicator { protected volatile ProducerImpl producer; - protected static final ProducerConfiguration producerConfiguration = new ProducerConfiguration() - .setSendTimeout(0, TimeUnit.SECONDS).setBlockIfQueueFull(true); + protected final int producerQueueSize; + protected final ProducerConfiguration producerConfiguration; protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0 ,TimeUnit.MILLISECONDS); protected final String replicatorPrefix; - + protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater .newUpdater(AbstractReplicator.class, State.class, "state"); private volatile State state = State.Stopped; - + protected enum State { Stopped, Starting, Started, Stopping } @@ -66,6 +66,12 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca this.remoteCluster = remoteCluster; this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); this.producer = null; + this.producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); + + this.producerConfiguration = new ProducerConfiguration(); + this.producerConfiguration.setSendTimeout(0, TimeUnit.SECONDS); + this.producerConfiguration.setMaxPendingMessages(producerQueueSize); + this.producerConfiguration.setProducerName(getReplicatorName(replicatorPrefix, localCluster)); STATE_UPDATER.set(this, State.Stopped); } @@ -74,9 +80,13 @@ public AbstractReplicator(String topicName, String replicatorPrefix, String loca protected abstract Position getReplicatorReadPosition(); protected abstract long getNumberOfEntriesInBacklog(); - + protected abstract void disableReplicatorRead(); - + + public ProducerConfiguration getProducerConfiguration() { + return producerConfiguration; + } + public String getRemoteCluster() { return remoteCluster; } @@ -111,23 +121,22 @@ public synchronized void startProducer() { } log.info("[{}][{} -> {}] Starting replicator", topicName, localCluster, remoteCluster); - client.createProducerAsync(topicName, producerConfiguration, getReplicatorName(replicatorPrefix, localCluster)) - .thenAccept(producer -> { - readEntries(producer); - }).exceptionally(ex -> { - if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { - long waitTimeMs = backOff.next(); - log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, - localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); - - // BackOff before retrying - brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); - } else { - log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, - localCluster, remoteCluster, STATE_UPDATER.get(this), ex); - } - return null; - }); + client.createProducerAsync(topicName, producerConfiguration).thenAccept(producer -> { + readEntries(producer); + }).exceptionally(ex -> { + if (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopped)) { + long waitTimeMs = backOff.next(); + log.warn("[{}][{} -> {}] Failed to create remote producer ({}), retrying in {} s", topicName, + localCluster, remoteCluster, ex.getMessage(), waitTimeMs / 1000.0); + + // BackOff before retrying + brokerService.executor().schedule(this::startProducer, waitTimeMs, TimeUnit.MILLISECONDS); + } else { + log.warn("[{}][{} -> {}] Failed to create remote producer. Replicator state: {}", topicName, + localCluster, remoteCluster, STATE_UPDATER.get(this), ex); + } + return null; + }); } @@ -196,10 +205,6 @@ protected boolean isWritable() { return producer != null && producer.isWritable(); } - public static void setReplicatorQueueSize(int queueSize) { - producerConfiguration.setMaxPendingMessages(queueSize); - } - public static String getRemoteCluster(String remoteCursor) { String[] split = remoteCursor.split("\\."); return split[split.length - 1]; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 468005727c46c..bcfba0efdc595 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -262,7 +262,6 @@ public void recordLatency(EventType eventType, long latencyMs) { pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); } }; - PersistentReplicator.setReplicatorQueueSize(pulsar.getConfiguration().getReplicationProducerQueueSize()); } public void start() throws Exception { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index 101afa17ce578..076aaec469322 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -32,6 +32,7 @@ import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; import org.apache.pulsar.broker.service.Topic.PublishContext; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.api.Commands; import org.apache.pulsar.common.api.proto.PulsarApi.ServerError; import org.apache.pulsar.common.naming.DestinationName; @@ -185,6 +186,18 @@ public void recordMessageDrop(int batchSize) { } } + /** + * Return the sequence id of + * @return + */ + public long getLastSequenceId() { + if (isNonPersistentTopic) { + return -1; + } else { + return ((PersistentTopic) topic).getLastPublishedSequenceId(producerName); + } + } + private static final class MessagePublishContext implements PublishContext, Runnable { private Producer producer; private long sequenceId; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index b5a166e06c360..bd9a1f506c333 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -509,7 +509,8 @@ protected void handleProducer(final CommandProducer cmdProducer) { if (isActive()) { if (producerFuture.complete(producer)) { log.info("[{}] Created new producer: {}", remoteAddress, producer); - ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName)); + ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName, + producer.getLastSequenceId())); return; } else { // The producer's future was completed before by diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java index 0296e805d9a18..ae3316bea5b9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentReplicator.java @@ -34,7 +34,6 @@ import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.SendCallback; import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats; -import org.apache.pulsar.common.policies.data.ReplicatorStats; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,8 +52,6 @@ public NonPersistentReplicator(NonPersistentTopic topic, String localCluster, St BrokerService brokerService) { super(topic.getName(), topic.replicatorPrefix, localCluster, remoteCluster, brokerService); - producerConfiguration - .setMaxPendingMessages(brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize()); producerConfiguration.setBlockIfQueueFull(false); startProducer(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index dae88af9ac6fd..6333f3a893ca0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -417,5 +417,10 @@ public synchronized void purgeInactiveProducers() { } } + public long getLastPublishedSequenceId(String producerName) { + Long sequenceId = highestSequencedPushed.get(producerName); + return sequenceId != null ? sequenceId : -1; + } + private static final Logger log = LoggerFactory.getLogger(MessageDeduplication.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 48bbf0975b7c3..096fc51280e53 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -58,7 +58,7 @@ public class PersistentReplicator extends AbstractReplicator implements Replicat private final PersistentTopic topic; private final ManagedCursor cursor; - private final int producerQueueSize; + private static final int MaxReadBatchSize = 100; private int readBatchSize; @@ -97,7 +97,6 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); - producerQueueSize = brokerService.pulsar().getConfiguration().getReplicationProducerQueueSize(); readBatchSize = Math.min(producerQueueSize, MaxReadBatchSize); producerQueueThreshold = (int) (producerQueueSize * 0.9); @@ -139,14 +138,14 @@ protected Position getReplicatorReadPosition() { protected long getNumberOfEntriesInBacklog() { return cursor.getNumberOfEntriesInBacklog(); } - + @Override protected void disableReplicatorRead() { // deactivate cursor after successfully close the producer this.cursor.setInactive(); } - + protected void readMoreEntries() { int availablePermits = producerQueueSize - PENDING_MESSAGES_UPDATER.get(this); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 136f4f40b2888..83d9344670601 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1468,5 +1468,9 @@ public DispatchRateLimiter getDispatchRateLimiter() { return this.dispatchRateLimiter; } + public long getLastPublishedSequenceId(String producerName) { + return messageDeduplication.getLastPublishedSequenceId(producerName); + } + private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 899676b936c1d..fe264e967e9ef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -142,7 +142,7 @@ public void setup() throws Exception { doReturn(zkDataCache).when(configCacheService).policiesCache(); doReturn(configCacheService).when(pulsar).getConfigurationCache(); doReturn(Optional.empty()).when(zkDataCache).get(anyString()); - + LocalZooKeeperCacheService zkCache = mock(LocalZooKeeperCacheService.class); doReturn(CompletableFuture.completedFuture(Optional.empty())).when(zkDataCache).getAsync(any()); doReturn(zkDataCache).when(zkCache).policiesCache(); @@ -945,31 +945,28 @@ public void testClosingReplicationProducerTwice() throws Exception { doReturn(new ArrayList()).when(ledgerMock).getCursors(); PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); - String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster; final URL brokerUrl = new URL( "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) ); PulsarClientImpl clientImpl = (PulsarClientImpl) client; - Field conf = AbstractReplicator.class.getDeclaredField("producerConfiguration"); - conf.setAccessible(true); ManagedCursor cursor = mock(ManagedCursorImpl.class); doReturn(remoteCluster).when(cursor).getName(); brokerService.getReplicationClients().put(remoteCluster, client); PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); - doReturn(new CompletableFuture()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + doReturn(new CompletableFuture()).when(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); replicator.startProducer(); - verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + verify(clientImpl).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); replicator.disconnect(false); replicator.disconnect(false); replicator.startProducer(); - verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, replicator.getProducerConfiguration()); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index e640462470837..b7860683063db 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -212,8 +212,7 @@ public void testConcurrentReplicator() throws Exception { } Thread.sleep(3000); - Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject(), - Mockito.anyString()); + Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject()); } @@ -623,7 +622,7 @@ public Void call() throws Exception { /** * It verifies that: if it fails while removing replicator-cluster-cursor: it should not restart the replicator and * it should have cleaned up from the list - * + * * @throws Exception */ @Test @@ -750,7 +749,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { /** * It verifies that PersistentReplicator considers CursorAlreadyClosedException as non-retriable-read exception and * it should closed the producer as cursor is already closed because replicator is already deleted. - * + * * @throws Exception */ @Test(timeOut = 5000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java new file mode 100644 index 0000000000000..fb5cc0fa8635c --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNull; + +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ClientDeduplicationTest extends ProducerConsumerBase { + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testProducerSequenceAfterReconnect() throws Exception { + String topic = "persistent://my-property/use/my-ns/testProducerSequenceAfterReconnect"; + admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true); + + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setProducerName("my-producer-name"); + Producer producer = pulsarClient.createProducer(topic, conf); + + assertEquals(producer.getLastSequenceId(), -1L); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + assertEquals(producer.getLastSequenceId(), i); + } + + producer.close(); + + producer = pulsarClient.createProducer(topic, conf); + assertEquals(producer.getLastSequenceId(), 9L); + + for (int i = 10; i < 20; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + assertEquals(producer.getLastSequenceId(), i); + } + + producer.close(); + } + + @Test + public void testProducerSequenceAfterRestart() throws Exception { + String topic = "persistent://my-property/use/my-ns/testProducerSequenceAfterRestart"; + admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true); + + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setProducerName("my-producer-name"); + Producer producer = pulsarClient.createProducer(topic, conf); + + assertEquals(producer.getLastSequenceId(), -1L); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + assertEquals(producer.getLastSequenceId(), i); + } + + producer.close(); + + // Kill and restart broker + restartBroker(); + + producer = pulsarClient.createProducer(topic, conf); + assertEquals(producer.getLastSequenceId(), 9L); + + for (int i = 10; i < 20; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + assertEquals(producer.getLastSequenceId(), i); + } + + producer.close(); + } + + @Test(timeOut = 30000) + public void testProducerDeduplication() throws Exception { + String topic = "persistent://my-property/use/my-ns/testProducerDeduplication"; + admin.namespaces().setDeduplicationStatus("my-property/use/my-ns", true); + + ProducerConfiguration conf = new ProducerConfiguration(); + conf.setProducerName("my-producer-name"); + + // Set infinite timeout + conf.setSendTimeout(0, TimeUnit.SECONDS); + Producer producer = pulsarClient.createProducer(topic, conf); + + assertEquals(producer.getLastSequenceId(), -1L); + + Consumer consumer = pulsarClient.subscribe(topic, "my-subscription"); + + producer.send(MessageBuilder.create().setContent("my-message-0".getBytes()).setSequenceId(0).build()); + producer.send(MessageBuilder.create().setContent("my-message-1".getBytes()).setSequenceId(1).build()); + producer.send(MessageBuilder.create().setContent("my-message-2".getBytes()).setSequenceId(2).build()); + + // Repeat the messages and verify they're not received by consumer + producer.send(MessageBuilder.create().setContent("my-message-1".getBytes()).setSequenceId(1).build()); + producer.send(MessageBuilder.create().setContent("my-message-2".getBytes()).setSequenceId(2).build()); + + producer.close(); + + for (int i = 0; i < 3; i++) { + Message msg = consumer.receive(); + assertEquals(new String(msg.getData()), "my-message-" + i); + consumer.acknowledge(msg); + } + + // No other messages should be received + Message msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + // Kill and restart broker + restartBroker(); + + producer = pulsarClient.createProducer(topic, conf); + assertEquals(producer.getLastSequenceId(), 2L); + + // Repeat the messages and verify they're not received by consumer + producer.send(MessageBuilder.create().setContent("my-message-1".getBytes()).setSequenceId(1).build()); + producer.send(MessageBuilder.create().setContent("my-message-2".getBytes()).setSequenceId(2).build()); + + msg = consumer.receive(1, TimeUnit.SECONDS); + assertNull(msg); + + producer.close(); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index aa8dba2d1c108..949dc46d35e45 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -118,7 +118,7 @@ public Object[][] subType() { * 5. unload "my-ns2" which closes the connection as broker doesn't have any more client connected on that connection * 6. all namespace-bundles are in "connecting" state and waiting for available broker * - * + * * @throws Exception */ @Test @@ -237,7 +237,7 @@ public void testDisconnectClientWithoutClosingConnection() throws Exception { /** * Verifies: 1. Closing of Broker service unloads all bundle gracefully and there must not be any connected bundles * after closing broker service - * + * * @throws Exception */ @Test @@ -294,7 +294,7 @@ public void testCloseBrokerService() throws Exception { * 1. broker disconnects that consumer *

* 2. redeliver all those messages to other supported consumer under the same subscription - * + * * @param subType * @throws Exception */ @@ -491,7 +491,7 @@ public void testResetCursor(SubscriptionType subType) throws Exception { * 3. create multiple producer and make lookup-requests simultaneously * 4. Client1 receives TooManyLookupException and should close connection * - * + * * @throws Exception */ @Test(timeOut = 5000) @@ -543,7 +543,7 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { return null; }); }); - + } latch.await(10, TimeUnit.SECONDS); @@ -562,14 +562,14 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { /** * It verifies that broker throttles down configured concurrent topic loading requests - * + * *

-     * 1. Start broker with N maxConcurrentTopicLoadRequest 
+     * 1. Start broker with N maxConcurrentTopicLoadRequest
      * 2. create concurrent producers on different topics which makes broker to load topics concurrently
      * 3. Producer operationtimeout = 1 ms so, if producers creation will fail for throttled topics
      * 4. verify all producers should have connected
      * 
- * + * * @throws Exception */ @Test(timeOut = 5000) @@ -613,8 +613,8 @@ public void testMaxConcurrentTopicLoading() throws Exception { final String randomTopicName1 = topicName + randomUUID().toString(); final String randomTopicName2 = topicName + randomUUID().toString(); // pass producer-name to avoid exception: producer is already connected to topic - futures.add(pulsarClient2.createProducerAsync(randomTopicName1, config1, randomTopicName1)); - futures.add(pulsarClient.createProducerAsync(randomTopicName2, config1, randomTopicName2)); + futures.add(pulsarClient2.createProducerAsync(randomTopicName1, config1)); + futures.add(pulsarClient.createProducerAsync(randomTopicName2, config1)); latch.countDown(); }); } @@ -630,7 +630,7 @@ public void testMaxConcurrentTopicLoading() throws Exception { } /** * It verifies that client closes the connection on internalSerevrError which is "ServiceNotReady" from Broker-side - * + * * @throws Exception */ @Test(timeOut = 5000) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java index 5160131fe8cbe..e4baf06b95c0c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageBuilder.java @@ -109,6 +109,24 @@ public static MessageBuilder create() { */ MessageBuilder setEventTime(long timestamp); + /** + * Specify a custom sequence id for the message being published. + *

+ * The sequence id can be used for deduplication purposes and it needs to follow these rules: + *

    + *
  1. sequenceId >= 0 + *
  2. Sequence id for a message needs to be greater than sequence id for earlier messages: + * sequenceId(N+1) > sequenceId(N) + *
  3. It's not necessary for sequence ids to be consecutive. There can be holes between messages. Eg. the + * sequenceId could represent an offset or a cumulative size. + *
+ * + * @param sequenceId + * the sequence id to assign to the current message + * @since 1.20.0 + */ + MessageBuilder setSequenceId(long sequenceId); + /** * Override the replication clusters for this message. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java index cecdc24f8ef24..878899c964088 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Producer.java @@ -25,7 +25,7 @@ /** * Producer object. - * + * * The producer is used to publish messages on a topic * * @@ -37,6 +37,11 @@ public interface Producer extends Closeable { */ String getTopic(); + /** + * @return the producer name which could have been assigned by the system or specified by the client + */ + String getProducerName(); + /** * Send a message * @@ -82,7 +87,7 @@ public interface Producer extends Closeable { * contain the {@link MessageId} assigned by the broker to the published message. *

* Example: - * + * *

      * Message msg = MessageBuilder.create().setContent(myContent).build();
      * producer.sendAsync(msg).thenRun(v -> {
@@ -103,25 +108,38 @@ public interface Producer extends Closeable {
      */
     CompletableFuture sendAsync(Message message);
 
+    /**
+     * Get the last sequence id that was published by this producer.
+     * 

+ * This represent either the automatically assigned or custom sequence id (set on the {@link MessageBuilder}) that + * was published and acknowledged by the broker. + *

+ * After recreating a producer with the same producer name, this will return the last message that was published in + * the previous producer session, or -1 if there no message was ever published. + * + * @return the last sequence id published by this producer + */ + long getLastSequenceId(); + /** * Get statistics for the producer - * + * * numMsgsSent : Number of messages sent in the current interval numBytesSent : Number of bytes sent in the current * interval numSendFailed : Number of messages failed to send in the current interval numAcksReceived : Number of * acks received in the current interval totalMsgsSent : Total number of messages sent totalBytesSent : Total number * of bytes sent totalSendFailed : Total number of messages failed to send totalAcksReceived: Total number of acks * received - * + * * @return statistic for the producer or null if ProducerStats is disabled. */ ProducerStats getStats(); /** * Close the producer and releases resources allocated. - * + * * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case * of errors, pending writes will not be retried. - * + * * @throws PulsarClientException.AlreadyClosedException * if the producer was already closed */ @@ -130,10 +148,10 @@ public interface Producer extends Closeable { /** * Close the producer and releases resources allocated. - * + * * No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case * of errors, pending writes will not be retried. - * + * * @return a future that can used to track when the producer has been closed */ CompletableFuture closeAsync(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java index 92ebe20911dd3..6e37afa8d2876 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.io.Serializable; +import java.util.Optional; import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl; @@ -35,10 +36,8 @@ */ public class ProducerConfiguration implements Serializable { - /** - * - */ private static final long serialVersionUID = 1L; + private String producerName = null; private long sendTimeoutMs = 30000; private boolean blockIfQueueFull = false; private int maxPendingMessages = 1000; @@ -50,10 +49,38 @@ public class ProducerConfiguration implements Serializable { private CompressionType compressionType = CompressionType.NONE; + // Cannot use Optional since it's not serializable + private Long initialSequenceId = null; + public enum MessageRoutingMode { SinglePartition, RoundRobinPartition, CustomPartition } + /** + * @return the configured custom producer name or null if no custom name was specified + * @since 1.20.0 + */ + public String getProducerName() { + return producerName; + } + + /** + * Specify a name for the producer + *

+ * If not assigned, the system will generate a globally unique name which can be access with + * {@link Producer#getProducerName()}. + *

+ * When specifying a name, it is app to the user to ensure that, for a given topic, the producer name is unique + * across all Pulsar's clusters. + * + * @param producerName + * the custom name to use for the producer + * @since 1.20.0 + */ + public void setProducerName(String producerName) { + this.producerName = producerName; + } + /** * @return the message send timeout in ms */ @@ -87,8 +114,8 @@ public int getMaxPendingMessages() { /** * Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker. *

- * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} - * will fail unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. + * When the queue is full, by default, all calls to {@link Producer#send} and {@link Producer#sendAsync} will fail + * unless blockIfQueueFull is set to true. Use {@link #setBlockIfQueueFull} to change the blocking behavior. * * @param maxPendingMessages * @return @@ -227,7 +254,7 @@ public boolean getBatchingEnabled() { * contents. * * When enabled default batch delay is set to 10 ms and default batch size is 1000 messages - * + * * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) * @since 1.0.36
* Make sure all the consumer applications have been updated to use this client version, before starting to @@ -240,7 +267,7 @@ public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) { } /** - * + * * @return the batch time period in ms. * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) */ @@ -251,7 +278,7 @@ public long getBatchingMaxPublishDelayMs() { /** * Set the time period within which the messages sent will be batched default: 10ms if batch messages are * enabled. If set to a non zero value, messages will be queued until this time interval or until - * + * * @see ProducerConfiguration#batchingMaxMessages threshold is reached; all messages will be published as a single * batch message. The consumer will be delivered individual messages in the batch in the same order they were * enqueued @@ -272,7 +299,7 @@ public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUni } /** - * + * * @return the maximum number of messages permitted in a batch. */ public int getBatchingMaxMessages() { @@ -282,7 +309,7 @@ public int getBatchingMaxMessages() { /** * Set the maximum number of messages permitted in a batch. default: 1000 If set to a value greater than 1, * messages will be queued until this threshold is reached or batch interval has elapsed - * + * * @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit) All messages in batch will be published as * a single batch message. The consumer will be delivered individual messages in the batch in the same order * they were enqueued @@ -296,6 +323,24 @@ public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessages return this; } + public Optional getInitialSequenceId() { + return initialSequenceId != null ? Optional.of(initialSequenceId) : Optional.empty(); + } + + /** + * Set the baseline for the sequence ids for messages published by the producer. + *

+ * First message will be using (initialSequenceId + 1) as its sequence id and subsequent messages will be assigned + * incremental sequence ids, if not otherwise specified. + * + * @param initialSequenceId + * @return + */ + public ProducerConfiguration setInitialSequenceId(long initialSequenceId) { + this.initialSequenceId = initialSequenceId; + return this; + } + @Override public boolean equals(Object obj) { if (obj instanceof ProducerConfiguration) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java index 562a65b55dffc..1fc0f195b37c5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; @@ -61,7 +63,8 @@ public class ClientCnx extends PulsarHandler { private final Authentication authentication; private State state; - private final ConcurrentLongHashMap> pendingRequests = new ConcurrentLongHashMap<>(16, 1); + private final ConcurrentLongHashMap>> pendingRequests = new ConcurrentLongHashMap<>( + 16, 1); private final ConcurrentLongHashMap> pendingLookupRequests = new ConcurrentLongHashMap<>( 16, 1); private final ConcurrentLongHashMap producers = new ConcurrentLongHashMap<>(16, 1); @@ -179,7 +182,7 @@ protected void handleSendReceipt(CommandSendReceipt sendReceipt) { log.warn("[{}] Message has been dropped for non-persistent topic producer-id {}", ctx.channel(), producerId); } - + if (log.isDebugEnabled()) { log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", ctx.channel(), producerId, sequenceId, ledgerId, entryId); @@ -209,7 +212,7 @@ protected void handleSuccess(CommandSuccess success) { log.debug("{} Received success response from server: {}", ctx.channel(), success.getRequestId()); } long requestId = success.getRequestId(); - CompletableFuture requestFuture = pendingRequests.remove(requestId); + CompletableFuture> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.complete(null); } else { @@ -226,9 +229,9 @@ protected void handleProducerSuccess(CommandProducerSuccess success) { success.getRequestId(), success.getProducerName()); } long requestId = success.getRequestId(); - CompletableFuture requestFuture = pendingRequests.remove(requestId); + CompletableFuture> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { - requestFuture.complete(success.getProducerName()); + requestFuture.complete(new ImmutablePair<>(success.getProducerName(), success.getLastSequenceId())); } else { log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId()); } @@ -351,7 +354,7 @@ protected void handleError(CommandError error) { log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", ctx.channel()); } - CompletableFuture requestFuture = pendingRequests.remove(requestId); + CompletableFuture> requestFuture = pendingRequests.remove(requestId); if (requestFuture != null) { requestFuture.completeExceptionally(getPulsarClientException(error.getError(), error.getMessage())); } else { @@ -430,8 +433,8 @@ CompletableFuture connectionFuture() { return connectionFuture; } - CompletableFuture sendRequestWithId(ByteBuf cmd, long requestId) { - CompletableFuture future = new CompletableFuture<>(); + CompletableFuture> sendRequestWithId(ByteBuf cmd, long requestId) { + CompletableFuture> future = new CompletableFuture<>(); pendingRequests.put(requestId, future); ctx.writeAndFlush(cmd).addListener(writeFuture -> { if (!writeFuture.isSuccess()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java index fb574239788e3..c0dd67fdcec52 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageBuilderImpl.java @@ -88,6 +88,13 @@ public MessageBuilder setEventTime(long timestamp) { return this; } + @Override + public MessageBuilder setSequenceId(long sequenceId) { + checkArgument(sequenceId >= 0); + msgMetadataBuilder.setSequenceId(sequenceId); + return this; + } + @Override public MessageBuilder setReplicationClusters(List clusters) { Preconditions.checkNotNull(clusters); @@ -102,4 +109,6 @@ public MessageBuilder disableReplication() { msgMetadataBuilder.addReplicateTo("__local__"); return this; } + + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java index 469f65b931bd6..c8817b41dd976 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java @@ -55,13 +55,25 @@ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerCo start(); } + @Override + public String getProducerName() { + return producers.get(0).getProducerName(); + } + + @Override + public long getLastSequenceId() { + // Return the highest sequence id across all partitions. This will be correct, + // since there is a single id generator across all partitions for the same producer + return producers.stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1); + } + private void start() { AtomicReference createFail = new AtomicReference(); AtomicInteger completed = new AtomicInteger(); for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) { String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString(); - ProducerImpl producer = new ProducerImpl(client, partitionName, null, conf, - new CompletableFuture(), partitionIndex); + ProducerImpl producer = new ProducerImpl(client, partitionName, conf, new CompletableFuture(), + partitionIndex); producers.add(producer); producer.producerCreatedFuture().handle((prod, createException) -> { if (createException != null) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 87208fac67e30..c0a57226ec92e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -71,7 +71,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask { // Variable is used through the atomic updater @SuppressWarnings("unused") - private volatile long msgIdGenerator = 0; + private volatile long msgIdGenerator; private final BlockingQueue pendingMessages; private final BlockingQueue pendingCallbacks; @@ -92,14 +92,16 @@ public class ProducerImpl extends ProducerBase implements TimerTask { private final CompressionCodec compressor; + private volatile long lastSequenceIdPublished; + private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater .newUpdater(ProducerImpl.class, "msgIdGenerator"); - public ProducerImpl(PulsarClientImpl client, String topic, String producerName, ProducerConfiguration conf, + public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, CompletableFuture producerCreatedFuture, int partitionIndex) { super(client, topic, conf, producerCreatedFuture); this.producerId = client.newProducerId(); - this.producerName = producerName; + this.producerName = conf.getProducerName(); this.partitionIndex = partitionIndex; this.pendingMessages = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages()); this.pendingCallbacks = Queues.newArrayBlockingQueue(conf.getMaxPendingMessages()); @@ -107,6 +109,15 @@ public ProducerImpl(PulsarClientImpl client, String topic, String producerName, this.compressor = CompressionCodecProvider .getCompressionCodec(convertCompressionType(conf.getCompressionType())); + if (conf.getInitialSequenceId().isPresent()) { + long initialSequenceId = conf.getInitialSequenceId().get(); + this.lastSequenceIdPublished = initialSequenceId; + this.msgIdGenerator = initialSequenceId + 1; + } else { + this.lastSequenceIdPublished = -1; + this.msgIdGenerator = 0; + } + if (conf.getSendTimeoutMs() > 0) { sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS); } @@ -132,6 +143,11 @@ private boolean isBatchMessagingEnabled() { return conf.getBatchingEnabled(); } + @Override + public long getLastSequenceId() { + return lastSequenceIdPublished; + } + @Override public CompletableFuture sendAsync(Message message) { CompletableFuture future = new CompletableFuture<>(); @@ -217,7 +233,7 @@ public void sendAsync(Message message, SendCallback callback) { PulsarDecoder.MaxMessageSize))); return; } - + if (!msg.isReplicated() && msgMetadata.hasProducerName()) { callback.sendComplete(new PulsarClientException.InvalidMessageException("Cannot re-use the same message")); compressedPayload.release(); @@ -226,15 +242,20 @@ public void sendAsync(Message message, SendCallback callback) { try { synchronized (this) { - long sequenceId = msgIdGeneratorUpdater.getAndIncrement(this); + long sequenceId; + if (!msgMetadata.hasSequenceId()) { + sequenceId = msgIdGeneratorUpdater.getAndIncrement(this); + msgMetadata.setSequenceId(sequenceId); + } else { + sequenceId = msgMetadata.getSequenceId(); + } if (!msgMetadata.hasPublishTime()) { msgMetadata.setPublishTime(System.currentTimeMillis()); checkArgument(!msgMetadata.hasProducerName()); - checkArgument(!msgMetadata.hasSequenceId()); msgMetadata.setProducerName(producerName); - msgMetadata.setSequenceId(sequenceId); + if (conf.getCompressionType() != CompressionType.NONE) { msgMetadata.setCompression(convertCompressionType(conf.getCompressionType())); msgMetadata.setUncompressedSize(uncompressedSize); @@ -533,6 +554,7 @@ void ackReceived(ClientCnx cnx, long sequenceId, long ledgerId, long entryId) { if (callback) { op = pendingCallbacks.poll(); if (op != null) { + lastSequenceIdPublished = op.sequenceId + op.numMessagesInBatch - 1; op.setMessageId(ledgerId, entryId, partitionIndex); try { // Need to protect ourselves from any exception being thrown in the future handler from the @@ -728,7 +750,10 @@ void connectionOpened(final ClientCnx cnx) { long requestId = client.newRequestId(); cnx.sendRequestWithId(Commands.newProducer(topic, producerId, requestId, producerName), requestId) - .thenAccept(producerName -> { + .thenAccept(pair -> { + String producerName = pair.getLeft(); + long lastSequenceId = pair.getRight(); + // We are now reconnected to broker and clear to send messages. Re-send all pending messages and // set the cnx pointer so that new messages will be sent immediately synchronized (ProducerImpl.this) { @@ -749,6 +774,11 @@ void connectionOpened(final ClientCnx cnx) { this.producerName = producerName; } + if (this.lastSequenceIdPublished == -1 && !conf.getInitialSequenceId().isPresent()) { + this.lastSequenceIdPublished = lastSequenceId; + this.msgIdGenerator = lastSequenceId + 1; + } + if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) { // schedule the first batch message task client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMs(), diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index 33791d46ed0c5..bf82c5be7246a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -149,13 +149,7 @@ public CompletableFuture createProducerAsync(String topic) { return createProducerAsync(topic, new ProducerConfiguration()); } - @Override public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf) { - return createProducerAsync(topic, conf, null); - } - - public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf, - String producerName) { if (state.get() != State.Open) { return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); } @@ -180,8 +174,7 @@ public CompletableFuture createProducerAsync(final String topic, final producer = new PartitionedProducerImpl(PulsarClientImpl.this, topic, conf, metadata.partitions, producerCreatedFuture); } else { - producer = new ProducerImpl(PulsarClientImpl.this, topic, producerName, conf, producerCreatedFuture, - -1); + producer = new ProducerImpl(PulsarClientImpl.this, topic, conf, producerCreatedFuture, -1); } synchronized (producers) { diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java index 0fd4c0ec00bc0..993094b2b4c0a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java @@ -173,9 +173,14 @@ public static ByteBuf newSuccess(long requestId) { } public static ByteBuf newProducerSuccess(long requestId, String producerName) { + return newProducerSuccess(requestId, producerName, -1); + } + + public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId) { CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder(); producerSuccessBuilder.setRequestId(requestId); producerSuccessBuilder.setProducerName(producerName); + producerSuccessBuilder.setLastSequenceId(lastSequenceId); CommandProducerSuccess producerSuccess = producerSuccessBuilder.build(); ByteBuf res = serializeWithSize( BaseCommand.newBuilder().setType(Type.PRODUCER_SUCCESS).setProducerSuccess(producerSuccess)); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index dffa67b6fcd14..ed2289d9729ff 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -250,11 +250,11 @@ private ProtocolVersion(int index, int value) { public interface MessageIdDataOrBuilder extends com.google.protobuf.MessageLiteOrBuilder { - // required uint64 ledgerId = 1; + // required int64 ledgerId = 1; boolean hasLedgerId(); long getLedgerId(); - // required uint64 entryId = 2; + // required int64 entryId = 2; boolean hasEntryId(); long getEntryId(); @@ -301,7 +301,7 @@ public MessageIdData getDefaultInstanceForType() { } private int bitField0_; - // required uint64 ledgerId = 1; + // required int64 ledgerId = 1; public static final int LEDGERID_FIELD_NUMBER = 1; private long ledgerId_; public boolean hasLedgerId() { @@ -311,7 +311,7 @@ public long getLedgerId() { return ledgerId_; } - // required uint64 entryId = 2; + // required int64 entryId = 2; public static final int ENTRYID_FIELD_NUMBER = 2; private long entryId_; public boolean hasEntryId() { @@ -373,10 +373,10 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { - output.writeUInt64(1, ledgerId_); + output.writeInt64(1, ledgerId_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt64(2, entryId_); + output.writeInt64(2, entryId_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeInt32(3, partition_); @@ -394,11 +394,11 @@ public int getSerializedSize() { size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(1, ledgerId_); + .computeInt64Size(1, ledgerId_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(2, entryId_); + .computeInt64Size(2, entryId_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream @@ -635,12 +635,12 @@ public Builder mergeFrom( } case 8: { bitField0_ |= 0x00000001; - ledgerId_ = input.readUInt64(); + ledgerId_ = input.readInt64(); break; } case 16: { bitField0_ |= 0x00000002; - entryId_ = input.readUInt64(); + entryId_ = input.readInt64(); break; } case 24: { @@ -659,7 +659,7 @@ public Builder mergeFrom( private int bitField0_; - // required uint64 ledgerId = 1; + // required int64 ledgerId = 1; private long ledgerId_ ; public boolean hasLedgerId() { return ((bitField0_ & 0x00000001) == 0x00000001); @@ -680,7 +680,7 @@ public Builder clearLedgerId() { return this; } - // required uint64 entryId = 2; + // required int64 entryId = 2; private long entryId_ ; public boolean hasEntryId() { return ((bitField0_ & 0x00000002) == 0x00000002); @@ -13618,6 +13618,10 @@ public interface CommandProducerSuccessOrBuilder // required string producer_name = 2; boolean hasProducerName(); String getProducerName(); + + // optional int64 last_sequence_id = 3 [default = -1]; + boolean hasLastSequenceId(); + long getLastSequenceId(); } public static final class CommandProducerSuccess extends com.google.protobuf.GeneratedMessageLite @@ -13696,9 +13700,20 @@ private com.google.protobuf.ByteString getProducerNameBytes() { } } + // optional int64 last_sequence_id = 3 [default = -1]; + public static final int LAST_SEQUENCE_ID_FIELD_NUMBER = 3; + private long lastSequenceId_; + public boolean hasLastSequenceId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getLastSequenceId() { + return lastSequenceId_; + } + private void initFields() { requestId_ = 0L; producerName_ = ""; + lastSequenceId_ = -1L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -13731,6 +13746,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getProducerNameBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + output.writeInt64(3, lastSequenceId_); + } } private int memoizedSerializedSize = -1; @@ -13747,6 +13765,10 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getProducerNameBytes()); } + if (((bitField0_ & 0x00000004) == 0x00000004)) { + size += com.google.protobuf.CodedOutputStream + .computeInt64Size(3, lastSequenceId_); + } memoizedSerializedSize = size; return size; } @@ -13864,6 +13886,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000001); producerName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); + lastSequenceId_ = -1L; + bitField0_ = (bitField0_ & ~0x00000004); return this; } @@ -13905,6 +13929,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandProducerSuccess build to_bitField0_ |= 0x00000002; } result.producerName_ = producerName_; + if (((from_bitField0_ & 0x00000004) == 0x00000004)) { + to_bitField0_ |= 0x00000004; + } + result.lastSequenceId_ = lastSequenceId_; result.bitField0_ = to_bitField0_; return result; } @@ -13917,6 +13945,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandPro if (other.hasProducerName()) { setProducerName(other.getProducerName()); } + if (other.hasLastSequenceId()) { + setLastSequenceId(other.getLastSequenceId()); + } return this; } @@ -13964,6 +13995,11 @@ public Builder mergeFrom( producerName_ = input.readBytes(); break; } + case 24: { + bitField0_ |= 0x00000004; + lastSequenceId_ = input.readInt64(); + break; + } } } } @@ -14027,6 +14063,27 @@ void setProducerName(com.google.protobuf.ByteString value) { } + // optional int64 last_sequence_id = 3 [default = -1]; + private long lastSequenceId_ = -1L; + public boolean hasLastSequenceId() { + return ((bitField0_ & 0x00000004) == 0x00000004); + } + public long getLastSequenceId() { + return lastSequenceId_; + } + public Builder setLastSequenceId(long value) { + bitField0_ |= 0x00000004; + lastSequenceId_ = value; + + return this; + } + public Builder clearLastSequenceId() { + bitField0_ = (bitField0_ & ~0x00000004); + lastSequenceId_ = -1L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducerSuccess) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java index 73b3888f71c0b..de44aa6d4aa6a 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/protobuf/ByteBufCodedOutputStream.java @@ -127,11 +127,16 @@ public void writeInt32(final int fieldNumber, final int value) throws IOExceptio writeInt32NoTag(value); } + public void writeInt64(final int fieldNumber, final long value) throws IOException { + writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT); + writeInt64NoTag(value); + } + public void writeUInt64(final int fieldNumber, final long value) throws IOException { writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT); writeUInt64NoTag(value); } - + /** Write a {@code bool} field, including tag, to the stream. */ public void writeBool(final int fieldNumber, final boolean value) throws IOException { writeTag(fieldNumber, WireFormat.WIRETYPE_VARINT); @@ -142,7 +147,12 @@ public void writeBool(final int fieldNumber, final boolean value) throws IOExcep public void writeBoolNoTag(final boolean value) throws IOException { writeRawByte(value ? 1 : 0); } - + + /** Write a {@code uint64} field to the stream. */ + public void writeInt64NoTag(final long value) throws IOException { + writeRawVarint64(value); + } + /** Write a {@code uint64} field to the stream. */ public void writeUInt64NoTag(final long value) throws IOException { writeRawVarint64(value); diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 9bb3bc5d6d505..3db12d98a31ed 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -310,6 +310,10 @@ message CommandSuccess { message CommandProducerSuccess { required uint64 request_id = 1; required string producer_name = 2; + + // The last sequence id that was stored by this producer in the previous session + // This will only be meaningful if deduplication has been enabled. + optional int64 last_sequence_id = 3 [default = -1]; } message CommandError {