Skip to content

Commit

Permalink
LoggingMetricsConsumer dumps metrics to log file
Browse files Browse the repository at this point in the history
* Added a LoggingMetricsConsumer example of a MetricsConsumer. Use it by adding 'conf.registerMetricsConsumer(LoggingMetricsConsumer.class);' when you construct your topology.
* Added a dedicated appender to the example logback/cluster.xml called METRICS.
  Its log format omits the class and priority since with the dedicated appender those are redu
  • Loading branch information
Philip (flip) Kromer committed Jul 21, 2013
1 parent 470e7cd commit ef94d01
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 6 deletions.
10 changes: 6 additions & 4 deletions conf/storm.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

## Metrics Consumers
# topology.metrics.consumer.register:
# - class: "org.mycompany.MyMetricsConsumer"
# argument:
# - endpoint: "metrics-collector.mycompany.org"
# parallelism.hint: 1
# - class: "backtype.storm.metrics.LoggingMetricsConsumer"
# parallelism.hint: 1
# - class: "org.mycompany.MyMetricsConsumer"
# parallelism.hint: 1
# argument:
# - endpoint: "metrics-collector.mycompany.org"
27 changes: 25 additions & 2 deletions logback/cluster.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
</appender>
</appender>

<appender name="ACCESS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${storm.home}/logs/access.log</file>
Expand All @@ -33,7 +33,24 @@
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n</pattern>
</encoder>
</appender>
</appender>

<appender name="METRICS" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${storm.home}/logs/metrics.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>metrics.log.%i</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>

<triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>2MB</maxFileSize>
</triggeringPolicy>

<encoder>
<pattern>%d %-8r %m%n</pattern>
</encoder>
</appender>

<root level="INFO">
<appender-ref ref="A1"/>
Expand All @@ -43,4 +60,10 @@
<level value="INFO" />
<appender-ref ref="ACCESS" />
</logger>

<logger name="backtype.storm.metric.LoggingMetricsConsumer" additivity="false" >
<level value="INFO"/>
<appender-ref ref="METRICS"/>
</logger>

</configuration>
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package backtype.storm.metric;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Map;

import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;
import backtype.storm.utils.Utils;

/*
* Listens for all metrics, dumps them to log
*
* To use, add this to your topology's configuration:
* conf.registerMetricsConsumer(backtype.storm.metrics.LoggingMetricsConsumer.class, 1);
*
* Or edit the storm.yaml config file:
*
* topology.metrics.consumer.register:
* - class: "backtype.storm.metrics.LoggingMetricsConsumer"
* parallelism.hint: 1
*
*/
public class LoggingMetricsConsumer implements IMetricsConsumer {
public static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsConsumer.class);

@Override
public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { }

static private String padding = " ";

@Override
public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
StringBuilder sb = new StringBuilder();
String header = String.format("%d\t%15s:%-4d\t%3d:%-11s\t",
taskInfo.timestamp,
taskInfo.srcWorkerHost, taskInfo.srcWorkerPort,
taskInfo.srcTaskId,
taskInfo.srcComponentId);
sb.append(header);
for (DataPoint p : dataPoints) {
sb.delete(header.length(), sb.length());
sb.append(p.name)
.append(padding).delete(header.length()+23,sb.length()).append("\t")
.append(p.value);
LOG.info(sb.toString());
}
}

@Override
public void cleanup() { }
}

0 comments on commit ef94d01

Please sign in to comment.