Skip to content

Commit

Permalink
! statsd: allow custom statsd senders + add simple statsd sender whic…
Browse files Browse the repository at this point in the history
…h doesn't batch stats
  • Loading branch information
jozic committed Oct 28, 2015
1 parent 546f460 commit 4bcddee
Show file tree
Hide file tree
Showing 12 changed files with 519 additions and 232 deletions.
17 changes: 14 additions & 3 deletions kamon-statsd/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ kamon {
# kamon.metric.tick-interval setting.
flush-interval = 10 seconds

# Max packet size for UDP metrics data sent to StatsD.
max-packet-size = 1024 bytes

# Subscription patterns used to select which metrics will be pushed to StatsD. Note that first, metrics
# collection for your desired entities must be activated under the kamon.metrics.filters settings.
subscriptions {
Expand Down Expand Up @@ -63,6 +60,20 @@ kamon {
# version of StatsD or if you are running your own, customized version of StatsD that supports this.
metric-name-normalization-strategy = normalize
}

# FQCN of the implementation of `kamon.statsd.StatsDMetricsSenderFactory` to be instantiated and use for
# creating StatsD sender. Provided implementations are:
# - `kamon.statsd.BatchStatsDMetricsSender`. Sends a UDP packet every "kamon.statsd.flush-interval" or
# as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached. Default one.
# - `kamon.statsd.SimpleStatsDMetricsSender`. Sends a UDP packet for each piece of data it receives.
metric-sender-factory = kamon.statsd.BatchStatsDMetricsSender

# Settings for `kamon.statsd.BatchStatsDMetricsSender`.
# Used only if kamon.statsd.metric-sender-factory is set to `kamon.statsd.BatchStatsDMetricsSender`
batch-metric-sender {
# Max packet size for UDP metrics data sent to StatsD.
max-packet-size = 1024 bytes
}
}

modules {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,41 +16,34 @@

package kamon.statsd

import akka.actor.{ ActorSystem, Props, ActorRef, Actor }
import akka.io.{ Udp, IO }
import java.net.InetSocketAddress
import akka.util.ByteString
import akka.actor.Props
import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import java.text.{ DecimalFormatSymbols, DecimalFormat }
import java.util.Locale

import kamon.metric.instrument.{ Counter, Histogram }

class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBytes: Long, metricKeyGenerator: MetricKeyGenerator)
extends Actor with UdpExtensionProvider {
import context.system

val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.

// Absurdly high number of decimal digits, let the other end lose precision if it needs to.
val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)

udpExtension ! Udp.SimpleSender

def newSocketAddress = new InetSocketAddress(statsDHost, statsDPort)
/**
* Factory for [[BatchStatsDMetricsSender]].
* Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
* to select [[BatchStatsDMetricsSender]] as your sender
*/
object BatchStatsDMetricsSender extends StatsDMetricsSenderFactory {
override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
Props(new BatchStatsDMetricsSender(statsDConfig, metricKeyGenerator))
}

def receive = {
case Udp.SimpleSenderReady
context.become(ready(sender))
}
/**
* StatsD sender which sends a UDP packet every "kamon.statsd.flush-interval" or
* as long as "kamon.statsd.batch-metric-sender.max-packet-size" is reached.
* @param statsDConfig Config to read settings specific to this sender
* @param metricKeyGenerator Key generator for all metrics sent by this sender
*/
class BatchStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {

def ready(udpSender: ActorRef): Receive = {
case tick: TickMetricSnapshot writeMetricsToRemote(tick, udpSender)
}
val maxPacketSizeInBytes = statsDConfig.getBytes("batch-metric-sender.max-packet-size")

def writeMetricsToRemote(tick: TickMetricSnapshot, udpSender: ActorRef): Unit = {
val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, udpSender, newSocketAddress)
def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String Unit): Unit = {
val packetBuilder = new MetricDataPacketBuilder(maxPacketSizeInBytes, flushToUDP)

for (
(entity, snapshot) tick.metrics;
Expand All @@ -72,25 +65,9 @@ class StatsDMetricsSender(statsDHost: String, statsDPort: Int, maxPacketSizeInBy

packetBuilder.flush()
}

def encodeStatsDTimer(level: Long, count: Long): String = {
val samplingRate: Double = 1D / count
level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}

def encodeStatsDCounter(count: Long): String = count.toString + "|c"
}

object StatsDMetricsSender {
def props(statsDHost: String, statsDPort: Int, maxPacketSize: Long, metricKeyGenerator: MetricKeyGenerator): Props =
Props(new StatsDMetricsSender(statsDHost, statsDPort, maxPacketSize, metricKeyGenerator))
}

trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}

class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, remote: InetSocketAddress) {
class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, flushToUDP: String Unit) {
val metricSeparator = "\n"
val measurementSeparator = ":"

Expand All @@ -103,8 +80,7 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
if (fitsOnBuffer(dataWithoutKey))
buffer.append(dataWithoutKey)
else {
flushToUDP(buffer.toString())
buffer.clear()
flush()
buffer.append(key).append(dataWithoutKey)
}
} else {
Expand All @@ -114,17 +90,14 @@ class MetricDataPacketBuilder(maxPacketSizeInBytes: Long, udpSender: ActorRef, r
val mSeparator = if (buffer.length > 0) metricSeparator else ""
buffer.append(mSeparator).append(dataWithoutSeparator)
} else {
flushToUDP(buffer.toString())
buffer.clear()
flush()
buffer.append(dataWithoutSeparator)
}
}
}

