diff --git a/modules/api/src/main/EventStream.scala b/modules/api/src/main/EventStream.scala index 174977c87205d..33e2736fe99f9 100644 --- a/modules/api/src/main/EventStream.scala +++ b/modules/api/src/main/EventStream.scala @@ -16,16 +16,27 @@ final class EventStream( setOnline: User.ID => Unit ) { - import lila.common.HttpStream._ + private case object SetOnline def apply(me: User, gamesInProgress: List[Game], challenges: List[Challenge]): Enumerator[Option[JsObject]] = { var stream: Option[ActorRef] = None + val classifiers = List( + Symbol(s"userStartGame:${me.id}"), + Symbol(s"rematchFor:${me.id}"), + 'challenge + ) + val enumerator = Concurrent.unicast[Option[JsObject]]( onStart = channel => { val actor = system.actorOf(Props(new Actor { + override def postStop() = { + super.postStop() + system.lilaBus.unsubscribe(self, classifiers) + } + self ! SetOnline def receive = { @@ -50,15 +61,10 @@ final class EventStream( } } })) - system.lilaBus.subscribe( - actor, - Symbol(s"userStartGame:${me.id}"), - Symbol(s"rematchFor:${me.id}"), - 'challenge - ) + system.lilaBus.subscribe(actor, classifiers: _*) stream = actor.some }, - onComplete = onComplete(stream, system) + onComplete = stream foreach { _ ! PoisonPill } ) lila.common.Iteratee.prepend( diff --git a/modules/bot/src/main/GameStateStream.scala b/modules/bot/src/main/GameStateStream.scala index 7eabef9dc8a6a..d9df5ca313a69 100644 --- a/modules/bot/src/main/GameStateStream.scala +++ b/modules/bot/src/main/GameStateStream.scala @@ -22,7 +22,7 @@ final class GameStateStream( roundSocketHub: ActorSelection ) { - import lila.common.HttpStream._ + private case object SetOnline def apply(me: User, init: Game.WithInitialFen, as: chess.Color): Enumerator[Option[JsObject]] = { @@ -57,7 +57,7 @@ final class GameStateStream( override def postStop(): Unit = { super.postStop() - classifiers foreach { system.lilaBus.unsubscribe(self, _) } + system.lilaBus.unsubscribe(self, classifiers) // hang around if game is over // so the opponent has a chance to rematch context.system.scheduler.scheduleOnce(if (gameOver) 10 second else 1 second) { @@ -92,7 +92,7 @@ final class GameStateStream( })) stream = actor.some }, - onComplete = onComplete(stream, system) + onComplete = stream foreach { _ ! PoisonPill } ) } } diff --git a/modules/common/src/main/Bus.scala b/modules/common/src/main/Bus.scala index a4d22623fcd8f..506febf022c51 100644 --- a/modules/common/src/main/Bus.scala +++ b/modules/common/src/main/Bus.scala @@ -25,15 +25,22 @@ final class Bus private (system: ActorSystem) extends Extension with EventBus { } def subscribe(ref: ActorRef, to: Classifier*): Boolean = subscribe(Tellable(ref), to: _*) - def subscribeFun(to: Classifier*)(f: PartialFunction[Any, Unit]): ActorRef = { - val actor = system.actorOf(Props(new Actor { val receive = f })) - subscribe(Tellable(actor), to: _*) - actor + def subscribeFun(to: Classifier*)(f: PartialFunction[Any, Unit]): Tellable = { + val t = new lila.common.Tellable.HashCode { + def !(msg: Any) = f lift msg + } + subscribe(t, to: _*) + t } def unsubscribe(subscriber: Tellable, from: Classifier): Boolean = bus.unsubscribe(subscriber, from) def unsubscribe(ref: ActorRef, from: Classifier): Boolean = unsubscribe(Tellable(ref), from) + def unsubscribe(subscriber: Tellable, from: Seq[Classifier]): Boolean = + from forall { unsubscribe(subscriber, _) } + def unsubscribe(ref: ActorRef, from: Seq[Classifier]): Boolean = + unsubscribe(Tellable(ref), from) + def unsubscribe(subscriber: Tellable): Unit = bus unsubscribe subscriber def unsubscribe(ref: ActorRef): Unit = unsubscribe(Tellable(ref)) diff --git a/modules/common/src/main/HttpStream.scala b/modules/common/src/main/HttpStream.scala deleted file mode 100644 index a31bf52d533c3..0000000000000 --- a/modules/common/src/main/HttpStream.scala +++ /dev/null @@ -1,16 +0,0 @@ -package lila.common - -import akka.actor._ -import play.api.libs.iteratee._ -import play.api.libs.json._ - -object HttpStream { - - def onComplete(stream: Option[ActorRef], system: ActorSystem) = - stream foreach { actor => - system.lilaBus.unsubscribe(actor) - actor ! PoisonPill - } - - case object SetOnline -} diff --git a/modules/common/src/main/Tellable.scala b/modules/common/src/main/Tellable.scala index 43d065baaa99a..298dcd04ca171 100644 --- a/modules/common/src/main/Tellable.scala +++ b/modules/common/src/main/Tellable.scala @@ -7,6 +7,10 @@ trait Tellable extends Any { object Tellable { + trait HashCode extends Tellable { + lazy val uniqueId = Integer.toHexString(hashCode) + } + case class Actor(ref: akka.actor.ActorRef) extends AnyVal with Tellable { def !(msg: Any) = ref ! msg def uniqueId = ref.path.name diff --git a/modules/game/src/main/GamesByUsersStream.scala b/modules/game/src/main/GamesByUsersStream.scala index 75f444aeb3f09..ca445c1d86c63 100644 --- a/modules/game/src/main/GamesByUsersStream.scala +++ b/modules/game/src/main/GamesByUsersStream.scala @@ -11,7 +11,6 @@ import lila.user.User final class GamesByUsersStream(system: ActorSystem) { import GamesByUsersStream._ - import lila.common.HttpStream._ def apply(userIds: Set[User.ID]): Enumerator[JsObject] = { @@ -19,16 +18,16 @@ final class GamesByUsersStream(system: ActorSystem) { case List(u1, u2) if u1 != u2 => userIds(u1) && userIds(u2) case _ => false } - var stream: Option[ActorRef] = None + var subscriber: Option[lila.common.Tellable] = None val enumerator = Concurrent.unicast[Game]( onStart = channel => { - stream = system.lilaBus.subscribeFun('startGame, 'finishGame) { + subscriber = system.lilaBus.subscribeFun(classifiers: _*) { case StartGame(game) if matches(game) => channel push game case FinishGame(game, _, _) if matches(game) => channel push game } some }, - onComplete = onComplete(stream, system) + onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifiers) } ) enumerator &> withInitialFen &> toJson @@ -41,7 +40,9 @@ final class GamesByUsersStream(system: ActorSystem) { Enumeratee.map[Game.WithInitialFen].apply[JsObject](gameWithInitialFenWriter.writes) } -object GamesByUsersStream { +private object GamesByUsersStream { + + private val classifiers = List('startGame, 'finishGame) private implicit val fenWriter: Writes[FEN] = Writes[FEN] { f => JsString(f.value) diff --git a/modules/irwin/src/main/IrwinStream.scala b/modules/irwin/src/main/IrwinStream.scala index 2633637170c0d..759ebbab2ff02 100644 --- a/modules/irwin/src/main/IrwinStream.scala +++ b/modules/irwin/src/main/IrwinStream.scala @@ -9,24 +9,24 @@ import lila.report.SuspectId final class IrwinStream(system: ActorSystem) { - import lila.common.HttpStream._ - private val stringify = Enumeratee.map[JsValue].apply[String] { js => Json.stringify(js) + "\n" } + private val classifier = 'irwin + def enumerator: Enumerator[String] = { - var stream: Option[ActorRef] = None + var subscriber: Option[lila.common.Tellable] = None Concurrent.unicast[JsValue]( onStart = channel => { - stream = system.lilaBus.subscribeFun('irwin) { + subscriber = system.lilaBus.subscribeFun(classifier) { case req: IrwinRequest => lila.mon.mod.irwin.streamEventType("request")() channel.push(requestJson(req)) } some }, - onComplete = onComplete(stream, system) + onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifier) } ) &> stringify } diff --git a/modules/mod/src/main/ModStream.scala b/modules/mod/src/main/ModStream.scala index fd44f3e97f1e6..9a10456a6b64a 100644 --- a/modules/mod/src/main/ModStream.scala +++ b/modules/mod/src/main/ModStream.scala @@ -9,18 +9,18 @@ import lila.report.ModId final class ModStream(system: ActorSystem) { - import lila.common.HttpStream._ - private val stringify = Enumeratee.map[JsValue].apply[String] { js => Json.stringify(js) + "\n" } + private val classifier = 'userSignup + def enumerator: Enumerator[String] = { - var stream: Option[ActorRef] = None + var subscriber: Option[lila.common.Tellable] = None Concurrent.unicast[JsValue]( onStart = channel => { - stream = system.lilaBus.subscribeFun('userSignup) { + subscriber = system.lilaBus.subscribeFun(classifier) { case lila.security.Signup(user, email, req, fp) => channel push Json.obj( "t" -> "signup", @@ -32,7 +32,7 @@ final class ModStream(system: ActorSystem) { ) } some }, - onComplete = onComplete(stream, system) + onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifier) } ) &> stringify } }