Skip to content

Commit

Permalink
[pulsar-storm] Fix NPE while emitting next tuple (apache#3991)
Browse files Browse the repository at this point in the history
### Motivation

`PulsarSpout` removes an acked message from [pendingMessageRetries](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L126), but it does not remove that message from the `failedMessages` queue. As a result, `PulsarSpout` throws an NPE while [emitting the next tuple](https://github.com/apache/pulsar/blob/master/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java#L181), because the retry entry for the message at the head of the queue no longer exists.

```
stack-trace with old pulsar-storm lib: 1.20
2019-04-05 18:49:58.240 b.s.util CmsSpout_[1 1] [INFO] Async loop Stacktrace is: {} java.lang.NullPointerException
    at org.apache.pulsar.storm.PulsarSpout.emitNextAvailableTuple(PulsarSpout.java:176)
    at org.apache.pulsar.storm.PulsarSpout.nextTuple(PulsarSpout.java:160)
    at backtype.storm.daemon.executor$fn__7365$fn__7380$fn__7411.invoke(executor.clj:577)
    at backtype.storm.util$async_loop$fn__551.invoke(util.clj:491)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:748)
```
  • Loading branch information
rdhabalia authored and sijie committed Apr 9, 2019
1 parent 372575a commit 12de91f
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 15 deletions.
54 changes: 39 additions & 15 deletions pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.shade.org.eclipse.jetty.util.log.Log;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
Expand Down Expand Up @@ -124,6 +125,8 @@ public void ack(Object msgId) {
}
consumer.acknowledgeAsync(msg);
pendingMessageRetries.remove(msg.getMessageId());
// we should also remove message from failedMessages but it will be eventually removed while emitting next
// tuple
--pendingAcks;
}
}
Expand Down Expand Up @@ -172,25 +175,12 @@ public void nextTuple() {
* emit.
*/
public void emitNextAvailableTuple() {
Message<byte[]> msg;

// check if there are any failed messages to re-emit in the topology
msg = failedMessages.peek();
if (msg != null) {
MessageRetries messageRetries = pendingMessageRetries.get(msg.getMessageId());
if (Backoff.shouldBackoff(messageRetries.getTimeStamp(), TimeUnit.NANOSECONDS,
messageRetries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
clientConf.getMaxBackoffIntervalNanos())) {
Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
} else {
// remove the message from the queue and emit to the topology, only if it should not be backedoff
LOG.info("[{}] Retrying failed message {}", spoutId, msg.getMessageId());
failedMessages.remove();
mapToValueAndEmit(msg);
}
if(emitFailedMessage()) {
return;
}

Message<byte[]> msg;
// receive from consumer if no failed messages
if (consumer != null) {
if (LOG.isDebugEnabled()) {
Expand All @@ -215,6 +205,40 @@ public void emitNextAvailableTuple() {
}
}

/**
 * Handles the message at the head of the {@code failedMessages} queue, if any.
 *
 * <p>Queue heads that no longer have an entry in {@code pendingMessageRetries} are dropped and the next head is
 * inspected. The first head that still has a retry entry is either re-emitted (and removed from the queue) or,
 * when {@code Backoff.shouldBackoff} says it is too early, left in place while this call sleeps for the default
 * backoff interval.
 *
 * @return {@code true} if a failed message with a retry entry was found (whether it was emitted or backed off);
 *         {@code false} if the queue was drained without finding one
 */
private boolean emitFailedMessage() {
    for (Message<byte[]> failed = failedMessages.peek(); failed != null; failed = failedMessages.peek()) {
        MessageRetries retries = pendingMessageRetries.get(failed.getMessageId());

        if (retries == null) {
            // No retry entry: the message was already acked and removed from pendingMessageRetries, so drop
            // the stale queue entry and look at the next failed message.
            if (LOG.isDebugEnabled()) {
                LOG.debug("[{}]-{} removing {} from failedMessage because it's already acked",
                        pulsarSpoutConf.getTopic(), spoutId, failed.getMessageId());
            }
            failedMessages.remove();
            continue;
        }

        boolean mustBackoff = Backoff.shouldBackoff(retries.getTimeStamp(), TimeUnit.NANOSECONDS,
                retries.getNumRetries(), clientConf.getDefaultBackoffIntervalNanos(),
                clientConf.getMaxBackoffIntervalNanos());

        if (mustBackoff) {
            // Too early to retry: keep the message queued and wait one default backoff interval.
            Utils.sleep(TimeUnit.NANOSECONDS.toMillis(clientConf.getDefaultBackoffIntervalNanos()));
        } else {
            // Retry is due: take the message off the queue and emit it to the topology again.
            LOG.info("[{}] Retrying failed message {}", spoutId, failed.getMessageId());
            failedMessages.remove();
            mapToValueAndEmit(failed);
        }
        // Either way a failed message was handled in this call.
        return true;
    }
    return false;
}

@Override
@SuppressWarnings({ "rawtypes" })
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.storm;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;

import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

import com.google.common.collect.Maps;

/**
 * Unit tests for {@link PulsarSpout}.
 */
public class PulsarSpoutTest {

    private static final Logger log = LoggerFactory.getLogger(PulsarSpoutTest.class);

    /**
     * Regression test: a message that is failed and then acked must not break the next
     * {@code emitNextAvailableTuple()} call. The spout is expected to get past the failed-message handling and
     * reach the consumer's {@code receive}.
     *
     * @throws Exception if the private {@code consumer} field cannot be accessed reflectively
     */
    @Test
    public void testAckFailedMessage() throws Exception {

        PulsarSpoutConfiguration conf = new PulsarSpoutConfiguration();
        conf.setServiceUrl("http://localhost:8080");
        conf.setSubscriptionName("sub1");
        conf.setTopic("persistent://prop/ns1/topic1");
        conf.setSubscriptionType(SubscriptionType.Exclusive);
        // Stub mapper: the values produced for a message are irrelevant to this test.
        conf.setMessageToValuesMapper(new MessageToValuesMapper() {
            @Override
            public Values toValues(Message<byte[]> msg) {
                return null;
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }

        });

        ClientBuilder builder = spy(new ClientBuilderImpl());
        PulsarSpout spout = spy(new PulsarSpout(conf, builder));

        Message<byte[]> msg = new MessageImpl<>(conf.getTopic(), "1:1", Maps.newHashMap(), null, Schema.BYTES);
        // mock(Consumer.class) yields a raw Consumer; the unchecked conversion is safe for a mock.
        @SuppressWarnings("unchecked")
        Consumer<byte[]> consumer = mock(Consumer.class);
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.complete(null);
        // PulsarSpout.ack() calls consumer.acknowledgeAsync(Message), so stub that overload. Stubbing the
        // MessageId overload would never be hit and the mock would hand back null instead of the future.
        doReturn(future).when(consumer).acknowledgeAsync(msg);
        // Inject the mock consumer directly; the field is private and open() is not called in this test.
        Field consField = PulsarSpout.class.getDeclaredField("consumer");
        consField.setAccessible(true);
        consField.set(spout, consumer);

        // Fail then ack the same message. NOTE(review): presumably fail() queues the message for retry and
        // ack() drops its retry bookkeeping, which is the state that used to trigger the NPE -- confirm.
        spout.fail(msg);
        spout.ack(msg);
        spout.emitNextAvailableTuple();
        // Reaching receive() shows the stale failed message was skipped without an exception.
        verify(consumer, atLeast(1)).receive(anyInt(), any());
    }
}

0 comments on commit 12de91f

Please sign in to comment.