Skip to content

Commit

Permalink
+ statsd: allow time and memory metrics be scaled before sending to s…
Browse files Browse the repository at this point in the history
…tatsd
  • Loading branch information
jozic committed Dec 1, 2015
1 parent da31689 commit 09f0932
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 20 deletions.
10 changes: 10 additions & 0 deletions kamon-statsd/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,16 @@ kamon {
# Max packet size for UDP metrics data sent to StatsD.
max-packet-size = 1024 bytes
}

# All time values are collected in nanoseconds,
# to scale before sending to StatsD set "time-units" to "s" or "ms" or "µs".
# Value "ns" is equivalent to omitting the setting
# time-units = "ns"

# All memory values are collected in bytes,
# to scale before sending to StatsD set "memory-units" to "gb" or "mb" or "kb".
# Value "b" is equivalent to omitting the setting
# memory-units = "b"
}

modules {
Expand Down
20 changes: 14 additions & 6 deletions kamon-statsd/src/main/scala/kamon/statsd/StatsD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

package kamon.statsd

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import akka.actor._
import akka.event.Logging
import com.typesafe.config.Config
import kamon.Kamon
import kamon.metric._
import kamon.util.ConfigTools.Syntax
import scala.concurrent.duration._
import com.typesafe.config.Config
import akka.event.Logging
import scala.collection.JavaConverters._
import kamon.util.NeedToScale

object StatsD extends ExtensionId[StatsDExtension] with ExtensionIdProvider {
override def lookup(): ExtensionId[_ <: Extension] = StatsD
Expand Down Expand Up @@ -62,11 +64,17 @@ class StatsDExtension(system: ExtendedActorSystem) extends Kamon.Extension {

val metricsSender = system.actorOf(senderFactory.props(statsDConfig, keyGenerator), "statsd-metrics-sender")

val decoratedSender = statsDConfig match {
case NeedToScale(scaleTimeTo, scaleMemoryTo) =>
system.actorOf(MetricScaleDecorator.props(scaleTimeTo, scaleMemoryTo, metricsSender), "statsd-metric-scale-decorator")
case _ => metricsSender
}

if (flushInterval == tickInterval) {
// No need to buffer the metrics, let's go straight to the metrics sender.
metricsSender
decoratedSender
} else {
system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, metricsSender), "statsd-metrics-buffer")
system.actorOf(TickMetricSnapshotBuffer.props(flushInterval, decoratedSender), "statsd-metrics-buffer")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,43 @@ package kamon.statsd
import java.net.InetSocketAddress
import java.text.{ DecimalFormat, DecimalFormatSymbols }
import java.util.Locale

import akka.actor.{ Actor, ActorRef, ActorSystem }
import akka.io.{ IO, Udp }
import akka.util.ByteString
import com.typesafe.config.Config
import kamon.metric.SubscriptionsDispatcher.TickMetricSnapshot

trait StatsDValueFormatters {

val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.')
// Just in case there is some weird locale config we are not aware of.

// Absurdly high number of decimal digits, let the other end lose precision if it needs to.
val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)

def encodeStatsDTimer(level: Long, count: Long): String = {
val samplingRate: Double = 1D / count
level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}

def encodeStatsDCounter(count: Long): String = count.toString + "|c"
}

/**
* Base class for different StatsD senders utilizing UDP protocol. It implies use of one statsd server.
* @param statsDConfig Config to read settings specific to this sender
* @param metricKeyGenerator Key generator for all metrics sent by this sender
*/
abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenerator: MetricKeyGenerator)
extends Actor with UdpExtensionProvider {
extends Actor with UdpExtensionProvider with StatsDValueFormatters {

import context.system

val statsDHost = statsDConfig.getString("hostname")
val statsDPort = statsDConfig.getInt("port")

val symbols = DecimalFormatSymbols.getInstance(Locale.US)
symbols.setDecimalSeparator('.') // Just in case there is some weird locale config we are not aware of.

// Absurdly high number of decimal digits, let the other end lose precision if it needs to.
val samplingRateFormat = new DecimalFormat("#.################################################################", symbols)

udpExtension ! Udp.SimpleSender

lazy val socketAddress = new InetSocketAddress(statsDHost, statsDPort)
Expand All @@ -61,13 +73,6 @@ abstract class UDPBasedStatsDMetricsSender(statsDConfig: Config, metricKeyGenera

def writeMetricsToRemote(tick: TickMetricSnapshot, flushToUDP: String Unit): Unit

def encodeStatsDTimer(level: Long, count: Long): String = {
val samplingRate: Double = 1D / count
level.toString + "|ms" + (if (samplingRate != 1D) "|@" + samplingRateFormat.format(samplingRate) else "")
}

def encodeStatsDCounter(count: Long): String = count.toString + "|c"

}

trait UdpExtensionProvider {
Expand Down

0 comments on commit 09f0932

Please sign in to comment.