diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 1e6469d4a1276..639adb915f0ba 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -158,6 +158,16 @@ public void fail(Object msgId) { */ @Override public void nextTuple() { + emitNextAvailableTuple(); + } + + /** + * It makes sure that it emits next available non-tuple to topology unless consumer queue doesn't have any message + * available. It receives message from consumer queue and converts it to tuple and emits to topology. if the + * converted tuple is null then it tries to receives next message and perform the same until it finds non-tuple to + * emit. + */ + public void emitNextAvailableTuple() { Message msg; // check if there are any failed messages to re-emit in the topology @@ -182,12 +192,18 @@ public void nextTuple() { LOG.debug("[{}] Receiving the next message from pulsar consumer to emit to the collector", spoutId); } try { - msg = consumer.receive(1, TimeUnit.SECONDS); - if (msg != null) { - ++messagesReceived; - messageSizeReceived += msg.getData().length; + boolean done = false; + while (!done) { + msg = consumer.receive(100, TimeUnit.MILLISECONDS); + if (msg != null) { + ++messagesReceived; + messageSizeReceived += msg.getData().length; + done = mapToValueAndEmit(msg); + } else { + // queue is empty and nothing to emit + done = true; + } } - mapToValueAndEmit(msg); } catch (PulsarClientException e) { LOG.error("[{}] Error receiving message from pulsar consumer", spoutId, e); } @@ -228,7 +244,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { } - private void mapToValueAndEmit(Message msg) { + private boolean mapToValueAndEmit(Message msg) { if (msg != null) { Values values = pulsarSpoutConf.getMessageToValuesMapper().toValues(msg); ++pendingAcks; @@ -244,8 +260,10 @@ private void mapToValueAndEmit(Message msg) { if (LOG.isDebugEnabled()) { LOG.debug("[{}] Emitted message {} to the collector", spoutId, msg.getMessageId()); } + return true; } } + return false; } public class MessageRetries {