Skip to content

Commit

Permalink
updating Storm version in Pulsar (apache#881)
Browse files Browse the repository at this point in the history
* updating Storm version in Pulsar

* fix unit test

* adding back tick tuple
  • Loading branch information
jerrypeng authored and merlimat committed Nov 6, 2017
1 parent 9433f36 commit 53f4de7
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ flexible messaging model and an intuitive client API.</description>
<bookkeeper.version>4.3.1.72-yahoo</bookkeeper.version>
<zookeeper.version>3.4.10</zookeeper.version>
<netty.version>4.0.46.Final</netty.version>
<storm.version>0.9.5</storm.version>
<storm.version>1.0.5</storm.version>
<jetty.version>9.3.11.v20160721</jetty.version>
<athenz.version>1.7.17</athenz.version>
<prometheus.version>0.0.23</prometheus.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import org.apache.pulsar.client.api.Message;

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;

public interface MessageToValuesMapper extends Serializable {

Expand Down
22 changes: 8 additions & 14 deletions pulsar-storm/src/main/java/org/apache/pulsar/storm/PulsarBolt.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,13 +33,12 @@
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;

import backtype.storm.Constants;
import backtype.storm.metric.api.IMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class PulsarBolt extends BaseRichBolt implements IMetric {
/**
Expand Down Expand Up @@ -97,8 +97,7 @@ public void prepare(Map conf, TopologyContext context, OutputCollector collector

@Override
public void execute(Tuple input) {
// do not send tick tuples since they are used to execute periodic tasks
if (isTickTuple(input)) {
if (TupleUtils.isTick(input)) {
collector.ack(input);
return;
}
Expand Down Expand Up @@ -162,11 +161,6 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
pulsarBoltConf.getTupleToMessageMapper().declareOutputFields(declarer);
}

protected static boolean isTickTuple(Tuple tuple) {
return tuple != null && Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
&& Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
}

/**
* Helpers for metrics
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;

import backtype.storm.metric.api.IMetric;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class PulsarSpout extends BaseRichSpout implements IMetric {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

import org.apache.pulsar.client.api.Message;

import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public interface TupleToMessageMapper extends Serializable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.Collection;
import java.util.List;

import backtype.storm.task.IOutputCollector;
import backtype.storm.tuple.Tuple;
import org.apache.storm.task.IOutputCollector;
import org.apache.storm.tuple.Tuple;

public class MockOutputCollector implements IOutputCollector {

Expand Down Expand Up @@ -60,6 +60,11 @@ public void fail(Tuple input) {
acked = false;
}

@Override
public void resetTimeout(Tuple tuple) {

}

public boolean acked() {
return acked;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import org.apache.pulsar.client.api.Message;

import backtype.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.ISpoutOutputCollector;

public class MockSpoutOutputCollector implements ISpoutOutputCollector {

Expand All @@ -46,6 +46,11 @@ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object m
lastMessage = (Message) messageId;
}

@Override
public long getPendingCount() {
return 0;
}

@Override
public void reportError(Throwable error) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@
import org.testng.annotations.Test;
import org.testng.collections.Maps;

import backtype.storm.Constants;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class PulsarBoltTest extends ProducerConsumerBase {

Expand Down Expand Up @@ -213,15 +212,4 @@ public void testSerializability() throws Exception {
PulsarBolt boltWithNoAuth = new PulsarBolt(pulsarBoltConf, new ClientConfiguration());
TestUtil.testSerializability(boltWithNoAuth);
}

@Test
public void testTickTuple() throws Exception {
Tuple mockTuple = mock(Tuple.class);
when(mockTuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
when(mockTuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
bolt.execute(mockTuple);
Assert.assertTrue(mockCollector.acked());
Message msg = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.PersistentTopicStats;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Values;

public class PulsarSpoutTest extends ProducerConsumerBase {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down

0 comments on commit 53f4de7

Please sign in to comment.