diff --git a/pom.xml b/pom.xml
index 1b2182be2571d..89afbd72ab8c2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -108,7 +108,7 @@ flexible messaging model and an intuitive client API.
4.3.1.72-yahoo
3.4.10
4.0.46.Final
- 0.9.5
+ 1.0.5
9.3.11.v20160721
1.7.17
0.0.23
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
index 08e041a43d443..3c43611c656e0 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/MessageToValuesMapper.java
@@ -22,8 +22,8 @@
import org.apache.pulsar.client.api.Message;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Values;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Values;
public interface MessageToValuesMapper extends Serializable {
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
index 3a90a615b127f..3e163eeecbf07 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +33,12 @@
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
-import backtype.storm.Constants;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
@@ -97,8 +97,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector
@Override
public void execute(Tuple input) {
- // do not send tick tuples since they are used to execute periodic tasks
- if (isTickTuple(input)) {
+ if (TupleUtils.isTick(input)) {
collector.ack(input);
return;
}
@@ -162,11 +161,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
}
- protected static boolean isTickTuple(Tuple tuple) {
- return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
- && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
- }
-
/**
* Helpers for metrics
*/
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
index 911cfaeedd68f..feeb0e9ea13f5 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarSpout.java
@@ -37,13 +37,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
public class PulsarSpout extends BaseRichSpout implements IMetric {
diff --git a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
index e7f1976bbc107..5c464d9b84795 100644
--- a/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
+++ b/pulsar-storm/src/main/java/org/apache/pulsar/storm/TupleToMessageMapper.java
@@ -22,8 +22,8 @@
import org.apache.pulsar.client.api.Message;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Tuple;
public interface TupleToMessageMapper extends Serializable {
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
index a0c4fa4dd9330..c4e69cb8aa7de 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockOutputCollector.java
@@ -21,8 +21,8 @@
import java.util.Collection;
import java.util.List;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.tuple.Tuple;
+import org.apache.storm.task.IOutputCollector;
+import org.apache.storm.tuple.Tuple;
public class MockOutputCollector implements IOutputCollector {
@@ -60,6 +60,11 @@ public void fail(Tuple input) {
acked = false;
}
+ @Override
+ public void resetTimeout(Tuple tuple) {
+
+ }
+
public boolean acked() {
return acked;
}
diff --git a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
index d334f0a9b189c..a92895c8d798c 100644
--- a/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
+++ b/pulsar-storm/src/test/java/org/apache/pulsar/storm/MockSpoutOutputCollector.java
@@ -23,7 +23,7 @@
import org.apache.pulsar.client.api.Message;
-import backtype.storm.spout.ISpoutOutputCollector;
+import org.apache.storm.spout.ISpoutOutputCollector;
public class MockSpoutOutputCollector implements ISpoutOutputCollector {
@@ -46,6 +46,11 @@ public void emitDirect(int taskId, String streamId, List