Skip to content

Commit

Permalink
add Class[_] to LogEvent
Browse files Browse the repository at this point in the history
- it is customary to use class name for categorizing logs, hence we
  should support it; class is taken from logSource.getClass
- update SLF4J module to use logClass as category and set logSource in
  MDC "akkaSource"
- add docs
  • Loading branch information
rkuhn committed Jan 12, 2012
1 parent 0470f5f commit d0498eb
Show file tree
Hide file tree
Showing 25 changed files with 295 additions and 137 deletions.
12 changes: 6 additions & 6 deletions akka-actor-tests/src/test/scala/akka/actor/FSMActorSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
system.eventStream.subscribe(testActor, classOf[Logging.Error])
fsm ! "go"
expectMsgPF(1 second, hint = "Next state 2 does not exist") {
case Logging.Error(_, `name`, "Next state 2 does not exist") true
case Logging.Error(_, `name`, _, "Next state 2 does not exist") true
}
system.eventStream.unsubscribe(testActor)
}
Expand Down Expand Up @@ -221,15 +221,15 @@ class FSMActorSpec extends AkkaSpec(Map("akka.actor.debug.fsm" -> true)) with Im
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
fsm ! "go"
expectMsgPF(1 second, hint = "processing Event(go,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
case Logging.Debug(`name`, _, s: String) if s.startsWith("processing Event(go,null) from Actor[") true
}
expectMsg(1 second, Logging.Debug(name, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, "transition 1 -> 2"))
expectMsg(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "setting timer 't'/1500 milliseconds: Shutdown"))
expectMsg(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "transition 1 -> 2"))
fsm ! "stop"
expectMsgPF(1 second, hint = "processing Event(stop,null)") {
case Logging.Debug(`name`, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
case Logging.Debug(`name`, _, s: String) if s.startsWith("processing Event(stop,null) from Actor[") true
}
expectMsgAllOf(1 second, Logging.Debug(name, "canceling timer 't'"), Normal)
expectMsgAllOf(1 second, Logging.Debug(name, fsm.underlyingActor.getClass, "canceling timer 't'"), Normal)
expectNoMsg(1 second)
system.eventStream.unsubscribe(testActor)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ object ActorModelSpec {
await(deadline)(stops == dispatcher.stops.get)
} catch {
case e
system.eventStream.publish(Error(e, dispatcher.toString, "actual: stops=" + dispatcher.stops.get +
system.eventStream.publish(Error(e, dispatcher.toString, dispatcher.getClass, "actual: stops=" + dispatcher.stops.get +
" required: stops=" + stops))
throw e
}
Expand Down Expand Up @@ -208,9 +208,11 @@ object ActorModelSpec {
await(deadline)(stats.restarts.get() == restarts)
} catch {
case e
system.eventStream.publish(Error(e, Option(dispatcher).toString, "actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
system.eventStream.publish(Error(e, Option(dispatcher).toString,
if (dispatcher ne null) dispatcher.getClass else this.getClass,
"actual: " + stats + ", required: InterceptorStats(susp=" + suspensions +
",res=" + resumes + ",reg=" + registers + ",unreg=" + unregisters +
",recv=" + msgsReceived + ",proc=" + msgsProcessed + ",restart=" + restarts))
throw e
}
}
Expand Down Expand Up @@ -311,7 +313,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
try {
f
} catch {
case e system.eventStream.publish(Error(e, "spawn", "error in spawned thread"))
case e system.eventStream.publish(Error(e, "spawn", this.getClass, "error in spawned thread"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) {

private def verifyLevel(bus: LoggingBus, level: Logging.LogLevel) {
import Logging._
val allmsg = Seq(Debug("", "debug"), Info("", "info"), Warning("", "warning"), Error("", "error"))
val allmsg = Seq(Debug("", null, "debug"), Info("", null, "info"), Warning("", null, "warning"), Error("", null, "error"))
val msg = allmsg filter (_.level <= level)
allmsg foreach bus.publish
msg foreach (x expectMsg(x))
Expand Down
36 changes: 19 additions & 17 deletions akka-actor-tests/src/test/scala/akka/event/LoggingReceiveSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
}
val log = LoggingReceive("funky")(r)
log.isDefinedAt("hallo")
expectMsg(1 second, Logging.Debug("funky", "received unhandled message hallo"))
expectMsg(1 second, Logging.Debug("funky", classOf[String], "received unhandled message hallo"))
}
}

Expand All @@ -83,7 +83,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(name, "received handled message buh"))
expectMsg(Logging.Debug(name, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x")
}

Expand All @@ -109,7 +109,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
})
actor ! "buh"
within(1 second) {
expectMsg(Logging.Debug(actor.path.toString, "received handled message buh"))
expectMsg(Logging.Debug(actor.path.toString, actor.underlyingActor.getClass, "received handled message buh"))
expectMsg("x")
}
}
Expand All @@ -130,7 +130,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val name = actor.path.toString
actor ! PoisonPill
expectMsgPF() {
case Logging.Debug(`name`, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
case Logging.Debug(`name`, _, msg: String) if msg startsWith "received AutoReceiveMessage Envelope(PoisonPill" true
}
awaitCond(actor.isTerminated, 100 millis)
}
Expand All @@ -142,7 +142,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val sys = impl.systemGuardian.path.toString
ignoreMute(this)
ignoreMsg {
case Logging.Debug(s, _) s.contains("MainBusReaper") || s == sys
case Logging.Debug(`sys`, _, _) true
}
system.eventStream.subscribe(testActor, classOf[Logging.Debug])
system.eventStream.subscribe(testActor, classOf[Logging.Error])
Expand All @@ -151,51 +151,53 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
val lname = lifecycleGuardian.path.toString
val supervisor = TestActorRef[TestLogActor](Props[TestLogActor].withFaultHandler(OneForOneStrategy(List(classOf[Throwable]), 5, 5000)))
val sname = supervisor.path.toString
val sclass = classOf[TestLogActor]

val supervisorSet = receiveWhile(messages = 2) {
case Logging.Debug(`lname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`lname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`sname`, `sclass`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")

val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
val aname = actor.path.toString
val aclass = classOf[TestLogActor]

val set = receiveWhile(messages = 2) {
case Logging.Debug(`sname`, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, msg: String) if msg startsWith "started" 2
case Logging.Debug(`sname`, _, msg: String) if msg startsWith "now supervising" 1
case Logging.Debug(`aname`, `aclass`, msg: String) if msg startsWith "started" 2
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2), set + " was not Set(1, 2)")

supervisor watch actor
expectMsgPF(hint = "now monitoring") {
case Logging.Debug(ref, msg: String)
case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("now monitoring")
}

supervisor unwatch actor
expectMsgPF(hint = "stopped monitoring") {
case Logging.Debug(ref, msg: String)
case Logging.Debug(ref, `sclass`, msg: String)
ref == supervisor.underlyingActor && msg.startsWith("stopped monitoring")
}

EventFilter[ActorKilledException](occurrences = 1) intercept {
actor ! Kill
val set = receiveWhile(messages = 3) {
case Logging.Error(_: ActorKilledException, `aname`, "Kill") 1
case Logging.Debug(`aname`, "restarting") 2
case Logging.Debug(`aname`, "restarted") 3
case Logging.Error(_: ActorKilledException, `aname`, `aclass`, "Kill") 1
case Logging.Debug(`aname`, `aclass`, "restarting") 2
case Logging.Debug(`aname`, `aclass`, "restarted") 3
}.toSet
expectNoMsg(Duration.Zero)
assert(set == Set(1, 2, 3), set + " was not Set(1, 2, 3)")
}

system.stop(supervisor)
expectMsg(Logging.Debug(sname, "stopping"))
expectMsg(Logging.Debug(aname, "stopped"))
expectMsg(Logging.Debug(sname, "stopped"))
expectMsg(Logging.Debug(sname, `sclass`, "stopping"))
expectMsg(Logging.Debug(aname, `aclass`, "stopped"))
expectMsg(Logging.Debug(sname, `sclass`, "stopped"))
}
}
}
Expand Down
43 changes: 25 additions & 18 deletions akka-actor/src/main/scala/akka/actor/ActorCell.scala
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,12 @@ private[akka] class ActorCell(
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "started (" + actor + ")"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e
try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor"))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
Expand All @@ -373,7 +373,7 @@ private[akka] class ActorCell(

def recreate(cause: Throwable): Unit = try {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarting"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
val freshActor = newActor()
if (failedActor ne null) {
val c = currentMessage //One read only plz
Expand All @@ -388,15 +388,15 @@ private[akka] class ActorCell(
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
hotswap = Props.noHotSwap // Reset the behavior
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "restarted"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))

dispatcher.resume(this) //FIXME should this be moved down?

props.faultHandler.handleSupervisorRestarted(cause, self, children)
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e try {
system.eventStream.publish(Error(e, self.path.toString, "error while creating actor"))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while creating actor"))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
} finally {
Expand All @@ -417,7 +417,7 @@ private[akka] class ActorCell(
else {
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopping"))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
// do not use stop(child) because that would dissociate the children from us, but we still want to wait for them
for (child c) child.asInstanceOf[InternalActorRef].stop()
stopping = true
Expand All @@ -428,12 +428,12 @@ private[akka] class ActorCell(
childrenRefs.get(child.path.name) match {
case None
childrenRefs = childrenRefs.updated(child.path.name, ChildRestartStats(child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(`child`, _, _))
// this is the nominal case where we created the child and entered it in actorCreated() above
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now supervising " + child))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case Some(ChildRestartStats(c, _, _))
system.eventStream.publish(Warning(self.path.toString, "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "Already supervising other child with same name '" + child.path.name + "', old: " + c + " new: " + child))
}
}

Expand All @@ -448,10 +448,10 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause)
case Link(subject)
system.deathWatch.subscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "now monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
case Unlink(subject)
system.deathWatch.unsubscribe(self, subject)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped monitoring " + subject))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
Expand All @@ -460,7 +460,7 @@ private[akka] class ActorCell(
}
} catch {
case e //Should we really catch everything here?
system.eventStream.publish(Error(e, self.path.toString, "error while processing " + message))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "error while processing " + message))
//TODO FIXME How should problems here be handled???
throw e
}
Expand All @@ -480,7 +480,7 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation
} catch {
case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))

// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
Expand All @@ -500,7 +500,7 @@ private[akka] class ActorCell(
}
} catch {
case e
system.eventStream.publish(Error(e, self.path.toString, e.getMessage))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), e.getMessage))
throw e
}
}
Expand Down Expand Up @@ -530,7 +530,8 @@ private[akka] class ActorCell(
}

