Skip to content

Commit

Permalink
[pulsar-storm] support reader for pulsar-spout (apache#4236)
Browse files Browse the repository at this point in the history
* [pulsar-storm] pulsar-spout can use reader to read message without durable subscription

* fix test
  • Loading branch information
rdhabalia authored May 20, 2019
1 parent 0c8e15f commit 407c445
Show file tree
Hide file tree
Showing 5 changed files with 289 additions and 21 deletions.
132 changes: 112 additions & 20 deletions pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static java.lang.String.format;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
Expand All @@ -35,12 +36,13 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand All @@ -65,7 +67,6 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
public static final String CONSUMER_THROUGHPUT_BYTES = "consumerThroughput";

private final ClientConfigurationData clientConf;
private final ConsumerConfigurationData<byte[]> consumerConf;
private final PulsarSpoutConfiguration pulsarSpoutConf;
private final long failedRetriesTimeoutNano;
private final int maxFailedRetries;
Expand All @@ -77,7 +78,7 @@ public class PulsarSpout extends BaseRichSpout implements IMetric {
private String componentId;
private String spoutId;
private SpoutOutputCollector collector;
private Consumer<byte[]> consumer;
private PulsarSpoutConsumer consumer;
private volatile long messagesReceived = 0;
private volatile long messagesEmitted = 0;
private volatile long messagesFailed = 0;
Expand All @@ -93,12 +94,6 @@ public PulsarSpout(PulsarSpoutConfiguration pulsarSpoutConf, ClientBuilder clien

this.clientConf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData().clone();
this.clientConf.setServiceUrl(pulsarSpoutConf.getServiceUrl());
this.consumerConf = new ConsumerConfigurationData<>();
this.consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
this.consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
this.consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
this.consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());

this.pulsarSpoutConf = pulsarSpoutConf;
this.failedRetriesTimeoutNano = pulsarSpoutConf.getFailedRetriesTimeout(TimeUnit.NANOSECONDS);
this.maxFailedRetries = pulsarSpoutConf.getMaxFailedRetries();
Expand All @@ -110,7 +105,12 @@ public void close() {
LOG.info("[{}] Closing Pulsar consumer for topic {}", spoutId, pulsarSpoutConf.getTopic());

if (pulsarSpoutConf.isAutoUnsubscribe()) {
consumer.unsubscribe();
try {
consumer.unsubscribe();
}catch(PulsarClientException e) {
LOG.error("[{}] Failed to unsubscribe {} on topic {}", spoutId,
this.pulsarSpoutConf.getSubscriptionName(), pulsarSpoutConf.getTopic(), e);
}
}

if (!pulsarSpoutConf.isSharedConsumerEnabled() && consumer != null) {
Expand Down Expand Up @@ -259,16 +259,7 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
pendingMessageRetries.clear();
failedMessages.clear();
try {
sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
if (pulsarSpoutConf.isSharedConsumerEnabled()) {
consumer = sharedPulsarClient.getSharedConsumer(consumerConf);
} else {
try {
consumer = sharedPulsarClient.getClient().subscribeAsync(consumerConf).join();
} catch (CompletionException e) {
throw (PulsarClientException) e.getCause();
}
}
consumer = createConsumer();
LOG.info("[{}] Created a pulsar consumer on topic {} to receive messages with subscription {}", spoutId,
pulsarSpoutConf.getTopic(), pulsarSpoutConf.getSubscriptionName());
} catch (PulsarClientException e) {
Expand All @@ -280,6 +271,27 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
pulsarSpoutConf.getMetricsTimeIntervalInSecs());
}

private PulsarSpoutConsumer createConsumer() throws PulsarClientException {
sharedPulsarClient = SharedPulsarClient.get(componentId, clientConf);
PulsarSpoutConsumer consumer;
if (pulsarSpoutConf.isSharedConsumerEnabled()) {
consumer = pulsarSpoutConf.isDurableSubscription()
? new SpoutConsumer(sharedPulsarClient.getSharedConsumer(newConsumerConfiguration(pulsarSpoutConf)))
: new SpoutReader(sharedPulsarClient.getSharedReader(newReaderConfiguration(pulsarSpoutConf)));
} else {
try {
consumer = pulsarSpoutConf.isDurableSubscription()
? new SpoutConsumer(sharedPulsarClient.getClient()
.subscribeAsync(newConsumerConfiguration(pulsarSpoutConf)).join())
: new SpoutReader(sharedPulsarClient.getClient()
.createReaderAsync(newReaderConfiguration(pulsarSpoutConf)).join());
} catch (CompletionException e) {
throw (PulsarClientException) e.getCause();
}
}
return consumer;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
pulsarSpoutConf.getMessageToValuesMapper().declareOutputFields(declarer);
Expand Down Expand Up @@ -363,4 +375,84 @@ public Object getValueAndReset() {
resetMetrics();
return metrics;
}

private ReaderConfigurationData<byte[]> newReaderConfiguration(PulsarSpoutConfiguration pulsarSpoutConf) {
ReaderConfigurationData<byte[]> readerConf = new ReaderConfigurationData<> ();
readerConf.setTopicName(pulsarSpoutConf.getTopic());
readerConf.setReaderName(pulsarSpoutConf.getSubscriptionName());
readerConf.setStartMessageId(pulsarSpoutConf.getNonDurableSubscriptionReadPosition());
return readerConf;
}

private ConsumerConfigurationData<byte[]> newConsumerConfiguration(PulsarSpoutConfiguration pulsarSpoutConf2) {
ConsumerConfigurationData<byte[]> consumerConf = new ConsumerConfigurationData<>();
consumerConf.setTopicNames(Collections.singleton(pulsarSpoutConf.getTopic()));
consumerConf.setSubscriptionName(pulsarSpoutConf.getSubscriptionName());
consumerConf.setSubscriptionType(pulsarSpoutConf.getSubscriptionType());
consumerConf.setReceiverQueueSize(pulsarSpoutConf.getConsumerReceiverQueueSize());
return consumerConf;
}

static class SpoutConsumer implements PulsarSpoutConsumer {
private Consumer<byte[]> consumer;

public SpoutConsumer(Consumer<byte[]> consumer) {
super();
this.consumer = consumer;
}

@Override
public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
return consumer.receive(timeout, unit);
}

@Override
public void acknowledgeAsync(Message<?> msg) {
consumer.acknowledgeAsync(msg);
}

@Override
public void close() throws PulsarClientException {
consumer.close();
}

@Override
public void unsubscribe() throws PulsarClientException {
consumer.unsubscribe();
}

}

static class SpoutReader implements PulsarSpoutConsumer {
private Reader<byte[]> reader;

public SpoutReader(Reader<byte[]> reader) {
super();
this.reader = reader;
}

@Override
public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
return reader.readNext(timeout, unit);
}

@Override
public void acknowledgeAsync(Message<?> msg) {
// No-op
}

@Override
public void close() throws PulsarClientException {
try {
reader.close();
} catch (IOException e) {
throw new PulsarClientException(e);
}
}

@Override
public void unsubscribe() throws PulsarClientException {
// No-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.SubscriptionType;

/**
Expand All @@ -47,7 +48,11 @@ public class PulsarSpoutConfiguration extends PulsarStormConfiguration {
private SubscriptionType subscriptionType = SubscriptionType.Shared;
private boolean autoUnsubscribe = false;
private int consumerReceiverQueueSize = 1000;
private boolean durableSubscription = true;
// read position if non-durable subscription is enabled : default oldest message available in topic
private MessageId nonDurableSubscriptionReadPosition = MessageId.earliest;


/**
* @return the subscription name for the consumer in the spout
*/
Expand Down Expand Up @@ -174,4 +179,31 @@ public boolean isAutoUnsubscribe() {
public void setAutoUnsubscribe(boolean autoUnsubscribe) {
this.autoUnsubscribe = autoUnsubscribe;
}

public boolean isDurableSubscription() {
return durableSubscription;
}

/**
* if subscription is not durable then it creates non-durable reader to start reading from the
* {@link #setNonDurableSubscriptionReadPosition(MessagePosition)} in topic.
*
* @param nonDurableSubscription
*/
public void setDurableSubscription(boolean durableSubscription) {
this.durableSubscription = durableSubscription;
}

public MessageId getNonDurableSubscriptionReadPosition() {
return nonDurableSubscriptionReadPosition;
}

/**
* Non-durable-subscription/Reader can be set to start reading from a specific position earliest/latest.
*
* @param nonDurableSubscriptionReadPosition
*/
public void setNonDurableSubscriptionReadPosition(MessageId nonDurableSubscriptionReadPosition) {
this.nonDurableSubscriptionReadPosition = nonDurableSubscriptionReadPosition;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.storm;

import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;

public interface PulsarSpoutConsumer {

/**
* Receives a single message.
*
* @param waitTime
* @param milliseconds
* @return
* @throws PulsarClientException
*/
Message<byte[]> receive(int waitTime, TimeUnit unit) throws PulsarClientException;

/**
* Ack the message async.
*
* @param msg
*/
void acknowledgeAsync(Message<?> msg);

/**
* unsubscribe the consumer
* @throws PulsarClientException
*/
void unsubscribe() throws PulsarClientException;

/**
* Close the consumer
*
* @throws PulsarClientException
*/
void close() throws PulsarClientException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -43,6 +45,7 @@ public class SharedPulsarClient {
private final AtomicInteger counter = new AtomicInteger();

private Consumer<byte[]> consumer;
private Reader<byte[]> reader;
private Producer<byte[]> producer;

private SharedPulsarClient(String componentId, ClientConfigurationData clientConf)
Expand Down Expand Up @@ -104,6 +107,23 @@ public Consumer<byte[]> getSharedConsumer(ConsumerConfigurationData<byte[]> cons
return consumer;
}

public Reader<byte[]> getSharedReader(ReaderConfigurationData<byte[]> readerConf) throws PulsarClientException {
counter.incrementAndGet();
synchronized (this) {
if (reader == null) {
try {
reader = client.createReaderAsync(readerConf).join();
} catch (CompletionException e) {
throw (PulsarClientException) e.getCause();
}
LOG.info("[{}] Created a new Pulsar reader on {}", componentId, readerConf.getTopicName());
} else {
LOG.info("[{}] Using a shared reader on {}", componentId, readerConf.getTopicName());
}
}
return reader;
}

public Producer<byte[]> getSharedProducer(ProducerConfigurationData producerConf) throws PulsarClientException {
counter.incrementAndGet();
synchronized (this) {
Expand All @@ -130,4 +150,5 @@ public void close() throws PulsarClientException {
}
}
}

}
Loading

0 comments on commit 407c445

Please sign in to comment.