diff --git a/bin/pulsar-perf b/bin/pulsar-perf index a520d96493862..08598f99c2478 100755 --- a/bin/pulsar-perf +++ b/bin/pulsar-perf @@ -1,19 +1,19 @@ #!/usr/bin/env bash # # Copyright 2016 Yahoo Inc. -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# +# BINDIR=$(dirname "$0") PULSAR_HOME=`cd $BINDIR/..;pwd` @@ -38,14 +38,14 @@ else fi # exclude tests jar -RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` +RELEASE_JAR=`ls $PULSAR_HOME/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` if [ $? == 0 ]; then PULSAR_JAR=$RELEASE_JAR fi # exclude tests jar BUILT_JAR=`ls $PULSAR_HOME/pulsar-testclient/target/pulsar-*.jar 2> /dev/null | grep -v tests | tail -1` -if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then +if [ $? != 0 ] && [ ! -e "$PULSAR_JAR" ]; then echo "\nCouldn't find pulsar jar."; echo "Make sure you've run 'mvn package'\n"; exit 1; @@ -58,7 +58,7 @@ add_maven_deps_to_classpath() { if [ "$MAVEN_HOME" != "" ]; then MVN=${MAVEN_HOME}/bin/mvn fi - + # Need to generate classpath from maven pom. This is costly so generate it # and cache it. Save the file into our target dir so a mvn clean will get # clean it up and force us create a new one. @@ -75,7 +75,8 @@ Usage: pulsar where command is one of: produce Run a producer consume Run a consumer - monitor-brokers Continuously receive broker data and/or load reports + read Run a topic reader + monitor-brokers Continuously receive broker data and/or load reports simulation-client Run a simulation server acting as a Pulsar client simulation-controller Run a simulation controller to give commands to servers @@ -141,6 +142,8 @@ if [ "$COMMAND" == "produce" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@" elif [ "$COMMAND" == "consume" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@" +elif [ "$COMMAND" == "read" ]; then + exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceReader --conf-file $PULSAR_PERFTEST_CONF "$@" elif [ "$COMMAND" == "monitor-brokers" ]; then exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@" elif [ "$COMMAND" == "simulation-client" ]; then diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java index 23a2012edf545..c8f217238675f 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/ServerCnx.java @@ -35,10 +35,8 @@ import com.yahoo.pulsar.broker.authentication.AuthenticationDataCommand; import com.yahoo.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; -import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; -import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.client.api.PulsarClientException; -import com.yahoo.pulsar.common.policies.data.ConsumerStats; +import com.yahoo.pulsar.client.impl.MessageIdImpl; import com.yahoo.pulsar.common.api.Commands; import com.yahoo.pulsar.common.api.PulsarHandler; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandAck; @@ -61,6 +59,7 @@ import com.yahoo.pulsar.common.api.proto.PulsarApi.ServerError; import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.BacklogQuota; +import com.yahoo.pulsar.common.policies.data.ConsumerStats; import com.yahoo.pulsar.common.util.collections.ConcurrentLongHashMap; import io.netty.buffer.ByteBuf; @@ -153,8 +152,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E // //// // // Incoming commands handling // //// - - + @Override protected void handleLookup(CommandLookupTopic lookup) { final long requestId = lookup.getRequestId(); @@ -226,7 +224,7 @@ protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata pa "Failed due to too many pending lookup requests", requestId)); } } - + @Override protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) { if (log.isDebugEnabled()) { @@ -247,8 +245,7 @@ protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) { "Consumer " + consumerId + " not found", requestId); } else { if (log.isDebugEnabled()) { - log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", - requestId, consumer); + log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer); } msg = Commands.newConsumerStatsResponse(createConsumerStatsResponse(consumer, requestId)); } @@ -270,7 +267,7 @@ CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consum commandConsumerStatsResponseBuilder.setBlockedConsumerOnUnackedMsgs(consumerStats.blockedConsumerOnUnackedMsgs); commandConsumerStatsResponseBuilder.setAddress(consumerStats.address); commandConsumerStatsResponseBuilder.setConnectedSince(consumerStats.connectedSince); - + Subscription subscription = consumer.getSubscription(); commandConsumerStatsResponseBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog()); commandConsumerStatsResponseBuilder.setMsgRateExpired(subscription.getExpiredMessageRate()); @@ -338,6 +335,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final long consumerId = subscribe.getConsumerId(); final SubType subType = subscribe.getSubType(); final String consumerName = subscribe.getConsumerName(); + final boolean isDurable = subscribe.getDurable(); + final MessageIdImpl startMessageId = subscribe.hasStartMessageId() + ? new MessageIdImpl(subscribe.getStartMessageId().getLedgerId(), + subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition()) + : null; + final int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0; authorizationFuture.thenApply(isAuthorized -> { @@ -363,15 +366,16 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { // creation request either complete or fails. log.warn("[{}][{}][{}] Consumer is already present on the connection", remoteAddress, topicName, subscriptionName); - ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady : getErrorCode(existingConsumerFuture);; - ctx.writeAndFlush(Commands.newError(requestId, error, - "Consumer is already present on the connection")); + ServerError error = !existingConsumerFuture.isDone() ? ServerError.ServiceNotReady + : getErrorCode(existingConsumerFuture); + ctx.writeAndFlush( + Commands.newError(requestId, error, "Consumer is already present on the connection")); return null; } } - service.getTopic(topicName).thenCompose( - topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName)) + service.getTopic(topicName).thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, + consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId)) .thenAccept(consumer -> { if (consumerFuture.complete(consumer)) { log.info("[{}] Created subscription on topic {} / {}", remoteAddress, topicName, @@ -393,7 +397,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { }) // .exceptionally(exception -> { log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName, - subscriptionName, exception.getCause().getMessage()); + subscriptionName, exception.getCause().getMessage(), exception); // If client timed out, the future would have been completed by subsequent close. Send error // back to client, only if not completed already. @@ -455,10 +459,11 @@ protected void handleProducer(final CommandProducer cmdProducer) { // until the previous producer creation // request // either complete or fails. - ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady : getErrorCode(existingProducerFuture); + ServerError error = !existingProducerFuture.isDone() ? ServerError.ServiceNotReady + : getErrorCode(existingProducerFuture); log.warn("[{}][{}] Producer is already present on the connection", remoteAddress, topicName); - ctx.writeAndFlush(Commands.newError(requestId, error, - "Producer is already present on the connection")); + ctx.writeAndFlush( + Commands.newError(requestId, error, "Producer is already present on the connection")); return null; } } @@ -740,7 +745,7 @@ public void closeProducer(Producer producer) { } long producerId = producer.getProducerId(); producers.remove(producerId); - if(remoteEndpointProtocolVersion >= v5.getNumber()) { + if (remoteEndpointProtocolVersion >= v5.getNumber()) { ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L)); } else { close(); @@ -755,7 +760,7 @@ public void closeConsumer(Consumer consumer) { } long consumerId = consumer.consumerId(); consumers.remove(consumerId); - if(remoteEndpointProtocolVersion >= v5.getNumber()) { + if (remoteEndpointProtocolVersion >= v5.getNumber()) { ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L)); } else { close(); @@ -860,7 +865,7 @@ public String getRole() { boolean hasConsumer(long consumerId) { return consumers.containsKey(consumerId); } - + public boolean isBatchMessageCompatibleVersion() { return remoteEndpointProtocolVersion >= ProtocolVersion.v4.getNumber(); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Topic.java index d38252c809597..c4bb768d2d712 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/Topic.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletableFuture; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; +import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.policies.data.BacklogQuota; import com.yahoo.pulsar.common.policies.data.Policies; @@ -39,7 +40,7 @@ public interface PublishCallback { void removeProducer(Producer producer); CompletableFuture subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, - int priorityLevel, String consumerName); + int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId); CompletableFuture unsubscribe(String subName); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 37e907418b0b9..be421704a0395 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -26,7 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.client.impl.MessageImpl; /** @@ -44,10 +43,10 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { private static final AtomicIntegerFieldUpdater expirationCheckInProgressUpdater = AtomicIntegerFieldUpdater .newUpdater(PersistentMessageExpiryMonitor.class, "expirationCheckInProgress"); - public PersistentMessageExpiryMonitor(String topicName, ManagedCursor cursor) { + public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, ManagedCursor cursor) { this.topicName = topicName; this.cursor = cursor; - this.subName = Codec.decode(cursor.getName()); + this.subName = subscriptionName; this.msgExpired = new Rate(); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index bbaa1890b7f0a..160fe9d48b464 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -46,6 +46,7 @@ import com.yahoo.pulsar.client.impl.PulsarClientImpl; import com.yahoo.pulsar.client.impl.SendCallback; import com.yahoo.pulsar.common.policies.data.ReplicatorStats; +import com.yahoo.pulsar.common.util.Codec; import io.netty.buffer.ByteBuf; import io.netty.util.Recycler; @@ -107,7 +108,7 @@ public PersistentReplicator(PersistentTopic topic, ManagedCursor cursor, String this.remoteCluster = remoteCluster; this.client = (PulsarClientImpl) brokerService.getReplicationClient(remoteCluster); this.producer = null; - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor); + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, Codec.decode(cursor.getName()), cursor); HAVE_PENDING_READ_UPDATER.set(this, FALSE); PENDING_MESSAGES_UPDATER.set(this, 0); STATE_UPDATER.set(this, State.Stopped); diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java index 53ed6b12a856c..9b495ca01ee93 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentSubscription.java @@ -51,7 +51,6 @@ import com.yahoo.pulsar.common.naming.DestinationName; import com.yahoo.pulsar.common.policies.data.ConsumerStats; import com.yahoo.pulsar.common.policies.data.PersistentSubscriptionStats; -import com.yahoo.pulsar.common.util.Codec; import com.yahoo.pulsar.utils.CopyOnWriteArrayList; public class PersistentSubscription implements Subscription { @@ -71,12 +70,12 @@ public class PersistentSubscription implements Subscription { // for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000; - public PersistentSubscription(PersistentTopic topic, ManagedCursor cursor) { + public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); - this.subName = Codec.decode(cursor.getName()); - this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, cursor); + this.subName = subscriptionName; + this.expiryMonitor = new PersistentMessageExpiryMonitor(topicName, subscriptionName, cursor); IS_FENCED_UPDATER.set(this, FALSE); } @@ -131,6 +130,12 @@ public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceE } if (dispatcher.getConsumers().isEmpty()) { deactivateCursor(); + + if (!cursor.isDurable()) { + // If cursor is not durable, we need to clean up the subscription as well + close(); + topic.removeSubscription(subName); + } } // invalid consumer remove will throw an exception @@ -607,6 +612,6 @@ public synchronized void redeliverUnacknowledgedMessages(Consumer consumer, List public void markTopicWithBatchMessagePublished() { topic.markBatchMessagePublished(); } - + private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); } diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java index 94ee4b56caa22..96e615a36d720 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentTopic.java @@ -67,10 +67,13 @@ import com.yahoo.pulsar.broker.service.Consumer; import com.yahoo.pulsar.broker.service.Producer; import com.yahoo.pulsar.broker.service.ServerCnx; +import com.yahoo.pulsar.broker.service.Subscription; import com.yahoo.pulsar.broker.service.Topic; import com.yahoo.pulsar.broker.stats.ClusterReplicationMetrics; import com.yahoo.pulsar.broker.stats.NamespaceStats; import com.yahoo.pulsar.broker.stats.ReplicationMetrics; +import com.yahoo.pulsar.client.api.MessageId; +import com.yahoo.pulsar.client.impl.MessageIdImpl; import com.yahoo.pulsar.client.impl.MessageImpl; import com.yahoo.pulsar.client.util.FutureUtil; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; @@ -182,11 +185,11 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS replicators.put(remoteCluster, new PersistentReplicator(this, cursor, localCluster, remoteCluster, brokerService)); } else { - final String cursorName = Codec.decode(cursor.getName()); - subscriptions.put(cursorName, new PersistentSubscription(this, cursor)); + final String subscriptionName = Codec.decode(cursor.getName()); + subscriptions.put(subscriptionName, new PersistentSubscription(this, subscriptionName, cursor)); // subscription-cursor gets activated by default: deactivate as there is no active subscription right // now - subscriptions.get(cursorName).deactivateCursor(); + subscriptions.get(subscriptionName).deactivateCursor(); } } this.lastActive = System.nanoTime(); @@ -321,18 +324,18 @@ public void removeProducer(Producer producer) { @Override public CompletableFuture subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, - SubType subType, int priorityLevel, String consumerName) { + SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId) { final CompletableFuture future = new CompletableFuture<>(); - if(hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { - if(log.isDebugEnabled()) { + if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) { + if (log.isDebugEnabled()) { log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName); } future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message")); return future; } - + if (subscriptionName.startsWith(replicatorPrefix)) { log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted")); @@ -355,55 +358,97 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri lock.readLock().unlock(); } + CompletableFuture subscriptionFuture = isDurable ? // + getDurableSubscription(subscriptionName) // + : getNonDurableSubscription(subscriptionName, startMessageId); + + int maxUnackedMessages = isDurable ? brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() :0; + + subscriptionFuture.thenAccept(subscription -> { + try { + Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName, + maxUnackedMessages, cnx, cnx.getRole()); + subscription.addConsumer(consumer); + if (!cnx.isActive()) { + consumer.close(); + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, + consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this)); + } + future.completeExceptionally( + new BrokerServiceException("Connection was closed while the opening the cursor ")); + } else { + log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); + future.complete(consumer); + } + } catch (BrokerServiceException e) { + if (e instanceof ConsumerBusyException) { + log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, + consumerName); + } else if (e instanceof SubscriptionBusyException) { + log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); + } + + USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); + future.completeExceptionally(e); + } + }).exceptionally(ex -> { + log.warn("[{}] Failed to create subscription for {}: ", topic, subscriptionName, ex.getMessage()); + USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); + future.completeExceptionally(new PersistenceException(ex)); + return null; + }); + + return future; + } + + private CompletableFuture getDurableSubscription(String subscriptionName) { + CompletableFuture subscriptionFuture = new CompletableFuture<>(); ledger.asyncOpenCursor(Codec.encode(subscriptionName), new OpenCursorCallback() { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] Opened cursor for {} {}", topic, subscriptionName, consumerId, consumerName); + log.debug("[{}][{}] Opened cursor", topic, subscriptionName); } - try { - PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName, - name -> new PersistentSubscription(PersistentTopic.this, cursor)); - - Consumer consumer = new Consumer(subscription, subType, consumerId, priorityLevel, consumerName, - brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer(), cnx, - cnx.getRole()); - subscription.addConsumer(consumer); - if (!cnx.isActive()) { - consumer.close(); - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName, - consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this)); - } - future.completeExceptionally( - new BrokerServiceException("Connection was closed while the opening the cursor ")); - } else { - log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId); - future.complete(consumer); - } - } catch (BrokerServiceException e) { - if (e instanceof ConsumerBusyException) { - log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId, - consumerName); - } else if (e instanceof SubscriptionBusyException) { - log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage()); - } - - USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); - future.completeExceptionally(e); - } + subscriptionFuture.complete(subscriptions.computeIfAbsent(subscriptionName, + name -> new PersistentSubscription(PersistentTopic.this, subscriptionName, cursor))); } @Override public void openCursorFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName); USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this); - future.completeExceptionally(new PersistenceException(exception)); + subscriptionFuture.completeExceptionally(new PersistenceException(exception)); } }, null); + return subscriptionFuture; + } - return future; + private CompletableFuture getNonDurableSubscription(String subscriptionName, MessageId startMessageId) { + CompletableFuture subscriptionFuture = new CompletableFuture<>(); + + Subscription subscription = subscriptions.computeIfAbsent(subscriptionName, name -> { + // Create a new non-durable cursor only for the first consumer that connects + MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId + : (MessageIdImpl) MessageId.latest; + + Position startPosition = new PositionImpl(msgId.getLedgerId(), msgId.getEntryId()); + ManagedCursor cursor = null; + try { + cursor = ledger.newNonDurableCursor(startPosition); + } catch (ManagedLedgerException e) { + subscriptionFuture.completeExceptionally(e); + } + + return new PersistentSubscription(this, subscriptionName, cursor); + }); + + if (!subscriptionFuture.isDone()) { + subscriptionFuture.complete(subscription); + } + + return subscriptionFuture; } /** @@ -441,6 +486,10 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { return unsubscribeFuture; } + void removeSubscription(String subscriptionName) { + subscriptions.remove(subscriptionName); + } + /** * Delete the managed ledger associated with this topic * diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 6beb01f18ab9a..a246cbecdd07e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -188,7 +188,7 @@ public void testAddRemoveConsumer() throws Exception { log.info("--- Starting PersistentDispatcherFailoverConsumerTest::testAddConsumer ---"); PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); int partitionIndex = 0; PersistentDispatcherSingleActiveConsumer pdfc = new PersistentDispatcherSingleActiveConsumer(cursorMock, diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentMessageFinderTest.java index 10d2e926bc89b..a493bbff4233e 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentMessageFinderTest.java @@ -175,7 +175,7 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) { }); assertTrue(ex.get()); - PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1); + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1); monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); field.setAccessible(true); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java index 71f5e0b4baed1..520d3a505d0e3 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -116,7 +116,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -174,7 +174,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -236,7 +236,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -294,7 +294,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java index 6e8dcf08d9373..51373cdfc03d6 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java @@ -349,12 +349,12 @@ public void testSubscribeUnsubscribe() throws Exception { // 1. simple subscribe Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); // 2. duplicate subscribe Future f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); try { f2.get(); @@ -374,7 +374,7 @@ public void testSubscribeUnsubscribe() throws Exception { @Test public void testAddRemoveConsumer() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */, @@ -406,7 +406,7 @@ public void testAddRemoveConsumer() throws Exception { @Test public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - PersistentSubscription sub = new PersistentSubscription(topic, cursorMock); + PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1"); sub.addConsumer(consumer1); @@ -459,7 +459,7 @@ public void testDeleteTopic() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); assertTrue(topic.delete().isCompletedExceptionally()); @@ -474,7 +474,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -528,7 +528,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -614,7 +614,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); Future f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), - 0, cmd.getConsumerName()); + 0, cmd.getConsumerName(), cmd.getDurable(), null); try { f.get(); @@ -725,7 +725,7 @@ public void testFailoverSubscription() throws Exception { // 1. Subscribe with non partition topic Future f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), - cmd1.getSubType(), 0, cmd1.getConsumerName()); + cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null); f1.get(); // 2. Subscribe with partition topic @@ -736,7 +736,7 @@ public void testFailoverSubscription() throws Exception { .setSubType(SubType.Failover).build(); Future f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), - cmd2.getSubType(), 0, cmd2.getConsumerName()); + cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null); f2.get(); // 3. Subscribe and create second consumer @@ -745,7 +745,7 @@ public void testFailoverSubscription() throws Exception { .setSubType(SubType.Failover).build(); Future f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), - cmd3.getSubType(), 0, cmd3.getConsumerName()); + cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null); f3.get(); assertEquals( @@ -765,7 +765,7 @@ public void testFailoverSubscription() throws Exception { .setSubType(SubType.Failover).build(); Future f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), - cmd4.getSubType(), 0, cmd4.getConsumerName()); + cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null); f4.get(); assertEquals( @@ -790,7 +790,7 @@ public void testFailoverSubscription() throws Exception { .setSubType(SubType.Exclusive).build(); Future f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), - cmd5.getSubType(), 0, cmd5.getConsumerName()); + cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null); try { f5.get(); @@ -806,7 +806,7 @@ public void testFailoverSubscription() throws Exception { .setSubType(SubType.Exclusive).build(); Future f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), - cmd6.getSubType(), 0, cmd6.getConsumerName()); + cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null); f6.get(); // 7. unsubscribe exclusive sub diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TopicReaderTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TopicReaderTest.java new file mode 100644 index 0000000000000..f6841b647b03d --- /dev/null +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/client/api/TopicReaderTest.java @@ -0,0 +1,239 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.api; + +import static org.testng.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.google.common.collect.Sets; +import com.yahoo.pulsar.common.policies.data.PersistentTopicStats; + +public class TopicReaderTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(TopicReaderTest.class); + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testSimpleReader() throws Exception { + ReaderConfiguration conf = new ReaderConfiguration(); + Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, + conf); + + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + // Acknowledge the consumption of all messages at once + reader.close(); + producer.close(); + } + + @Test + public void testReaderAfterMessagesWerePublished() throws Exception { + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, + new ReaderConfiguration()); + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + // Acknowledge the consumption of all messages at once + reader.close(); + producer.close(); + } + + @Test + public void testMultipleReaders() throws Exception { + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Reader reader1 = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, + new ReaderConfiguration()); + + Reader reader2 = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.earliest, + new ReaderConfiguration()); + + Message msg = null; + Set messageSet1 = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader1.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet1, receivedMessage, expectedMessage); + } + + Set messageSet2 = Sets.newHashSet(); + for (int i = 0; i < 10; i++) { + msg = reader2.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet2, receivedMessage, expectedMessage); + } + + reader1.close(); + reader2.close(); + producer.close(); + } + + @Test + public void testTopicStats() throws Exception { + String topicName = "persistent://my-property/use/my-ns/my-topic1"; + + Reader reader1 = pulsarClient.createReader(topicName, MessageId.earliest, new ReaderConfiguration()); + + Reader reader2 = pulsarClient.createReader(topicName, MessageId.earliest, new ReaderConfiguration()); + + PersistentTopicStats stats = admin.persistentTopics().getStats(topicName); + assertEquals(stats.subscriptions.size(), 2); + + reader1.close(); + stats = admin.persistentTopics().getStats(topicName); + assertEquals(stats.subscriptions.size(), 1); + + reader2.close(); + + stats = admin.persistentTopics().getStats(topicName); + assertEquals(stats.subscriptions.size(), 0); + } + + @Test + public void testReaderOnLastMessage() throws Exception { + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", MessageId.latest, + new ReaderConfiguration()); + + for (int i = 10; i < 20; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // Publish more messages and verify the readers only sees new messages + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 10; i < 20; i++) { + msg = reader.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + // Acknowledge the consumption of all messages at once + reader.close(); + producer.close(); + } + + @Test + public void testReaderOnSpecificMessage() throws Exception { + ProducerConfiguration producerConf = new ProducerConfiguration(); + + Producer producer = pulsarClient.createProducer("persistent://my-property/use/my-ns/my-topic1", producerConf); + List messageIds = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + messageIds.add(producer.send(message.getBytes())); + } + + Reader reader = pulsarClient.createReader("persistent://my-property/use/my-ns/my-topic1", messageIds.get(4), + new ReaderConfiguration()); + + // Publish more messages and verify the readers only sees messages starting from the intended message + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 5; i < 10; i++) { + msg = reader.readNext(1, TimeUnit.SECONDS); + + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "my-message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + + // Acknowledge the consumption of all messages at once + reader.close(); + producer.close(); + } + +} \ No newline at end of file diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/MessageId.java index 581db6d064202..ad7fbd504b5e1 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/MessageId.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/MessageId.java @@ -21,11 +21,11 @@ /** * Opaque unique identifier of a single message - * + * * The MessageId can be used to reference a specific message, for example when acknowledging, without having to retain * the message content in memory for an extended period of time. - * - * + * + * */ public interface MessageId { @@ -36,7 +36,7 @@ public interface MessageId { /** * De-serialize a message id from a byte array - * + * * @param data * byte array containing the serialized message id * @return the de-serialized messageId object @@ -44,4 +44,8 @@ public interface MessageId { public static MessageId fromByteArray(byte[] data) throws IOException { return MessageIdImpl.fromByteArray(data); } + + public static final MessageId earliest = new MessageIdImpl(-1, -1, -1); + + public static final MessageId latest = new MessageIdImpl(Long.MAX_VALUE, Long.MAX_VALUE, -1); } diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClient.java index 4273530e57391..935a285f4a0ed 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClient.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/PulsarClient.java @@ -159,6 +159,59 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t */ CompletableFuture subscribeAsync(String topic, String subscription, ConsumerConfiguration conf); + /** + * Create a topic reader with given {@code ReaderConfiguration} for reading messages from the specified topic. + *

