From 53f4de7da65d92bb4d0acab311b6d6bd640f0e43 Mon Sep 17 00:00:00 2001 From: Boyang Jerry Peng Date: Mon, 6 Nov 2017 12:58:14 -0800 Subject: [PATCH] updating Storm version in Pulsar (#881) * updating Storm version in Pulsar * fix unit test * adding back tick tuple --- pom.xml | 2 +- .../pulsar/storm/MessageToValuesMapper.java | 4 ++-- .../org/apache/pulsar/storm/PulsarBolt.java | 22 +++++++------------ .../org/apache/pulsar/storm/PulsarSpout.java | 14 ++++++------ .../pulsar/storm/TupleToMessageMapper.java | 4 ++-- .../pulsar/storm/MockOutputCollector.java | 9 ++++++-- .../storm/MockSpoutOutputCollector.java | 7 +++++- .../apache/pulsar/storm/PulsarBoltTest.java | 20 ++++------------- .../apache/pulsar/storm/PulsarSpoutTest.java | 8 +++---- .../pulsar/storm/example/StormExample.java | 22 +++++++++---------- 10 files changed, 52 insertions(+), 60 deletions(-) diff --git a/pom.xml b/pom.xml index 1b2182be2571d..89afbd72ab8c2 100644 --- a/pom.xml +++ b/pom.xml @@ -108,7 +108,7 @@ flexible messaging model and an intuitive client API. 4.3.1.72-yahoo 3.4.10 4.0.46.Final - 0.9.5 + 1.0.5 9.3.11.v20160721 1.7.17 0.0.23 diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java index 08e041a43d443..3c43611c656e0 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java @@ -22,8 +22,8 @@ import org.apache.pulsar.client.api.Message; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; public interface MessageToValuesMapper extends Serializable { diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java index 3a90a615b127f..3e163eeecbf07 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; +import org.apache.storm.utils.TupleUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,13 +33,12 @@ import org.apache.pulsar.client.api.ProducerConfiguration; import org.apache.pulsar.client.api.PulsarClientException; -import backtype.storm.Constants; -import backtype.storm.metric.api.IMetric; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichBolt; -import backtype.storm.tuple.Tuple; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.tuple.Tuple; public class PulsarBolt extends BaseRichBolt implements IMetric { /** @@ -97,8 +97,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector @Override public void execute(Tuple input) { - // do not send tick tuples since they are used to execute periodic tasks - if (isTickTuple(input)) { + if (TupleUtils.isTick(input)) { collector.ack(input); return; } @@ -162,11 +161,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer); } - protected static boolean isTickTuple(Tuple tuple) { - return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent()) - && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); - } - /** * Helpers for metrics */ diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java index 911cfaeedd68f..feeb0e9ea13f5 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java @@ -37,13 +37,13 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.Backoff; -import backtype.storm.metric.api.IMetric; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.base.BaseRichSpout; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; public class PulsarSpout extends BaseRichSpout implements IMetric { diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java index e7f1976bbc107..5c464d9b84795 100644 --- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java +++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java @@ -22,8 +22,8 @@ import org.apache.pulsar.client.api.Message; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; public interface TupleToMessageMapper extends Serializable { diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java index a0c4fa4dd9330..c4e69cb8aa7de 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java @@ -21,8 +21,8 @@ import java.util.Collection; import java.util.List; -import backtype.storm.task.IOutputCollector; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.IOutputCollector; +import org.apache.storm.tuple.Tuple; public class MockOutputCollector implements IOutputCollector { @@ -60,6 +60,11 @@ public void fail(Tuple input) { acked = false; } + @Override + public void resetTimeout(Tuple tuple) { + + } + public boolean acked() { return acked; } diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java index d334f0a9b189c..a92895c8d798c 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java @@ -23,7 +23,7 @@ import org.apache.pulsar.client.api.Message; -import backtype.storm.spout.ISpoutOutputCollector; +import org.apache.storm.spout.ISpoutOutputCollector; public class MockSpoutOutputCollector implements ISpoutOutputCollector { @@ -46,6 +46,11 @@ public void emitDirect(int taskId, String streamId, List tuple, Object m lastMessage = (Message) messageId; } + @Override + public long getPendingCount() { + return 0; + } + @Override public void reportError(Throwable error) { } diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java index 142ba8a49563d..a8744726ef9ad 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarBoltTest.java @@ -37,11 +37,10 @@ import org.testng.annotations.Test; import org.testng.collections.Maps; -import backtype.storm.Constants; -import backtype.storm.task.OutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Tuple; public class PulsarBoltTest extends ProducerConsumerBase { @@ -213,15 +212,4 @@ public void testSerializability() throws Exception { PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, new ClientConfiguration()); TestUtil.testSerializability(boltWithNoAuth); } - - @Test - public void testTickTuple() throws Exception { - Tuple mockTuple = mock(Tuple.class); - when(mockTuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID); - when(mockTuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID); - bolt.execute(mockTuple); - Assert.assertTrue(mockCollector.acked()); - Message msg = consumer.receive(5, TimeUnit.SECONDS); - Assert.assertNull(msg); - } } diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java index db6dddf0d7b09..a479a8b153b0a 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/PulsarSpoutTest.java @@ -42,10 +42,10 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.PersistentTopicStats; -import backtype.storm.spout.SpoutOutputCollector; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Values; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.tuple.Values; public class PulsarSpoutTest extends ProducerConsumerBase { diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java index 5508112f71e6d..344a082265da0 100644 --- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java +++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/example/StormExample.java @@ -31,17 +31,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import backtype.storm.Config; -import backtype.storm.LocalCluster; -import backtype.storm.metric.api.IMetricsConsumer; -import backtype.storm.task.IErrorReporter; -import backtype.storm.task.TopologyContext; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Values; -import backtype.storm.utils.Utils; +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.metric.api.IMetricsConsumer; +import org.apache.storm.task.IErrorReporter; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.PulsarClient;