def autoReceiveMessage(msg: Envelope) {
if (system.settings.DebugAutoReceive) system.eventStream.publish(Debug(self.path.toString, "received AutoReceiveMessage " + msg))
if (system.settings.DebugAutoReceive)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))

msg.message match {
case Failed(cause) handleFailure(sender, cause)
Expand All @@ -554,7 +555,8 @@ private[akka] class ActorCell(
try {
parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self))
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, "stopped"))
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) // FIXME: can actor be null?
} finally {
currentMessage = null
clearActorFields()
Expand All @@ -565,8 +567,8 @@ private[akka] class ActorCell(

final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.get(child.path.name) match {
case Some(stats) if stats.child == child if (!props.faultHandler.handleFailure(this, child, cause, stats, childrenRefs.values)) throw cause
case Some(stats) system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, "dropping Failed(" + cause + ") from unknown child " + child))
case Some(stats) system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child + " matching names but not the same, was: " + stats.child))
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}

final def handleChildTerminated(child: ActorRef): Unit = {
Expand Down Expand Up @@ -625,4 +627,9 @@ private[akka] class ActorCell(
lookupAndSetField(a.getClass, a, "self", self)
}
}

private def clazz(o: AnyRef): Class[_] = {
if (o eq null) this.getClass
else o.getClass
}
}
2 changes: 1 addition & 1 deletion akka-actor/src/main/scala/akka/actor/ActorSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") //this” used only for .getClass in tagging messages
val log = new BusLogging(eventStream, "ActorSystem", this.getClass)

val scheduler = createScheduler()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cl
runnable.run()
} catch {
// FIXME catching all and continue isn't good for OOME, ticket #1418
case e eventStream.publish(Error(e, "TaskInvocation", e.getMessage))
case e eventStream.publish(Error(e, "TaskInvocation", this.getClass, e.getMessage))
} finally {
cleanup()
}
Expand Down
Loading

0 comments on commit d0498eb

Please sign in to comment.