Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
ivantopo committed Jun 11, 2013
1 parent 1977465 commit 84c9ae3
Show file tree
Hide file tree
Showing 18 changed files with 246 additions and 42 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
Kamon
=====




/metrics/actorsystem/{actorsystem-name}/dispatcher/{dispatcher-name}/
For each dispatcher, show:
-
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ object Build extends Build {
.settings(newrelicSettings: _*)
.settings(
libraryDependencies ++=
compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, newrelic, sprayJson) ++
compile(akkaActor, akkaAgent, sprayCan, sprayClient, sprayRouting, sprayServlet, aspectJ, metrics, sprayJson) ++
test(scalatest, sprayTestkit))


Expand Down
12 changes: 6 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ object Dependencies {
"spray nightlies repo" at "http://nightlies.spray.io"
)

val sprayCan = "io.spray" % "spray-can" % "1.1-20130509"
val sprayRouting = "io.spray" % "spray-routing" % "1.1-20130509"
val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-20130509"
val sprayClient = "io.spray" % "spray-client" % "1.1-20130509"
val sprayServlet = "io.spray" % "spray-servlet" % "1.1-20130509"
val sprayCan = "io.spray" % "spray-can" % "1.1-M8"
val sprayRouting = "io.spray" % "spray-routing" % "1.1-M8"
val sprayTestkit = "io.spray" % "spray-testkit" % "1.1-M8"
val sprayClient = "io.spray" % "spray-client" % "1.1-M8"
val sprayServlet = "io.spray" % "spray-servlet" % "1.1-M8"
val sprayJson = "io.spray" %% "spray-json" % "1.2.3"
val scalaReflect = "org.scala-lang" % "scala-reflect" % "2.10.1"
val akkaActor = "com.typesafe.akka" %% "akka-actor" % "2.1.2"
Expand All @@ -21,7 +21,7 @@ object Dependencies {
val scalatest = "org.scalatest" % "scalatest_2.10" % "2.0.M5b"
val logback = "ch.qos.logback" % "logback-classic" % "1.0.10"
val aspectJ = "org.aspectj" % "aspectjrt" % "1.7.2"
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0-BETA2"
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.0"
val newrelic = "com.newrelic.agent.java" % "newrelic-api" % "2.19.0"

def compile (deps: ModuleID*): Seq[ModuleID] = deps map (_ % "compile")
Expand Down
2 changes: 1 addition & 1 deletion project/NewRelic.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ object NewRelic {

lazy val newrelicSettings = SbtNewrelic.newrelicSettings ++ Seq(
javaOptions in run <++= jvmOptions in newrelic,
newrelicVersion in newrelic := "2.18.0"
newrelicVersion in newrelic := "2.19.0"
)
}
14 changes: 13 additions & 1 deletion src/main/resources/META-INF/aop.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,23 @@

<!--<aspect name="akka.ActorSystemAspect"/>
&lt;!&ndash;<aspect name="akka.MailboxAspect"/>&ndash;&gt;-->
<!--<aspect name="akka.PoolMonitorInstrumentation"/>-->
<!--<aspect name="akka.PoolMonitorAspect"/>-->
<aspect name="akka.ActorInstrumentation" />
<aspect name="akka.instrumentation.ActorRefTellInstrumentation"/>
<aspect name="akka.instrumentation.ActorCellInvokeInstrumentation"/>
<aspect name="kamon.instrumentation.RunnableInstrumentation" />
<!--<aspect name="kamon.instrumentation.DispatcherInstrumentation" />-->
<!--<aspect name ="akka.dispatch.FactoryInstrumentation" />-->


<!-- ExecutorService Instrumentation for Akka. -->
<aspect name="akka.dispatch.ExecutorServiceFactoryProviderInstrumentation"/>
<aspect name="akka.dispatch.NamedExecutorServiceFactoryDelegateInstrumentation"/>






<include within="*"/>
<exclude within="javax..*"/>
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/newrelic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ common: &default_settings
# This setting is dynamic, so changes do not require restarting your application.
# The levels in increasing order of verboseness are: off, severe, warning, info, fine, finer, finest
# Default is info.
log_level: info
log_level: finer
enable_custom_tracing: true

# Log all data to and from New Relic in plain text.
Expand All @@ -70,7 +70,7 @@ common: &default_settings

# The log file directory.
# Default is the logs directory in the newrelic.jar parent directory.
#log_file_path:
log_file_path: /home/ivantopo/Desktop/tmp

# The agent communicates with New Relic via https by
# default. If you want to communicate with newrelic via http,
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/akka/ActorInstrumentation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package akka
import actor.ActorCell
import org.aspectj.lang.annotation.{After, Around, Pointcut, Aspect}
import org.aspectj.lang.ProceedingJoinPoint
import kamon.metric.Metrics.{ metricsRegistry => meterRegistry }
import kamon.metric.Metrics.{ registry => meterRegistry }
import com.codahale.metrics.Meter
import kamon.metric.MetricsUtils._

Expand Down
18 changes: 16 additions & 2 deletions src/main/scala/akka/PoolMonitorInstrumentation.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
package akka

import org.aspectj.lang.annotation._
import akka.dispatch.MonitorableThreadFactory
import kamon.metric.Metrics
import scala.concurrent.forkjoin.ForkJoinPool
import com.codahale.metrics.Gauge

@Aspect("perthis(poolMonitor(*))")
@Aspect("perthis(poolMonitor(scala.concurrent.forkjoin.ForkJoinPool))")
class PoolMonitorAspect {
println("Created PoolMonitorAspect")


@Pointcut("execution(scala.concurrent.forkjoin.ForkJoinPool.new(..)) && this(pool)")
protected def poolMonitor(pool:scala.concurrent.forkjoin.ForkJoinPool):Unit = {}
protected def poolMonitor(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {}

@After("poolMonitor(pool)")
def beforePoolInstantiation(pool: scala.concurrent.forkjoin.ForkJoinPool):Unit = {
pool.getFactory match {
case m: MonitorableThreadFactory => registerForMonitoring(pool, m.name)
}
}

def registerForMonitoring(fjp: ForkJoinPool, name: String) {
Metrics.registry.register(s"/metrics/actorsystem/{actorsystem-name}/dispatcher/$name",
new Gauge[Long] {
def getValue: Long = fjp.getPoolSize
})
}
}
2 changes: 1 addition & 1 deletion src/main/scala/kamon/Kamon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ object Kamon {
override def initialValue() = None
}

implicit lazy val actorSystem = ActorSystem("kamon")
implicit lazy val actorSystem = ActorSystem("kamon-test")


def context() = ctx.get()
Expand Down
6 changes: 4 additions & 2 deletions src/main/scala/kamon/TraceContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ object ThreadLocalTraceEntryStorage extends TraceEntryStorage {

private val storage = new ThreadLocal[List[TraceEntry]] {
override def initialValue(): List[TraceEntry] = Nil
def update(f: List[TraceEntry] => List[TraceEntry]) = set(f(get()))
}

def update(f: List[TraceEntry] => List[TraceEntry]) = storage set f(storage.get)

def store(entry: TraceEntry): Boolean = {
storage.update(entry :: _)
update(entry :: _)
true
}
}


32 changes: 13 additions & 19 deletions src/main/scala/kamon/executor/eventbus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import java.util.concurrent.TimeUnit

import kamon.{CodeBlockExecutionTime, Kamon, TraceContext}
import akka.util.Timeout
import scala.util.Success
import scala.util.Failure
import scala.util.{Random, Success, Failure}
import scala.concurrent.Future

trait Message
Expand All @@ -35,31 +34,24 @@ class AppActorEventBus extends ActorEventBus with LookupClassification{
case class Ping()
case class Pong()

class PingActor(val target: ActorRef) extends Actor with ActorLogging {
implicit def executionContext = context.dispatcher
implicit val timeout = Timeout(30, TimeUnit.SECONDS)
class PingActor extends Actor with ActorLogging {

val pong = context.actorOf(Props[PongActor])
val random = new Random()
def receive = {
case Pong() => {
log.info(s"pong with context ${Kamon.context}")
Thread.sleep(1000)
sender ! Ping()
Thread.sleep(random.nextInt(2000))
//log.info("Message from Ping")
pong ! Ping()
}
case a: Any => println(s"Got ${a} in PING"); Thread.sleep(1000)
}

def withAny(): Any = {1}
def withAnyRef(): AnyRef = {new Object}
}

class PongActor extends Actor with ActorLogging {
def receive = {
case Ping() => {
Thread.sleep(3000)
sender ! Pong()
log.info(s"ping with context ${Kamon.context}")
}
case a: Any => println(s"Got ${a} in PONG")
}
}

Expand All @@ -74,8 +66,10 @@ object TryAkka extends App{
}
}))



for(i <- 1 to 4) {
val ping = system.actorOf(Props[PingActor])
ping ! Pong()
}


def threadPrintln(body: String) = println(s"[${Thread.currentThread().getName}] - [${Kamon.context}] : $body")
Expand All @@ -100,8 +94,8 @@ object TryAkka extends App{
Kamon.stop


Thread.sleep(3000)
system.shutdown()
//Thread.sleep(3000)
//system.shutdown()

/* appActorEventBus.subscribe(subscriber, NEW_POST_CHANNEL)
appActorEventBus.publish(MessageEvent(NEW_POST_CHANNEL,PostMessage(text="hello world")))*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package akka.dispatch

import org.aspectj.lang.annotation._
import java.util.concurrent._
import scala.concurrent.forkjoin.ForkJoinPool
import org.aspectj.lang.ProceedingJoinPoint
import java.util
import akka.dispatch.NamedExecutorServiceFactoryDelegate
import kamon.metric.{MetricDirectory, ExecutorServiceMetricCollector}


case class NamedExecutorServiceFactoryDelegate(actorSystemName: String, dispatcherName: String, delegate: ExecutorServiceFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = delegate.createExecutorService
}

@Aspect
class ExecutorServiceFactoryProviderInstrumentation {

@Pointcut("execution(* akka.dispatch.ExecutorServiceFactoryProvider+.createExecutorServiceFactory(..)) && args(id, threadFactory)")
def factoryMethodCall(id: String, threadFactory: ThreadFactory) = {}

@Around("factoryMethodCall(id, threadFactory)")
def enrichFactoryCreationWithNames(pjp: ProceedingJoinPoint, id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val delegate = pjp.proceed(Array[AnyRef](id, threadFactory)).asInstanceOf[ExecutorServiceFactory] // Safe Cast

val actorSystemName = threadFactory match {
case m: MonitorableThreadFactory => m.name
case _ => "Unknown" // Find an alternative way to find the actor system name in case we start seeing "Unknown" as the AS name.
}

new NamedExecutorServiceFactoryDelegate(actorSystemName, id, delegate)
}

}


@Aspect
class NamedExecutorServiceFactoryDelegateInstrumentation {

@Pointcut("execution(* akka.dispatch.NamedExecutorServiceFactoryDelegate.createExecutorService()) && this(namedFactory)")
def factoryMethodCall(namedFactory: NamedExecutorServiceFactoryDelegate) = {}

@Around("factoryMethodCall(namedFactory)")
def enrichExecutorServiceWithMetricNameRoot(pjp: ProceedingJoinPoint, namedFactory: NamedExecutorServiceFactoryDelegate): ExecutorService = {
val delegate = pjp.proceed(Array[AnyRef](namedFactory)).asInstanceOf[ExecutorService]
val executorFullName = MetricDirectory.nameForDispatcher(namedFactory.actorSystemName, namedFactory.dispatcherName)

ExecutorServiceMetricCollector.register(executorFullName, delegate)

new NamedExecutorServiceDelegate(executorFullName, delegate)
}
}

case class NamedExecutorServiceDelegate(fullName: String, delegate: ExecutorService) extends ExecutorService {
def shutdown() = {
ExecutorServiceMetricCollector.deregister(fullName)
delegate.shutdown()
}
def shutdownNow(): util.List[Runnable] = delegate.shutdownNow()
def isShutdown: Boolean = delegate.isShutdown
def isTerminated: Boolean = delegate.isTerminated
def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = delegate.awaitTermination(timeout, unit)
def submit[T](task: Callable[T]): Future[T] = delegate.submit(task)
def submit[T](task: Runnable, result: T): Future[T] = delegate.submit(task, result)
def submit(task: Runnable): Future[_] = delegate.submit(task)
def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = delegate.invokeAll(tasks)
def invokeAll[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = delegate.invokeAll(tasks, timeout, unit)
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = delegate.invokeAny(tasks)
def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = delegate.invokeAny(tasks, timeout, unit)
def execute(command: Runnable) = delegate.execute(command)
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class RunnableInstrumentation {
*/
import kamon.TraceContextSwap.withContext

@Before("instrumentedRunnableCreation()")
def beforeCreation = {
//println((new Throwable).getStackTraceString)
}


@Around("runnableExecution()")
def around(pjp: ProceedingJoinPoint) = {
import pjp._
Expand Down
67 changes: 67 additions & 0 deletions src/main/scala/kamon/metric/ExecutorServiceMetricCollector.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package kamon.metric

import java.util.concurrent.{ThreadPoolExecutor, ExecutorService}
import scala.concurrent.forkjoin.ForkJoinPool
import com.codahale.metrics.{Metric, MetricFilter}

object ExecutorServiceMetricCollector extends ForkJoinPoolMetricCollector with ThreadPoolExecutorMetricCollector {

def register(fullName: String, executorService: ExecutorService) = executorService match {
case fjp: ForkJoinPool => registerForkJoinPool(fullName, fjp)
case tpe: ThreadPoolExecutor => registerThreadPoolExecutor(fullName, tpe)
case _ => // If it is a unknown Executor then just do nothing.
}

def deregister(fullName: String) = {
Metrics.registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(fullName)
})
}
}


trait ForkJoinPoolMetricCollector {
import GaugeGenerator._
import BasicExecutorMetricNames._


def registerForkJoinPool(fullName: String, fjp: ForkJoinPool) = {
val forkJoinPoolGauge = newNumericGaugeFor(fjp) _

val allMetrics = Map(
fullName + queueSize -> forkJoinPoolGauge(_.getQueuedTaskCount.toInt),
fullName + poolSize -> forkJoinPoolGauge(_.getPoolSize),
fullName + activeThreads -> forkJoinPoolGauge(_.getActiveThreadCount)
)

allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2))
}
}

trait ThreadPoolExecutorMetricCollector {
import GaugeGenerator._
import BasicExecutorMetricNames._

def registerThreadPoolExecutor(fullName: String, tpe: ThreadPoolExecutor) = {
val tpeGauge = newNumericGaugeFor(tpe) _

val allMetrics = Map(
fullName + queueSize -> tpeGauge(_.getQueue.size()),
fullName + poolSize -> tpeGauge(_.getPoolSize),
fullName + activeThreads -> tpeGauge(_.getActiveCount)
)

allMetrics.foreach(kv => Metrics.registry.register(kv._1, kv._2))
}
}


object BasicExecutorMetricNames {
val queueSize = "queueSize"
val poolSize = "poolSize"
val activeThreads = "activeThreads"
}




Loading

0 comments on commit 84c9ae3

Please sign in to comment.