Commit ae82278

Merge branch 'master' of github.com:nathanmarz/storm-contrib into submodules-merge

Conflicts:
	storm-backport
	storm-benchmark
	storm-cassandra
	storm-growl
	storm-growl/README.markdown
	storm-growl/pom.xml
	storm-growl/src/main/java/storm/growl/GrowlBolt.java
	storm-growl/src/main/java/storm/growl/TestGrowlTopology.java
	storm-hbase
	storm-jms
	storm-redis-pubsub
	storm-scribe
	storm-signals
	storm-state
ptgoetz committed Feb 26, 2013
2 parents 31c7aef + 68a18b7 commit ae82278
Showing 8 changed files with 220 additions and 43 deletions.
24 changes: 13 additions & 11 deletions storm-kafka/project.clj

@@ -1,13 +1,15 @@
-(defproject storm/storm-kafka "0.9.0-wip15-scala292"
-  :source-path "src/clj"
-  :java-source-path "src/jvm"
-  :javac-options {:debug "true" :fork "true"}
-  :repositories {"conjars" "http://conjars.org/repo/"}
-  :dependencies [[com.twitter/kafka_2.9.2 "0.7.0"
+(defproject storm/storm-kafka "0.9.0-wip16a-scala292"
+  :java-source-paths ["src/jvm"]
+  :repositories {"scala-tools" "http://scala-tools.org/repo-releases"
+                 "conjars" "http://conjars.org/repo/"}
+  :dependencies [[org.scala-lang/scala-library "2.9.2"]
+                 [com.twitter/kafka_2.9.2 "0.7.0"
                   :exclusions [org.apache.zookeeper/zookeeper
                                log4j/log4j]]]
-  :dev-dependencies [[storm "0.9.0-wip15"]
-                     [org.slf4j/log4j-over-slf4j "1.6.6"]
-                     [ch.qos.logback/logback-classic "1.0.6"]
-                     [org.clojure/clojure "1.4.0"]]
-  :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"])
+  :profiles
+  {:provided {:dependencies [[storm "0.9.0-wip15"]
+                             [org.slf4j/log4j-over-slf4j "1.6.6"]
+                             ;;[ch.qos.logback/logback-classic "1.0.6"]
+                             [org.clojure/clojure "1.4.0"]]}}
+  :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
+  :min-lein-version "2.0")
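
Note on the project.clj change: this is the Leiningen 1 to Leiningen 2 migration. :java-source-path becomes the vector-valued :java-source-paths, and the removed :dev-dependencies reappear under a :provided profile, so storm itself, the logging shims, and Clojure are on the classpath for compiling and testing but are not packaged into the jar submitted to a cluster (which already supplies them). The new :min-lein-version "2.0" guard is meant to make older Leiningen fail fast rather than silently ignore :profiles.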
10 changes: 8 additions & 2 deletions storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java

@@ -36,16 +36,22 @@ public SimpleConsumer register(HostPort host, int partition) {
         info.partitions.add(partition);
         return info.consumer;
     }
 
+    public SimpleConsumer getConnection(GlobalPartitionId id) {
+        ConnectionInfo info = _connections.get(id.host);
+        if(info != null) return info.consumer;
+        return null;
+    }
+
     public void unregister(HostPort port, int partition) {
         ConnectionInfo info = _connections.get(port);
         info.partitions.remove(partition);
         if(info.partitions.isEmpty()) {
             info.consumer.close();
-        }
-        _connections.remove(port);
+            _connections.remove(port);
+        }
     }
 
     public void unregister(GlobalPartitionId id) {
         unregister(id.host, id.partition);
     }
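
The unregister change above is a connection-lifetime fix: the entry for a broker is now dropped only when its last registered partition goes away, instead of on every unregister call. A minimal sketch of the intended lifecycle; conns, config, and hostPort are hypothetical names, while the class and methods are the ones in the diff:

    // Two partitions on the same broker share one connection.
    DynamicPartitionConnections conns = new DynamicPartitionConnections(config);
    SimpleConsumer a = conns.register(hostPort, 0); // opens the connection, tracks partition 0
    SimpleConsumer b = conns.register(hostPort, 1); // same host: reuses it, tracks partition 1

    conns.unregister(hostPort, 0); // partition 1 still registered, so the connection stays open
    conns.unregister(hostPort, 1); // last partition gone: consumer closed, entry removed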
25 changes: 19 additions & 6 deletions storm-kafka/src/jvm/storm/kafka/KafkaSpout.java

@@ -1,10 +1,8 @@
 package storm.kafka;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 
+import backtype.storm.metric.api.IMetric;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -15,6 +13,7 @@
 import backtype.storm.topology.base.BaseRichSpout;
 import kafka.message.Message;
 import storm.kafka.PartitionManager.KafkaMessageId;
+import storm.kafka.trident.KafkaUtils;
 
 // TODO: need to add blacklisting
 // TODO: need to make a best effort to not re-emit messages if don't have to
@@ -43,7 +42,7 @@ static enum EmitState {
     PartitionCoordinator _coordinator;
     DynamicPartitionConnections _connections;
     ZkState _state;
-
+
     long _lastUpdateMs = 0;
 
     int _currPartitionIndex = 0;
@@ -53,7 +52,7 @@ public KafkaSpout(SpoutConfig spoutConf) {
     }
 
     @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+    public void open(Map conf, TopologyContext context, final SpoutOutputCollector collector) {
         _collector = collector;
 
         Map stateConf = new HashMap(conf);
@@ -76,6 +75,20 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollect
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }
 
+        context.registerMetric("kafkaOffset", new IMetric() {
+            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
+            @Override
+            public Object getValueAndReset() {
+                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
+                Set<GlobalPartitionId> latestPartitions = new HashSet();
+                for(PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); }
+                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
+                for(PartitionManager pm : pms) {
+                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+                }
+                return _kafkaOffsetMetric.getValueAndReset();
+            }
+        }, 60);
     }
 
     @Override
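
For context on the block above: registerMetric hands Storm an object to poll. Every time bucket (60 seconds here) the framework calls getValueAndReset() on the executor thread and forwards the returned value to any registered metrics consumers. A minimal sketch of that contract, with hypothetical names, assuming the Storm 0.9.x metrics API:

    import backtype.storm.metric.api.IMetric;

    public class EmittedCounter implements IMetric {
        private long _count = 0;

        public void increment() { _count++; }

        @Override
        public Object getValueAndReset() {
            long snapshot = _count; // value for the bucket that just ended
            _count = 0;             // next bucket starts from zero
            return snapshot;
        }
    }
    // In open(): context.registerMetric("emitted", new EmittedCounter(), 60);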
23 changes: 23 additions & 0 deletions storm-kafka/src/jvm/storm/kafka/PartitionManager.java

@@ -6,6 +6,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.util.*;
 import kafka.api.FetchRequest;
+import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.MessageAndOffset;
@@ -39,6 +40,7 @@ public KafkaMessageId(GlobalPartitionId partition, long offset) {
     ZkState _state;
     Map _stormConf;
 
+
     public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, GlobalPartitionId id) {
         _partition = id;
         _connections = connections;
@@ -166,6 +168,27 @@ private String committedPath() {
         return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition;
     }
 
+    public long queryPartitionOffsetLatestTime() {
+        return _consumer.getOffsetsBefore(_spoutConfig.topic, _partition.partition,
+                OffsetRequest.LatestTime(), 1)[0];
+    }
+
+    public long lastCommittedOffset() {
+        return _committedTo;
+    }
+
+    public long lastCompletedOffset() {
+        if(_pending.isEmpty()) {
+            return _emittedToOffset;
+        } else {
+            return _pending.first();
+        }
+    }
+
+    public GlobalPartitionId getPartition() {
+        return _partition;
+    }
+
     public void close() {
         _connections.unregister(_partition.host, _partition.partition);
     }
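
A note on lastCompletedOffset() above: assuming _pending is a SortedSet<Long> of offsets that were emitted but not yet acked (the first() call implies as much), the smallest pending offset marks the boundary below which every message is fully processed; only when nothing is pending has the partition been completed up to _emittedToOffset. A runnable illustration under that assumption:

    import java.util.Arrays;
    import java.util.SortedSet;
    import java.util.TreeSet;

    class LastCompletedOffsetDemo {
        public static void main(String[] args) {
            // Offsets 5, 6, 7 were emitted; only 6 has been acked so far.
            SortedSet<Long> pending = new TreeSet<Long>(Arrays.asList(5L, 7L));
            long emittedToOffset = 8; // next offset to fetch
            // Everything below 5 is complete; 5 itself is still in flight.
            long lastCompleted = pending.isEmpty() ? emittedToOffset : pending.first();
            System.out.println(lastCompleted); // prints 5
        }
    }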
91 changes: 84 additions & 7 deletions storm-kafka/src/jvm/storm/kafka/trident/KafkaUtils.java

@@ -1,28 +1,32 @@
 package storm.kafka.trident;
 
 import java.net.ConnectException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import java.util.*;
+
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.ReducedMetric;
 import com.google.common.collect.ImmutableMap;
+
 import backtype.storm.utils.Utils;
 import kafka.api.FetchRequest;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import storm.kafka.DynamicPartitionConnections;
 import storm.kafka.GlobalPartitionId;
 import storm.kafka.HostPort;
 import storm.kafka.KafkaConfig.StaticHosts;
 import storm.kafka.KafkaConfig.ZkHosts;
 import storm.trident.operation.TridentCollector;
 
 public class KafkaUtils {
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
     public static IBrokerReader makeBrokerReader(Map stormConf, TridentKafkaConfig conf) {
         if(conf.hosts instanceof StaticHosts) {
             return new StaticBrokerReader((StaticHosts) conf.hosts);
@@ -45,7 +49,8 @@ public static List<GlobalPartitionId> getOrderedPartitions(Map<String, List> par
         return ret;
     }
 
-    public static Map emitPartitionBatchNew(TridentKafkaConfig config, SimpleConsumer consumer, GlobalPartitionId partition, TridentCollector collector, Map lastMeta, String topologyInstanceId, String topologyName) {
+    public static Map emitPartitionBatchNew(TridentKafkaConfig config, SimpleConsumer consumer, GlobalPartitionId partition, TridentCollector collector, Map lastMeta, String topologyInstanceId, String topologyName,
+                                            ReducedMetric meanMetric, CombinedMetric maxMetric) {
         long offset;
         if(lastMeta!=null) {
             String lastInstanceId = null;
@@ -65,7 +70,12 @@ public static Map emitPartitionBatchNew(TridentKafkaConfig config, SimpleConsume
         }
         ByteBufferMessageSet msgs;
         try {
+            long start = System.nanoTime();
             msgs = consumer.fetch(new FetchRequest(config.topic, partition.partition, offset, config.fetchSizeBytes));
+            long end = System.nanoTime();
+            long millis = (end - start) / 1000000;
+            meanMetric.update(millis);
+            maxMetric.update(millis);
         } catch(Exception e) {
             if(e instanceof ConnectException) {
                 throw new FailedFetchException(e);
@@ -97,4 +107,71 @@ public static void emit(TridentKafkaConfig config, TridentCollector collector, M
             collector.emit(value);
         }
     }
+
+
+    public static class KafkaOffsetMetric implements IMetric {
+        Map<GlobalPartitionId, Long> _partitionToOffset = new HashMap<GlobalPartitionId, Long>();
+        Set<GlobalPartitionId> _partitions;
+        String _topic;
+        DynamicPartitionConnections _connections;
+
+        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
+            _topic = topic;
+            _connections = connections;
+        }
+
+        public void setLatestEmittedOffset(GlobalPartitionId partition, long offset) {
+            _partitionToOffset.put(partition, offset);
+        }
+
+        @Override
+        public Object getValueAndReset() {
+            try {
+                long totalSpoutLag = 0;
+                long totalLatestTimeOffset = 0;
+                long totalLatestEmittedOffset = 0;
+                HashMap ret = new HashMap();
+                if(_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+                    for(Map.Entry<GlobalPartitionId, Long> e : _partitionToOffset.entrySet()) {
+                        GlobalPartitionId partition = e.getKey();
+                        SimpleConsumer consumer = _connections.getConnection(partition);
+                        if(consumer == null) {
+                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
+                            return null;
+                        }
+                        long latestTimeOffset = consumer.getOffsetsBefore(_topic, partition.partition, OffsetRequest.LatestTime(), 1)[0];
+                        if(latestTimeOffset == 0) {
+                            LOG.warn("No data found in Kafka Partition " + partition.getId());
+                            return null;
+                        }
+                        long latestEmittedOffset = (Long)e.getValue();
+                        long spoutLag = latestTimeOffset - latestEmittedOffset;
+                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
+                        ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
+                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+                        totalSpoutLag += spoutLag;
+                        totalLatestTimeOffset += latestTimeOffset;
+                        totalLatestEmittedOffset += latestEmittedOffset;
+                    }
+                    ret.put("totalSpoutLag", totalSpoutLag);
+                    ret.put("totalLatestTime", totalLatestTimeOffset);
+                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+                    return ret;
+                } else {
+                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+                }
+            } catch(Throwable t) {
+                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+            }
+            return null;
+        }
+
+        public void refreshPartitions(Set<GlobalPartitionId> partitions) {
+            _partitions = partitions;
+            Iterator<GlobalPartitionId> it = _partitionToOffset.keySet().iterator();
+            while(it.hasNext()) {
+                if(!partitions.contains(it.next())) it.remove();
+            }
+        }
+    };
 }
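
The lag arithmetic in KafkaOffsetMetric is easy to check by hand. With the Kafka 0.7 client pinned in project.clj, offsets are byte positions in the partition log, so the lag below is measured in bytes rather than messages; the values are made up for illustration:

    long latestTimeOffset = 10500;    // broker's newest offset for the partition
    long latestEmittedOffset = 10200; // spout's last completed offset
    long spoutLag = latestTimeOffset - latestEmittedOffset; // 300

    // getValueAndReset() then reports, keyed by GlobalPartitionId.getId():
    //   <id>/spoutLag            -> 300
    //   <id>/latestTime          -> 10500
    //   <id>/latestEmittedOffset -> 10200
    // plus totals summed over all partitions this task owns.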
20 changes: 20 additions & 0 deletions storm-kafka/src/jvm/storm/kafka/trident/MaxMetric.java

@@ -0,0 +1,20 @@
+package storm.kafka.trident;
+
+
+import backtype.storm.metric.api.ICombiner;
+import clojure.lang.Numbers;
+
+public class MaxMetric implements ICombiner<Long> {
+    @Override
+    public Long identity() {
+        return null;
+    }
+
+    @Override
+    public Long combine(Long l1, Long l2) {
+        if(l1 == null) return l2;
+        if(l2 == null) return l1;
+        return Math.max(l1, l2);
+    }
+
+}
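
A quick check of the combiner semantics, illustrative only: identity() is null so that an empty time bucket reports no value, and a null operand means "no sample yet" rather than zero. CombinedMetric wraps an ICombiner; update(v) folds v into the current bucket and getValueAndReset() returns the fold, restarting from identity():

    MaxMetric max = new MaxMetric();
    Long bucket = max.identity();       // null: nothing observed yet
    bucket = max.combine(bucket, 120L); // 120
    bucket = max.combine(bucket, 45L);  // still 120; the maximum wins
    bucket = max.combine(bucket, null); // still 120; null is "no sample"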
storm-kafka/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java

@@ -1,13 +1,15 @@
 package storm.kafka.trident;
 
 import backtype.storm.Config;
+import backtype.storm.metric.api.CombinedMetric;
+import backtype.storm.metric.api.MeanReducer;
+import backtype.storm.metric.api.ReducedMetric;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Fields;
 import com.google.common.collect.ImmutableMap;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
+
+import java.util.*;
+
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,7 +32,7 @@ public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
 
     @Override
     public IOpaquePartitionedTridentSpout.Emitter<Map<String, List>, GlobalPartitionId, Map> getEmitter(Map conf, TopologyContext context) {
-        return new Emitter(conf);
+        return new Emitter(conf, context);
     }
 
     @Override
@@ -74,17 +76,26 @@ public Map getPartitionsForBatch() {
     class Emitter implements IOpaquePartitionedTridentSpout.Emitter<Map<String, List>, GlobalPartitionId, Map> {
         DynamicPartitionConnections _connections;
         String _topologyName;
+        KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
+        ReducedMetric _kafkaMeanFetchLatencyMetric;
+        CombinedMetric _kafkaMaxFetchLatencyMetric;
 
-        public Emitter(Map conf) {
+        public Emitter(Map conf, TopologyContext context) {
             _connections = new DynamicPartitionConnections(_config);
             _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+            _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
+            context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
+            _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
+            _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
         }
 
         @Override
         public Map emitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, GlobalPartitionId partition, Map lastMeta) {
             try {
                 SimpleConsumer consumer = _connections.register(partition);
-                return KafkaUtils.emitPartitionBatchNew(_config, consumer, partition, collector, lastMeta, _topologyInstanceId, _topologyName);
+                Map ret = KafkaUtils.emitPartitionBatchNew(_config, consumer, partition, collector, lastMeta, _topologyInstanceId, _topologyName, _kafkaMeanFetchLatencyMetric, _kafkaMaxFetchLatencyMetric);
+                _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long)ret.get("offset"));
+                return ret;
             } catch(FailedFetchException e) {
                 LOG.warn("Failed to fetch from partition " + partition);
                 if(lastMeta==null) {
@@ -116,6 +127,7 @@ public List<GlobalPartitionId> getOrderedPartitions(Map<String, List> partitions
         @Override
         public void refreshPartitions(List<GlobalPartitionId> list) {
             _connections.clear();
+            _kafkaOffsetMetric.refreshPartitions(new HashSet<GlobalPartitionId>(list));
         }
     }
 }
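
The three metrics wired up above (kafkaOffset, kafkaFetchAvg, kafkaFetchMax) only become visible if the topology registers a metrics consumer. A minimal sketch using the stock logging consumer, assuming the Storm 0.9.x API:

    import backtype.storm.Config;
    import backtype.storm.metric.LoggingMetricsConsumer;

    class MetricsSetup {
        static Config buildConf() {
            Config conf = new Config();
            // One consumer task; Storm routes each 60-second metrics bucket
            // to it, and LoggingMetricsConsumer writes it to the metrics log.
            conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 1);
            return conf;
        }
    }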