def fitsOnBuffer(data: String): Boolean = (buffer.length + data.length) <= maxPacketSizeInBytes

private def flushToUDP(data: String): Unit = udpSender ! Udp.Send(ByteString(data), remote)

def flush(): Unit = {
flushToUDP(buffer.toString)
buffer.clear()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* =========================================================================================
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.statsd

import java.lang.management.ManagementFactory
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* =========================================================================================
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.statsd

import akka.actor.Props
import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot
import kamon.metric.instrument.{ Counter, Histogram }

/**
* Factory for [[SimpleStatsDMetricsSender]].
* Use FQCN of the object in "kamon.statsd.statsd-metrics-sender"
* to select [[SimpleStatsDMetricsSender]] as your sender
*/
object SimpleStatsDMetricsSender extends StatsDMetricsSenderFactory {
override def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props =
Props(new SimpleStatsDMetricsSender(statsDConfig, metricKeyGenerator))
}

/**
* "Traditional" StatsD sender which sends a UDP packet for each piece of data it receives.
* @param statsDConfig Config to read settings specific to this sender
* @param metricKeyGenerator Key generator for all metrics sent by this sender
*/
class SimpleStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
extends UDPBasedStatsDMetricsSender(statsDConfig, metricKeyGenerator) {

def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String Unit): Unit = {

for (
(entity, snapshot) tick.metrics;
(metricKey, metricSnapshot) snapshot.metrics
) {

val keyPrefix = metricKeyGenerator.generateKey(entity, metricKey) + ":"

metricSnapshot match {
case hs: Histogram.Snapshot
hs.recordsIterator.foreach { record
flushToUDP(keyPrefix + encodeStatsDTimer(record.level, record.count))
}

case cs: Counter.Snapshot
flushToUDP(keyPrefix + encodeStatsDCounter(cs.count))
}
}
}
}
18 changes: 7 additions & 11 deletions kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* =========================================================================================
* Copyright © 2013-2014 the kamon project <http://kamon.io/>
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
Expand All @@ -23,8 +23,6 @@ import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.event.Logging
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.collection.JavaConverters._

object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
Expand All @@ -44,10 +42,10 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {

val tickInterval = metricsExtension.settings.tickInterval
val flushInterval = statsDConfig.getFiniteDuration("flush-interval")
val maxPacketSizeInBytes = statsDConfig.getBytes("max-packet-size")
val keyGeneratorFQCN = statsDConfig.getString("metric-key-generator")
val senderFactoryFQCN = statsDConfig.getString("metric-sender-factory")

val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, config)
val statsDMetricsListener = buildMetricsListener(tickInterval, flushInterval, keyGeneratorFQCN, senderFactoryFQCN, config)

val subscriptions = statsDConfig.getConfig("subscriptions")
subscriptions.firstLevelKeys.map { subscriptionCategory
Expand All @@ -56,15 +54,13 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {
}
}

def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration, keyGeneratorFQCN: String, config: Config): ActorRef = {
def buildMetricsListener(tickInterval: FiniteDuration, flushInterval: FiniteDuration,
keyGeneratorFQCN: String, senderFactoryFQCN: String, config: Config): ActorRef = {
assert(flushInterval >= tickInterval, "StatsD flush-interval needs to be equal or greater to the tick-interval")
val keyGenerator = system.dynamicAccess.createInstanceFor[MetricKeyGenerator](keyGeneratorFQCN, (classOf[Config], config) :: Nil).get
val senderFactory = system.dynamicAccess.getObjectFor[StatsDMetricsSenderFactory](senderFactoryFQCN).get

val metricsSender = system.actorOf(StatsDMetricsSender.props(
statsDConfig.getString("hostname"),
statsDConfig.getInt("port"),
maxPacketSizeInBytes,
keyGenerator), "statsd-metrics-sender")
val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender")

if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* =========================================================================================
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.statsd

import akka.actor.Props
import com.typesafe.config.Config

trait StatsDMetricsSenderFactory {
def props(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator): Props
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* =========================================================================================
* Copyright © 2013-2015 the kamon project <http://kamon.io/>
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
* =========================================================================================
*/

package kamon.statsd

import java.net.InetSocketAddress
import java.text.{ DecimalFormat, DecimalFormatSymbols }
import java.util.Locale
import akka.actor.{ Actor, ActorRef, ActorSystem }
import akka.io.{ IO, Udp }
import akka.util.ByteString
import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot

/**
* Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server.
* @param statsDConfig Config to read settings specific to this sender
* @param metricKeyGenerator Key generator for all metrics sent by this sender
*/
abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
extends Actor with UdpExtensionProvider {

import context.system

val statsDHost = statsDConfig.getString("hostname")
val statsDPort = statsDConfig.getInt("port")

val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.

// Absurdly high number of decimal digits, let the other end lose precision if it needs to.
val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)

udpExtension ! Udp.SimpleSender

lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort)

def receive = {
case Udp.SimpleSenderReady
context.become(ready(sender))
}

def ready(udpSender: ActorRef): Receive = {
case tick: TickMetricSnapshot
writeMetricsToRemote(tick,
(data: String) udpSender ! Udp.Send(ByteString(data), socketAddress))
}

def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String Unit): Unit

def encodeStatsDTimer(level: Long, count: Long): String = {
val samplingRate: Double = 1D / count
level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}

def encodeStatsDCounter(count: Long): String = count.toString + "|c"

}

trait UdpExtensionProvider {
def udpExtension(implicit system: ActorSystem): ActorRef = IO(Udp)
}

Loading

0 comments on commit 4bcddee

Please sign in to comment.