+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a + * subscription. Reader can only work on non-partitioned topics. + *

+ * The initial reader positioning is done by specifying a message id. The options are: + *

    + *
  • MessageId.earliest : Start reading from the earliest message available in the topic + *
  • MessageId.latest : Start reading from the end topic, only getting messages published after the + * reader was created + *
  • MessageId : When passing a particular message id, the reader will position itself on that + * specific position. The first message to be read will be the message next to the specified messageId. + *
+ * + * @param topic + * The name of the topic where to read + * @param startMessageId + * The message id where the reader will position itself. The first message returned will be the one after + * the specified startMessageId + * @param conf + * The {@code ReaderConfiguration} object + * @return The {@code Reader} object + */ + Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) throws PulsarClientException; + + /** + * Asynchronously create a topic reader with given {@code ReaderConfiguration} for reading messages from the + * specified topic. + *

+ * The Reader provides a low-level abstraction that allows for manual positioning in the topic, without using a + * subscription. Reader can only work on non-partitioned topics. + *

+ * The initial reader positioning is done by specifying a message id. The options are: + *

    + *
  • MessageId.earliest : Start reading from the earliest message available in the topic + *
  • MessageId.latest : Start reading from the end topic, only getting messages published after the + * reader was created + *
  • MessageId : When passing a particular message id, the reader will position itself on that + * specific position. The first message to be read will be the message next to the specified messageId. + *
+ * + * @param topic + * The name of the topic where to read + * @param startMessageId + * The message id where the reader will position itself. The first message returned will be the one after + * the specified startMessageId + * @param conf + * The {@code ReaderConfiguration} object + * @return Future of the asynchronously created producer object + */ + CompletableFuture createReaderAsync(String topic, MessageId startMessageId, ReaderConfiguration conf); + /** * Close the PulsarClient and release all the resources. * diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Reader.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Reader.java new file mode 100644 index 0000000000000..7303f4914da1b --- /dev/null +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/Reader.java @@ -0,0 +1,57 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.api; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +/** + * A Reader can be used to scan through all the messages currently available in a topic. + * + */ +public interface Reader extends Closeable { + + /** + * @return the topic from which this reader is reading from + */ + String getTopic(); + + /** + * Read the next message in the topic + * + * @return the next messasge + * @throws PulsarClientException + */ + Message readNext() throws PulsarClientException; + + /** + * Read the next message in the topic. + * + * @return the next messasge + * @throws PulsarClientException + */ + Message readNext(int timeout, TimeUnit unit) throws PulsarClientException; + + CompletableFuture readNextAsync(); + + /** + * Asynchronously close the reader and stop the broker to push more messages + * + * @return a future that can be used to track the completion of the operation + */ + CompletableFuture closeAsync(); +} diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderConfiguration.java new file mode 100644 index 0000000000000..19a1ec716e336 --- /dev/null +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderConfiguration.java @@ -0,0 +1,97 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.api; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.Serializable; + +public class ReaderConfiguration implements Serializable { + + private int receiverQueueSize = 1000; + + private ReaderListener readerListener; + + private String readerName = null; + + /** + * @return the configured {@link ReaderListener} for the reader + */ + public ReaderListener getReaderListener() { + return this.readerListener; + } + + /** + * Sets a {@link ReaderListener} for the reader + *

+ * When a {@link ReaderListener} is set, application will receive messages through it. Calls to + * {@link Reader#readNext()} will not be allowed. + * + * @param readerListener + * the listener object + */ + public ReaderConfiguration setReaderListener(ReaderListener readerListener) { + checkNotNull(readerListener); + this.readerListener = readerListener; + return this; + } + + /** + * @return the configure receiver queue size value + */ + public int getReceiverQueueSize() { + return this.receiverQueueSize; + } + + /** + * Sets the size of the consumer receive queue. + *

+ * The consumer receive queue controls how many messages can be accumulated by the {@link Consumer} before the + * application calls {@link Consumer#receive()}. Using a higher value could potentially increase the consumer + * throughput at the expense of bigger memory utilization. + *

+ * Default value is {@code 1000} messages and should be good for most use cases. + * + * @param receiverQueueSize + * the new receiver queue size value + */ + public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) { + checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative"); + this.receiverQueueSize = receiverQueueSize; + return this; + } + + /** + * @return the consumer name + */ + public String getReaderName() { + return readerName; + } + + /** + * Set the consumer name. + * + * @param readerName + */ + public ReaderConfiguration setReaderName(String readerName) { + checkArgument(readerName != null && !readerName.equals("")); + this.readerName = readerName; + return this; + } + + private static final long serialVersionUID = 1L; +} diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderListener.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderListener.java new file mode 100644 index 0000000000000..bdf271e9d4781 --- /dev/null +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/api/ReaderListener.java @@ -0,0 +1,41 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.api; + +import java.io.Serializable; + +/** + * A listener that will be called in order for every message received. + * + * + */ +public interface ReaderListener extends Serializable { + /** + * This method is called whenever a new message is received. + * + * Messages are guaranteed to be delivered in order and from the same thread for a single consumer + * + * This method will only be called once for each message, unless either application or broker crashes. + * + * Application is responsible of handling any exception that could be thrown while processing the message. + * + * @param reader + * the Reader object from where the message was received + * @param msg + * the message object + */ + void received(Reader reader, Message msg); +} diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java index fb160131d61c2..ebce735e7e01f 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ConsumerImpl.java @@ -17,14 +17,19 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; +import static com.yahoo.pulsar.common.api.Commands.hasChecksum; +import static com.yahoo.pulsar.common.api.Commands.readChecksum; import static java.lang.String.format; import java.io.IOException; +import java.util.ArrayList; import java.util.BitSet; import java.util.List; import java.util.NavigableMap; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -33,10 +38,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; -import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Iterables; import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.ConsumerConfiguration; import com.yahoo.pulsar.client.api.Message; @@ -60,9 +65,6 @@ import io.netty.util.Timeout; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import static com.yahoo.pulsar.common.api.Commands.readChecksum; -import static com.yahoo.pulsar.checksum.utils.Crc32cChecksum.computeChecksum; -import static com.yahoo.pulsar.common.api.Commands.hasChecksum; public class ConsumerImpl extends ConsumerBase { private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000; @@ -75,6 +77,8 @@ public class ConsumerImpl extends ConsumerBase { AtomicIntegerFieldUpdater.newUpdater(ConsumerImpl.class, "availablePermits"); private volatile int availablePermits = 0; + private MessageIdImpl lastDequeuedMessage; + private long subscribeTimeout; private final int partitionIndex; @@ -88,27 +92,42 @@ public class ConsumerImpl extends ConsumerBase { private final ReadWriteLock zeroQueueLock; private final UnAckedMessageTracker unAckedMessageTracker; - private final ConcurrentSkipListMap batchMessageAckTracker; + private final ConcurrentNavigableMap batchMessageAckTracker; - private final ConsumerStats stats; + protected final ConsumerStats stats; private final int priorityLevel; + private final SubscriptionMode subscriptionMode; + private final MessageId startMessageId; + + static enum SubscriptionMode { + // Make the subscription to be backed by a durable cursor that will retain messages and persist the current + // position + Durable, + + // Lightweight subscription mode that doesn't have a durable cursor associated + NonDurable + } ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, - ExecutorService listenerExecutor, CompletableFuture subscribeFuture) { - this(client, topic, subscription, conf, listenerExecutor, -1, subscribeFuture); + ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture) { + this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture, + SubscriptionMode.Durable, null); } ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf, - ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture) { + ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture, + SubscriptionMode subscriptionMode, MessageId startMessageId) { super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture); this.consumerId = client.newConsumerId(); + this.subscriptionMode = subscriptionMode; + this.startMessageId = startMessageId; AVAILABLE_PERMITS_UPDATER.set(this, 0); this.subscribeTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs(); this.partitionIndex = partitionIndex; this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2; this.codecProvider = new CompressionCodecProvider(); this.priorityLevel = conf.getPriorityLevel(); - batchMessageAckTracker = new ConcurrentSkipListMap<>(); + this.batchMessageAckTracker = new ConcurrentSkipListMap<>(); if (client.getConfiguration().getStatsIntervalSeconds() > 0) { stats = new ConsumerStats(client, conf, this); } else { @@ -236,6 +255,7 @@ private Message fetchSingleMessageFromBroker() throws PulsarClientException { } do { message = incomingMessages.take(); + lastDequeuedMessage = (MessageIdImpl) message.getMessageId(); ClientCnx msgCnx = ((MessageImpl) message).getCnx(); // synchronized need to prevent race between connectionOpened and the check "msgCnx == cnx()" synchronized (ConsumerImpl.this) { @@ -459,13 +479,38 @@ void connectionOpened(final ClientCnx cnx) { log.info("[{}][{}] Subscribing to topic on cnx {}", topic, subscription, cnx.ctx().channel()); long requestId = client.newRequestId(); - cnx.sendRequestWithId(Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), - priorityLevel, consumerName), requestId).thenRun(() -> { + + int currentSize; + MessageIdImpl startMessageId; + synchronized (this) { + currentSize = incomingMessages.size(); + startMessageId = clearReceiverQueue(); + unAckedMessageTracker.clear(); + batchMessageAckTracker.clear(); + } + + boolean isDurable = subscriptionMode == SubscriptionMode.Durable; + MessageIdData startMessageIdData; + if (isDurable) { + // For regular durable subscriptions, the message id from where to restart will be determined by the broker. + startMessageIdData = null; + } else { + // For non-durable we are going to restart from the next entry + MessageIdData.Builder builder = MessageIdData.newBuilder(); + builder.setLedgerId(startMessageId.getLedgerId()); + builder.setEntryId(startMessageId.getEntryId()); + startMessageIdData = builder.build(); + builder.recycle(); + } + + ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, + consumerName, isDurable, startMessageIdData); + if (startMessageIdData != null) { + startMessageIdData.recycle(); + } + + cnx.sendRequestWithId(request, requestId).thenRun(() -> { synchronized (ConsumerImpl.this) { - int currentSize = incomingMessages.size(); - incomingMessages.clear(); - unAckedMessageTracker.clear(); - batchMessageAckTracker.clear(); if (changeToReadyState()) { log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription, cnx.channel().remoteAddress(), consumerId); @@ -525,6 +570,28 @@ && isRetriableError((PulsarClientException) e.getCause()) }); } + /** + * Clear the internal receiver queue and returns the message id of what was the 1st message in the queue that was + * not seen by the application + */ + private MessageIdImpl clearReceiverQueue() { + List currentMessageQueue = new ArrayList<>(incomingMessages.size()); + incomingMessages.drainTo(currentMessageQueue); + if (!currentMessageQueue.isEmpty()) { + MessageIdImpl nextMessageInQueue = (MessageIdImpl) currentMessageQueue.get(0).getMessageId(); + MessageIdImpl previousMessage = new MessageIdImpl(nextMessageInQueue.getLedgerId(), + nextMessageInQueue.getEntryId() - 1, nextMessageInQueue.getPartitionIndex()); + return previousMessage; + } else if (lastDequeuedMessage != null) { + // If the queue was empty we need to restart from the message just after the last one that has been dequeued + // in the past + return lastDequeuedMessage; + } else { + // No message was received or dequeued by this consumer. Next message would still be the startMessageId + return (MessageIdImpl) startMessageId; + } + } + /** * send the flow command to have the broker start pushing messages */ @@ -783,23 +850,27 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, ByteBuf unc * * Periodically, it sends a Flow command to notify the broker that it can push more messages */ - private synchronized void messageProcessed(Message msg) { + protected synchronized void messageProcessed(Message msg) { ClientCnx currentCnx = cnx(); ClientCnx msgCnx = ((MessageImpl) msg).getCnx(); + lastDequeuedMessage = (MessageIdImpl) msg.getMessageId(); if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. return; } - // reset timer for messages that are received by the client - MessageIdImpl id = (MessageIdImpl) msg.getMessageId(); - if (id instanceof BatchMessageIdImpl) { - id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); - } - unAckedMessageTracker.add(id); increaseAvailablePermits(currentCnx); stats.updateNumMsgsReceived(msg); + + if (conf.getAckTimeoutMillis() != 0) { + // reset timer for messages that are received by the client + MessageIdImpl id = (MessageIdImpl) msg.getMessageId(); + if (id instanceof BatchMessageIdImpl) { + id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex()); + } + unAckedMessageTracker.add(id); + } } private void increaseAvailablePermits(ClientCnx currentCnx) { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java index de3ca532d216d..cde11e69e108c 100644 --- a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/PulsarClientImpl.java @@ -34,10 +34,13 @@ import com.yahoo.pulsar.client.api.ClientConfiguration; import com.yahoo.pulsar.client.api.Consumer; import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.MessageId; import com.yahoo.pulsar.client.api.Producer; import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.client.api.PulsarClientException; +import com.yahoo.pulsar.client.api.Reader; +import com.yahoo.pulsar.client.api.ReaderConfiguration; import com.yahoo.pulsar.client.util.ExecutorProvider; import com.yahoo.pulsar.client.util.FutureUtil; import com.yahoo.pulsar.common.naming.DestinationName; @@ -253,7 +256,7 @@ public CompletableFuture subscribeAsync(final String topic, final Stri consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, metadata.partitions, listenerThread, consumerSubscribedFuture); } else { - consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, + consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, -1, consumerSubscribedFuture); } @@ -269,6 +272,81 @@ public CompletableFuture subscribeAsync(final String topic, final Stri return consumerSubscribedFuture; } + @Override + public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf) + throws PulsarClientException { + try { + return createReaderAsync(topic, startMessageId, conf).get(); + } catch (ExecutionException e) { + Throwable t = e.getCause(); + if (t instanceof PulsarClientException) { + throw (PulsarClientException) t; + } else { + throw new PulsarClientException(t); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException(e); + } + } + + @Override + public CompletableFuture createReaderAsync(String topic, MessageId startMessageId, + ReaderConfiguration conf) { + if (state.get() != State.Open) { + return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed")); + } + if (!DestinationName.isValid(topic)) { + return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name")); + } + if (startMessageId == null) { + return FutureUtil + .failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId")); + } + if (conf == null) { + return FutureUtil.failedFuture( + new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined")); + } + + CompletableFuture readerFuture = new CompletableFuture<>(); + + getPartitionedTopicMetadata(topic).thenAccept(metadata -> { + if (log.isDebugEnabled()) { + log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions); + } + + if (metadata.partitions > 1) { + readerFuture.completeExceptionally( + new PulsarClientException("Topic reader cannot be created on a partitioned topic")); + return; + } + + CompletableFuture consumerSubscribedFuture = new CompletableFuture<>(); + // gets the next single threaded executor from the list of executors + ExecutorService listenerThread = externalExecutorProvider.getExecutor(); + ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, topic, startMessageId, conf, listenerThread, + consumerSubscribedFuture); + + synchronized (consumers) { + consumers.put(reader.getConsumer(), Boolean.TRUE); + } + + consumerSubscribedFuture.thenRun(() -> { + readerFuture.complete(reader); + }).exceptionally(ex -> { + log.warn("[{}] Failed to get create topic reader", topic, ex); + readerFuture.completeExceptionally(ex); + return null; + }); + }).exceptionally(ex -> { + log.warn("[{}] Failed to get partitioned topic metadata", topic, ex); + readerFuture.completeExceptionally(ex); + return null; + }); + + return readerFuture; + } + @Override public void close() throws PulsarClientException { try { diff --git a/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ReaderImpl.java new file mode 100644 index 0000000000000..76d22992af4bd --- /dev/null +++ b/pulsar-client/src/main/java/com/yahoo/pulsar/client/impl/ReaderImpl.java @@ -0,0 +1,113 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.client.impl; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.codec.digest.DigestUtils; + +import com.yahoo.pulsar.client.api.Consumer; +import com.yahoo.pulsar.client.api.ConsumerConfiguration; +import com.yahoo.pulsar.client.api.Message; +import com.yahoo.pulsar.client.api.MessageId; +import com.yahoo.pulsar.client.api.PulsarClientException; +import com.yahoo.pulsar.client.api.Reader; +import com.yahoo.pulsar.client.api.ReaderConfiguration; +import com.yahoo.pulsar.client.api.ReaderListener; +import com.yahoo.pulsar.client.api.SubscriptionType; +import com.yahoo.pulsar.client.impl.ConsumerImpl.SubscriptionMode; + +public class ReaderImpl implements Reader { + + private final ConsumerImpl consumer; + + public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageId, + ReaderConfiguration readerConfiguration, ExecutorService listenerExecutor, + CompletableFuture consumerFuture) { + + String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10); + + ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration(); + consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); + consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); + if (readerConfiguration.getReaderName() != null) { + consumerConfiguration.setConsumerName(readerConfiguration.getReaderName()); + } + + if (readerConfiguration.getReaderListener() != null) { + ReaderListener readerListener = readerConfiguration.getReaderListener(); + consumerConfiguration.setMessageListener((consumer, msg) -> { + readerListener.received(this, msg); + consumer.acknowledgeCumulativeAsync(msg); + }); + } + + consumer = new ConsumerImpl(client, topic, subscription, consumerConfiguration, listenerExecutor, -1, + consumerFuture, SubscriptionMode.NonDurable, startMessageId); + } + + @Override + public String getTopic() { + return consumer.getTopic(); + } + + ConsumerImpl getConsumer() { + return consumer; + } + + @Override + public Message readNext() throws PulsarClientException { + Message msg = consumer.receive(); + + // Acknowledge message immediately because the reader is based on non-durable subscription. When it reconnects, + // it will specify the subscription position anyway + consumer.acknowledgeCumulativeAsync(msg); + return msg; + } + + @Override + public Message readNext(int timeout, TimeUnit unit) throws PulsarClientException { + Message msg = consumer.receive(timeout, unit); + + if (msg != null) { + consumer.acknowledgeCumulativeAsync(msg); + } + return msg; + } + + @Override + public CompletableFuture readNextAsync() { + return consumer.receiveAsync().thenApply(msg -> { + consumer.acknowledgeCumulativeAsync(msg); + return msg; + }); + } + + @Override + public void close() throws IOException { + consumer.close(); + } + + @Override + public CompletableFuture closeAsync() { + return consumer.closeAsync(); + } + +} diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java index 9b40b5fdd8993..e9af32cdf977a 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/Commands.java @@ -284,9 +284,15 @@ public static ByteBuf newSend(long producerId, long sequenceId, int numMessages, sendBuilder.recycle(); return res; } - + public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName) { + return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, + true /* isDurable */, null /* startMessageId */ ); + } + + public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, + SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -295,6 +301,10 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu subscribeBuilder.setConsumerName(consumerName); subscribeBuilder.setRequestId(requestId); subscribeBuilder.setPriorityLevel(priorityLevel); + subscribeBuilder.setDurable(isDurable); + if (startMessageId != null) { + subscribeBuilder.setStartMessageId(startMessageId); + } CommandSubscribe subscribe = subscribeBuilder.build(); ByteBuf res = serializeWithSize(BaseCommand.newBuilder().setType(Type.SUBSCRIBE).setSubscribe(subscribe)); subscribeBuilder.recycle(); diff --git a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java index 72efc2235a1ca..70c65c385bc83 100644 --- a/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/com/yahoo/pulsar/common/api/proto/PulsarApi.java @@ -4016,6 +4016,14 @@ public interface CommandSubscribeOrBuilder // optional int32 priority_level = 7; boolean hasPriorityLevel(); int getPriorityLevel(); + + // optional bool durable = 8 [default = true]; + boolean hasDurable(); + boolean getDurable(); + + // optional .pulsar.proto.MessageIdData start_message_id = 9; + boolean hasStartMessageId(); + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getStartMessageId(); } public static final class CommandSubscribe extends com.google.protobuf.GeneratedMessageLite @@ -4232,6 +4240,26 @@ public int getPriorityLevel() { return priorityLevel_; } + // optional bool durable = 8 [default = true]; + public static final int DURABLE_FIELD_NUMBER = 8; + private boolean durable_; + public boolean hasDurable() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + public boolean getDurable() { + return durable_; + } + + // optional .pulsar.proto.MessageIdData start_message_id = 9; + public static final int START_MESSAGE_ID_FIELD_NUMBER = 9; + private com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData startMessageId_; + public boolean hasStartMessageId() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getStartMessageId() { + return startMessageId_; + } + private void initFields() { topic_ = ""; subscription_ = ""; @@ -4240,6 +4268,8 @@ private void initFields() { requestId_ = 0L; consumerName_ = ""; priorityLevel_ = 0; + durable_ = true; + startMessageId_ = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -4266,6 +4296,12 @@ public final boolean isInitialized() { memoizedIsInitialized = 0; return false; } + if (hasStartMessageId()) { + if (!getStartMessageId().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -4299,6 +4335,12 @@ public void writeTo(com.yahoo.pulsar.common.util.protobuf.ByteBufCodedOutputStre if (((bitField0_ & 0x00000040) == 0x00000040)) { output.writeInt32(7, priorityLevel_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + output.writeBool(8, durable_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + output.writeMessage(9, startMessageId_); + } } private int memoizedSerializedSize = -1; @@ -4335,6 +4377,14 @@ public int getSerializedSize() { size += com.google.protobuf.CodedOutputStream .computeInt32Size(7, priorityLevel_); } + if (((bitField0_ & 0x00000080) == 0x00000080)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(8, durable_); + } + if (((bitField0_ & 0x00000100) == 0x00000100)) { + size += com.google.protobuf.CodedOutputStream + .computeMessageSize(9, startMessageId_); + } memoizedSerializedSize = size; return size; } @@ -4462,6 +4512,10 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00000020); priorityLevel_ = 0; bitField0_ = (bitField0_ & ~0x00000040); + durable_ = true; + bitField0_ = (bitField0_ & ~0x00000080); + startMessageId_ = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x00000100); return this; } @@ -4523,6 +4577,14 @@ public com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe buildPartial to_bitField0_ |= 0x00000040; } result.priorityLevel_ = priorityLevel_; + if (((from_bitField0_ & 0x00000080) == 0x00000080)) { + to_bitField0_ |= 0x00000080; + } + result.durable_ = durable_; + if (((from_bitField0_ & 0x00000100) == 0x00000100)) { + to_bitField0_ |= 0x00000100; + } + result.startMessageId_ = startMessageId_; result.bitField0_ = to_bitField0_; return result; } @@ -4550,6 +4612,12 @@ public Builder mergeFrom(com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubs if (other.hasPriorityLevel()) { setPriorityLevel(other.getPriorityLevel()); } + if (other.hasDurable()) { + setDurable(other.getDurable()); + } + if (other.hasStartMessageId()) { + mergeStartMessageId(other.getStartMessageId()); + } return this; } @@ -4574,6 +4642,12 @@ public final boolean isInitialized() { return false; } + if (hasStartMessageId()) { + if (!getStartMessageId().isInitialized()) { + + return false; + } + } return true; } @@ -4638,6 +4712,21 @@ public Builder mergeFrom( priorityLevel_ = input.readInt32(); break; } + case 64: { + bitField0_ |= 0x00000080; + durable_ = input.readBool(); + break; + } + case 74: { + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder subBuilder = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(); + if (hasStartMessageId()) { + subBuilder.mergeFrom(getStartMessageId()); + } + input.readMessage(subBuilder, extensionRegistry); + setStartMessageId(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -4839,6 +4928,70 @@ public Builder clearPriorityLevel() { return this; } + // optional bool durable = 8 [default = true]; + private boolean durable_ = true; + public boolean hasDurable() { + return ((bitField0_ & 0x00000080) == 0x00000080); + } + public boolean getDurable() { + return durable_; + } + public Builder setDurable(boolean value) { + bitField0_ |= 0x00000080; + durable_ = value; + + return this; + } + public Builder clearDurable() { + bitField0_ = (bitField0_ & ~0x00000080); + durable_ = true; + + return this; + } + + // optional .pulsar.proto.MessageIdData start_message_id = 9; + private com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData startMessageId_ = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + public boolean hasStartMessageId() { + return ((bitField0_ & 0x00000100) == 0x00000100); + } + public com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData getStartMessageId() { + return startMessageId_; + } + public Builder setStartMessageId(com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (value == null) { + throw new NullPointerException(); + } + startMessageId_ = value; + + bitField0_ |= 0x00000100; + return this; + } + public Builder setStartMessageId( + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.Builder builderForValue) { + startMessageId_ = builderForValue.build(); + + bitField0_ |= 0x00000100; + return this; + } + public Builder mergeStartMessageId(com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData value) { + if (((bitField0_ & 0x00000100) == 0x00000100) && + startMessageId_ != com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance()) { + startMessageId_ = + com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.newBuilder(startMessageId_).mergeFrom(value).buildPartial(); + } else { + startMessageId_ = value; + } + + bitField0_ |= 0x00000100; + return this; + } + public Builder clearStartMessageId() { + startMessageId_ = com.yahoo.pulsar.common.api.proto.PulsarApi.MessageIdData.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x00000100); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 900ed62e013b1..3f89accbfcec2 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -126,10 +126,20 @@ message CommandSubscribe { required string topic = 1; required string subscription = 2; required SubType subType = 3; + required uint64 consumer_id = 4; required uint64 request_id = 5; optional string consumer_name = 6; optional int32 priority_level = 7; + + // Signal wether the subscription should be backed by a + // durable cursor or not + optional bool durable = 8 [default = true]; + + // If specified, the subscription will position the cursor + // markd-delete position on the particular message id and + // will send messages from that point + optional MessageIdData start_message_id = 9; } message CommandPartitionedTopicMetadata { diff --git a/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceReader.java b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceReader.java new file mode 100644 index 0000000000000..4a98a5c6838d3 --- /dev/null +++ b/pulsar-testclient/src/main/java/com/yahoo/pulsar/testclient/PerformanceReader.java @@ -0,0 +1,237 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.yahoo.pulsar.testclient; + +import static org.apache.commons.lang3.StringUtils.isNotBlank; + +import java.io.FileInputStream; +import java.text.DecimalFormat; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.bookkeeper.bookie.storage.ldb.ArrayGroupSort; +import org.apache.commons.lang.SystemUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.RateLimiter; +import com.yahoo.pulsar.client.api.ClientConfiguration; +import com.yahoo.pulsar.client.api.MessageId; +import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.api.Reader; +import com.yahoo.pulsar.client.api.ReaderConfiguration; +import com.yahoo.pulsar.client.api.ReaderListener; +import com.yahoo.pulsar.client.impl.MessageIdImpl; +import com.yahoo.pulsar.client.impl.PulsarClientImpl; +import com.yahoo.pulsar.client.util.FutureUtil; +import com.yahoo.pulsar.common.naming.DestinationName; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + +public class PerformanceReader { + private static final LongAdder messagesReceived = new LongAdder(); + private static final LongAdder bytesReceived = new LongAdder(); + private static final DecimalFormat dec = new DecimalFormat("0.000"); + + static class Arguments { + + @Parameter(names = { "-h", "--help" }, description = "Help message", help = true) + boolean help; + + @Parameter(names = { "--conf-file" }, description = "Configuration file") + public String confFile; + + @Parameter(description = "persistent://prop/cluster/ns/my-topic", required = true) + public List topic; + + @Parameter(names = { "-t", "--num-topics" }, description = "Number of topics") + public int numDestinations = 1; + + @Parameter(names = { "-r", "--rate" }, description = "Simulate a slow message reader (rate in msg/s)") + public double rate = 0; + + @Parameter(names = { "-m", + "--start-message-id" }, description = "Start message id. This can be either 'earliest', 'latest' or a specific message id by using 'lid:eid'") + public String startMessageId = "earliest"; + + @Parameter(names = { "-q", "--receiver-queue-size" }, description = "Size of the receiver queue") + public int receiverQueueSize = 1000; + + @Parameter(names = { "-c", + "--max-connections" }, description = "Max number of TCP connections to a single broker") + public int maxConnections = 100; + + @Parameter(names = { "-i", + "--stats-interval-seconds" }, description = "Statistics Interval Seconds. If 0, statistics will be disabled") + public long statsIntervalSeconds = 0; + + @Parameter(names = { "-u", "--service-url" }, description = "Pulsar Service URL") + public String serviceURL; + + @Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name") + public String authPluginClassName; + + @Parameter(names = { + "--auth-params" }, description = "Authentication parameters, e.g., \"key1:val1,key2:val2\"") + public String authParams; + } + + public static void main(String[] args) throws Exception { + final Arguments arguments = new Arguments(); + JCommander jc = new JCommander(arguments); + jc.setProgramName("pulsar-perf-reader"); + + try { + jc.parse(args); + } catch (ParameterException e) { + System.out.println(e.getMessage()); + jc.usage(); + System.exit(-1); + } + + if (arguments.help) { + jc.usage(); + System.exit(-1); + } + + if (arguments.topic.size() != 1) { + System.out.println("Only one topic name is allowed"); + jc.usage(); + System.exit(-1); + } + + if (arguments.confFile != null) { + Properties prop = new Properties(System.getProperties()); + prop.load(new FileInputStream(arguments.confFile)); + + if (arguments.serviceURL == null) { + arguments.serviceURL = prop.getProperty("brokerServiceUrl"); + } + + if (arguments.serviceURL == null) { + arguments.serviceURL = prop.getProperty("webServiceUrl"); + } + + // fallback to previous-version serviceUrl property to maintain backward-compatibility + if (arguments.serviceURL == null) { + arguments.serviceURL = prop.getProperty("serviceUrl", "http://localhost:8080/"); + } + + if (arguments.authPluginClassName == null) { + arguments.authPluginClassName = prop.getProperty("authPlugin", null); + } + + if (arguments.authParams == null) { + arguments.authParams = prop.getProperty("authParams", null); + } + } + + // Dump config variables + ObjectMapper m = new ObjectMapper(); + ObjectWriter w = m.writerWithDefaultPrettyPrinter(); + log.info("Starting Pulsar performance reader with config: {}", w.writeValueAsString(arguments)); + + final DestinationName prefixTopicName = DestinationName.get(arguments.topic.get(0)); + + final RateLimiter limiter = arguments.rate > 0 ? RateLimiter.create(arguments.rate) : null; + + ReaderListener listener = (reader, msg) -> { + messagesReceived.increment(); + bytesReceived.add(msg.getData().length); + + if (limiter != null) { + limiter.acquire(); + } + }; + + EventLoopGroup eventLoopGroup; + if (SystemUtils.IS_OS_LINUX) { + eventLoopGroup = new EpollEventLoopGroup(Runtime.getRuntime().availableProcessors() * 2, + new DefaultThreadFactory("pulsar-perf-reader")); + } else { + eventLoopGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors(), + new DefaultThreadFactory("pulsar-perf-reader")); + } + + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setConnectionsPerBroker(arguments.maxConnections); + clientConf.setStatsInterval(arguments.statsIntervalSeconds, TimeUnit.SECONDS); + if (isNotBlank(arguments.authPluginClassName)) { + clientConf.setAuthentication(arguments.authPluginClassName, arguments.authParams); + } + PulsarClient pulsarClient = new PulsarClientImpl(arguments.serviceURL, clientConf, eventLoopGroup); + + List> futures = Lists.newArrayList(); + ReaderConfiguration readerConfig = new ReaderConfiguration(); + readerConfig.setReaderListener(listener); + readerConfig.setReceiverQueueSize(arguments.receiverQueueSize); + + MessageId startMessageId; + if ("earliest".equals(arguments.startMessageId)) { + startMessageId = MessageId.earliest; + } else if ("latest".equals(arguments.startMessageId)) { + startMessageId = MessageId.latest; + } else { + String[] parts = arguments.startMessageId.split(":"); + startMessageId = new MessageIdImpl(Long.parseLong(parts[0]), Long.parseLong(parts[1]), -1); + } + + for (int i = 0; i < arguments.numDestinations; i++) { + final DestinationName destinationName = (arguments.numDestinations == 1) ? prefixTopicName + : DestinationName.get(String.format("%s-%d", prefixTopicName, i)); + + futures.add(pulsarClient.createReaderAsync(destinationName.toString(), startMessageId, readerConfig)); + } + + FutureUtil.waitForAll(futures).get(); + + log.info("Start reading from {} topics", arguments.numDestinations); + + long oldTime = System.nanoTime(); + + while (true) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + break; + } + + long now = System.nanoTime(); + double elapsed = (now - oldTime) / 1e9; + double rate = messagesReceived.sumThenReset() / elapsed; + double throughput = bytesReceived.sumThenReset() / elapsed * 8 / 1024 / 1024; + + log.info("Read throughput: {} msg/s -- {} Mbit/s", dec.format(rate), dec.format(throughput)); + oldTime = now; + } + + pulsarClient.close(); + } + + private static final Logger log = LoggerFactory.getLogger(PerformanceReader.class); +} \ No newline at end of file