From 407c4452e904c111f0a02d1c970ac1ef9de834fc Mon Sep 17 00:00:00 2001 From: Rajan Dhabalia Date: Mon, 20 May 2019 13:22:23 -0700 Subject: [PATCH] [pulsar-storm] support reader for pulsar-spout (#4236) * [pulsar-storm] pulsar-spout can use reader to read message without durable subscription * fix test --- .../org/apache/pulsar/storm/PulsarSpout.java | 132 +++++++++++++++--- .../storm/PulsarSpoutConfiguration.java | 32 +++++ .../pulsar/storm/PulsarSpoutConsumer.java | 58 ++++++++ .../pulsar/storm/SharedPulsarClient.java | 21 +++ .../apache/pulsar/storm/PulsarSpoutTest.java | 67 ++++++++- 5 files changed, 289 insertions(+), 21 deletions(-) create mode 100644 pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 5a5ea591d776f..0d1bebce516ed 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -20,6 +20,7 @@ import static java.lang.String.format; +import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Objects; @@ -35,12 +36,13 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.apache.storm.metric.api.IMetric; -import org.apache.storm.shade.org.eclipse.jetty.util.log.Log; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import 
org.apache.storm.topology.OutputFieldsDeclarer; @@ -65,7 +67,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput"; private final ClientConfigurationData clientConf; - private final ConsumerConfigurationData consumerConf; private final PulsarSpoutConfiguration pulsarSpoutConf; private final long failedRetriesTimeoutNano; private final int maxFailedRetries; @@ -77,7 +78,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric { private String componentId; private String spoutId; private SpoutOutputCollector collector; - private Consumer consumer; + private PulsarSpoutConsumer consumer; private volatile long messagesReceived = 0; private volatile long messagesEmitted = 0; private volatile long messagesFailed = 0; @@ -93,12 +94,6 @@ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clien this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone(); this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl()); - this.consumerConf = new ConsumerConfigurationData<>(); - this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); - this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); - this.consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType()); - this.consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize()); - this.pulsarSpoutConf = pulsarSpoutConf; this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS); this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries(); @@ -110,7 +105,12 @@ public void close() { LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic()); if (pulsarSpoutConf.isAutoUnsubscribe()) { - consumer.unsubscribe(); + try { + consumer.unsubscribe(); + }catch(PulsarClientException e) { + LOG.error("[{}] Failed to unsubscribe {} on 
topic {}", spoutId, + this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e); + } } if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) { @@ -259,16 +259,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect pendingMessageRetries.clear(); failedMessages.clear(); try { - sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); - if (pulsarSpoutConf.isSharedConsumerEnabled()) { - consumer = sharedPulsarClient.getSharedConsumer(consumerConf); - } else { - try { - consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join(); - } catch (CompletionException e) { - throw (PulsarClientException) e.getCause(); - } - } + consumer = createConsumer(); LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId, pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName()); } catch (PulsarClientException e) { @@ -280,6 +271,27 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect pulsarSpoutConf.getMetricsTimeIntervalInSecs()); } + private PulsarSpoutConsumer createConsumer() throws PulsarClientException { + sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf); + PulsarSpoutConsumer consumer; + if (pulsarSpoutConf.isSharedConsumerEnabled()) { + consumer = pulsarSpoutConf.isDurableSubscription() + ? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf))) + : new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf))); + } else { + try { + consumer = pulsarSpoutConf.isDurableSubscription() + ? 
new SpoutConsumer(sharedPulsarClient.getClient() + .subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join()) + : new SpoutReader(sharedPulsarClient.getClient() + .createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join()); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + } + return consumer; + } + @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer); @@ -363,4 +375,84 @@ public Object getValueAndReset() { resetMetrics(); return metrics; } + + private ReaderConfigurationData newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) { + ReaderConfigurationData readerConf = new ReaderConfigurationData<> (); + readerConf.setTopicName(pulsarSpoutConf.getTopic()); + readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName()); + readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition()); + return readerConf; + } + + private ConsumerConfigurationData newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf2) { + ConsumerConfigurationData consumerConf = new ConsumerConfigurationData<>(); + consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic())); + consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName()); + consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType()); + consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize()); + return consumerConf; + } + + static class SpoutConsumer implements PulsarSpoutConsumer { + private Consumer consumer; + + public SpoutConsumer(Consumer consumer) { + super(); + this.consumer = consumer; + } + + @Override + public Message receive(int timeout, TimeUnit unit) throws PulsarClientException { + return consumer.receive(timeout, unit); + } + + @Override + public void acknowledgeAsync(Message msg) { + consumer.acknowledgeAsync(msg); + } + + @Override + public void close() 
throws PulsarClientException { + consumer.close(); + } + + @Override + public void unsubscribe() throws PulsarClientException { + consumer.unsubscribe(); + } + + } + + static class SpoutReader implements PulsarSpoutConsumer { + private Reader reader; + + public SpoutReader(Reader reader) { + super(); + this.reader = reader; + } + + @Override + public Message receive(int timeout, TimeUnit unit) throws PulsarClientException { + return reader.readNext(timeout, unit); + } + + @Override + public void acknowledgeAsync(Message msg) { + // No-op + } + + @Override + public void close() throws PulsarClientException { + try { + reader.close(); + } catch (IOException e) { + throw new PulsarClientException(e); + } + } + + @Override + public void unsubscribe() throws PulsarClientException { + // No-op + } + } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java index 0f27ddc589990..daa598f78e1fc 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConfiguration.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.SubscriptionType; /** @@ -47,7 +48,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration { private SubscriptionType subscriptionType = SubscriptionType.Shared; private boolean autoUnsubscribe = false; private int consumerReceiverQueueSize = 1000; + private boolean durableSubscription = true; + // read position if non-durable subscription is enabled : default oldest message available in topic + private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest; + /** * @return the subscription name for the consumer in the spout */ @@ -174,4 +179,31 @@ public boolean isAutoUnsubscribe() { public 
void setAutoUnsubscribe(boolean autoUnsubscribe) { this.autoUnsubscribe = autoUnsubscribe; } + + public boolean isDurableSubscription() { + return durableSubscription; + } + + /** + * If the subscription is not durable, the spout creates a non-durable reader that starts reading from the + * position set via {@link #setNonDurableSubscriptionReadPosition(MessageId)} in the topic. + * + * @param durableSubscription + */ + public void setDurableSubscription(boolean durableSubscription) { + this.durableSubscription = durableSubscription; + } + + public MessageId getNonDurableSubscriptionReadPosition() { + return nonDurableSubscriptionReadPosition; + } + + /** + * Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest. + * + * @param nonDurableSubscriptionReadPosition + */ + public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) { + this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition; + } } diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java new file mode 100644 index 0000000000000..d845c4e21d9f2 --- /dev/null +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpoutConsumer.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.storm; + +import java.util.concurrent.TimeUnit; + +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +public interface PulsarSpoutConsumer { + + /** + * Receives a single message. + * + * @param waitTime + * @param unit + * @return + * @throws PulsarClientException + */ + Message receive(int waitTime, TimeUnit unit) throws PulsarClientException; + + /** + * Ack the message async. + * + * @param msg + */ + void acknowledgeAsync(Message msg); + + /** + * unsubscribe the consumer + * @throws PulsarClientException + */ + void unsubscribe() throws PulsarClientException; + + /** + * Close the consumer + * + * @throws PulsarClientException + */ + void close() throws PulsarClientException; + +} diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java index d07903ef74d39..b8263a4dfb754 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/SharedPulsarClient.java @@ -27,10 +27,12 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; 
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData; +import org.apache.pulsar.client.impl.conf.ReaderConfigurationData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +45,7 @@ public class SharedPulsarClient { private final AtomicInteger counter = new AtomicInteger(); private Consumer consumer; + private Reader reader; private Producer producer; private SharedPulsarClient(String componentId, ClientConfigurationData clientConf) @@ -104,6 +107,23 @@ public Consumer getSharedConsumer(ConsumerConfigurationData cons return consumer; } + public Reader getSharedReader(ReaderConfigurationData readerConf) throws PulsarClientException { + counter.incrementAndGet(); + synchronized (this) { + if (reader == null) { + try { + reader = client.createReaderAsync(readerConf).join(); + } catch (CompletionException e) { + throw (PulsarClientException) e.getCause(); + } + LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName()); + } else { + LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName()); + } + } + return reader; + } + public Producer getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException { counter.incrementAndGet(); synchronized (this) { @@ -130,4 +150,5 @@ public void close() throws PulsarClientException { } } } + } diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java index 588c5a039052a..5764ac7317d39 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -25,9 +25,16 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertTrue; import java.lang.reflect.Field; +import java.util.HashMap; 
+import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.pulsar.client.api.ClientBuilder; import org.apache.pulsar.client.api.Consumer; @@ -36,6 +43,11 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ClientBuilderImpl; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.shade.io.netty.buffer.ByteBuf; +import org.apache.pulsar.shade.io.netty.buffer.PooledByteBufAllocator; +import org.apache.pulsar.storm.PulsarSpout.SpoutConsumer; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Values; import org.slf4j.Logger; @@ -73,16 +85,69 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { Message msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES); Consumer consumer = mock(Consumer.class); + SpoutConsumer spoutConsumer = new SpoutConsumer(consumer); CompletableFuture future = new CompletableFuture<>(); future.complete(null); doReturn(future).when(consumer).acknowledgeAsync(msg.getMessageId()); Field consField = PulsarSpout.class.getDeclaredField("consumer"); consField.setAccessible(true); - consField.set(spout, consumer); + consField.set(spout, spoutConsumer); spout.fail(msg); spout.ack(msg); spout.emitNextAvailableTuple(); verify(consumer, atLeast(1)).receive(anyInt(), any()); } + + @Test + public void testPulsarSpout() throws Exception { + PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration(); + conf.setServiceUrl("http://localhost:8080"); + conf.setSubscriptionName("sub1"); + conf.setTopic("persistent://prop/ns1/topic1"); + conf.setSubscriptionType(SubscriptionType.Exclusive); + conf.setSharedConsumerEnabled(true); + AtomicBoolean called = new 
AtomicBoolean(false); + conf.setMessageToValuesMapper(new MessageToValuesMapper() { + @Override + public Values toValues(Message msg) { + called.set(true); + return new Values("test"); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + } + + }); + + ClientBuilder builder = spy(new ClientBuilderImpl()); + PulsarSpout spout = spy(new PulsarSpout(conf, builder)); + TopologyContext context = mock(TopologyContext.class); + final String componentId = "test-component-id"; + doReturn(componentId).when(context).getThisComponentId(); + SpoutOutputCollector collector = mock(SpoutOutputCollector.class); + Map config = new HashMap<>(); + Field field = SharedPulsarClient.class.getDeclaredField("instances"); + field.setAccessible(true); + ConcurrentMap instances = (ConcurrentMap) field + .get(SharedPulsarClient.class); + + SharedPulsarClient client = mock(SharedPulsarClient.class); + Consumer consumer = mock(Consumer.class); + when(client.getSharedConsumer(any())).thenReturn(consumer); + instances.put(componentId, client); + + ByteBuf data = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); + data.writeBytes("test".getBytes()); + Message msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), data, Schema.BYTES); + when(consumer.receive(anyInt(), any())).thenReturn(msg); + + spout.open(config, context, collector); + spout.emitNextAvailableTuple(); + + assertTrue(called.get()); + verify(consumer, atLeast(1)).receive(anyInt(), any()); + } + }