Skip to content

Commit

Permalink
use Tellable in bus.subscribeFun, update unicast streams
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Dec 7, 2018
1 parent 5d70d7a commit aa3e018
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 46 deletions.
22 changes: 14 additions & 8 deletions modules/api/src/main/EventStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,27 @@ final class EventStream(
setOnline: User.ID => Unit
) {

import lila.common.HttpStream._
private case object SetOnline

def apply(me: User, gamesInProgress: List[Game], challenges: List[Challenge]): Enumerator[Option[JsObject]] = {

var stream: Option[ActorRef] = None

val classifiers = List(
Symbol(s"userStartGame:${me.id}"),
Symbol(s"rematchFor:${me.id}"),
'challenge
)

val enumerator = Concurrent.unicast[Option[JsObject]](
onStart = channel => {
val actor = system.actorOf(Props(new Actor {

override def postStop() = {
super.postStop()
system.lilaBus.unsubscribe(self, classifiers)
}

self ! SetOnline

def receive = {
Expand All @@ -50,15 +61,10 @@ final class EventStream(
}
}
}))
system.lilaBus.subscribe(
actor,
Symbol(s"userStartGame:${me.id}"),
Symbol(s"rematchFor:${me.id}"),
'challenge
)
system.lilaBus.subscribe(actor, classifiers: _*)
stream = actor.some
},
onComplete = onComplete(stream, system)
onComplete = stream foreach { _ ! PoisonPill }
)

lila.common.Iteratee.prepend(
Expand Down
6 changes: 3 additions & 3 deletions modules/bot/src/main/GameStateStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class GameStateStream(
roundSocketHub: ActorSelection
) {

import lila.common.HttpStream._
private case object SetOnline

def apply(me: User, init: Game.WithInitialFen, as: chess.Color): Enumerator[Option[JsObject]] = {

Expand Down Expand Up @@ -57,7 +57,7 @@ final class GameStateStream(

override def postStop(): Unit = {
super.postStop()
classifiers foreach { system.lilaBus.unsubscribe(self, _) }
system.lilaBus.unsubscribe(self, classifiers)
// hang around if game is over
// so the opponent has a chance to rematch
context.system.scheduler.scheduleOnce(if (gameOver) 10 second else 1 second) {
Expand Down Expand Up @@ -92,7 +92,7 @@ final class GameStateStream(
}))
stream = actor.some
},
onComplete = onComplete(stream, system)
onComplete = stream foreach { _ ! PoisonPill }
)
}
}
15 changes: 11 additions & 4 deletions modules/common/src/main/Bus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,22 @@ final class Bus private (system: ActorSystem) extends Extension with EventBus {
}
def subscribe(ref: ActorRef, to: Classifier*): Boolean = subscribe(Tellable(ref), to: _*)

def subscribeFun(to: Classifier*)(f: PartialFunction[Any, Unit]): ActorRef = {
val actor = system.actorOf(Props(new Actor { val receive = f }))
subscribe(Tellable(actor), to: _*)
actor
def subscribeFun(to: Classifier*)(f: PartialFunction[Any, Unit]): Tellable = {
val t = new lila.common.Tellable.HashCode {
def !(msg: Any) = f lift msg
}
subscribe(t, to: _*)
t
}

def unsubscribe(subscriber: Tellable, from: Classifier): Boolean = bus.unsubscribe(subscriber, from)
def unsubscribe(ref: ActorRef, from: Classifier): Boolean = unsubscribe(Tellable(ref), from)

def unsubscribe(subscriber: Tellable, from: Seq[Classifier]): Boolean =
from forall { unsubscribe(subscriber, _) }
def unsubscribe(ref: ActorRef, from: Seq[Classifier]): Boolean =
unsubscribe(Tellable(ref), from)

def unsubscribe(subscriber: Tellable): Unit = bus unsubscribe subscriber
def unsubscribe(ref: ActorRef): Unit = unsubscribe(Tellable(ref))

Expand Down
16 changes: 0 additions & 16 deletions modules/common/src/main/HttpStream.scala

This file was deleted.

4 changes: 4 additions & 0 deletions modules/common/src/main/Tellable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ trait Tellable extends Any {

object Tellable {

trait HashCode extends Tellable {
lazy val uniqueId = Integer.toHexString(hashCode)
}

case class Actor(ref: akka.actor.ActorRef) extends AnyVal with Tellable {
def !(msg: Any) = ref ! msg
def uniqueId = ref.path.name
Expand Down
11 changes: 6 additions & 5 deletions modules/game/src/main/GamesByUsersStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,23 @@ import lila.user.User
final class GamesByUsersStream(system: ActorSystem) {

import GamesByUsersStream._
import lila.common.HttpStream._

def apply(userIds: Set[User.ID]): Enumerator[JsObject] = {

def matches(game: Game) = game.userIds match {
case List(u1, u2) if u1 != u2 => userIds(u1) && userIds(u2)
case _ => false
}
var stream: Option[ActorRef] = None
var subscriber: Option[lila.common.Tellable] = None

val enumerator = Concurrent.unicast[Game](
onStart = channel => {
stream = system.lilaBus.subscribeFun('startGame, 'finishGame) {
subscriber = system.lilaBus.subscribeFun(classifiers: _*) {
case StartGame(game) if matches(game) => channel push game
case FinishGame(game, _, _) if matches(game) => channel push game
} some
},
onComplete = onComplete(stream, system)
onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifiers) }
)

enumerator &> withInitialFen &> toJson
Expand All @@ -41,7 +40,9 @@ final class GamesByUsersStream(system: ActorSystem) {
Enumeratee.map[Game.WithInitialFen].apply[JsObject](gameWithInitialFenWriter.writes)
}

object GamesByUsersStream {
private object GamesByUsersStream {

private val classifiers = List('startGame, 'finishGame)

private implicit val fenWriter: Writes[FEN] = Writes[FEN] { f =>
JsString(f.value)
Expand Down
10 changes: 5 additions & 5 deletions modules/irwin/src/main/IrwinStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,24 @@ import lila.report.SuspectId

final class IrwinStream(system: ActorSystem) {

import lila.common.HttpStream._

private val stringify =
Enumeratee.map[JsValue].apply[String] { js =>
Json.stringify(js) + "\n"
}

private val classifier = 'irwin

def enumerator: Enumerator[String] = {
var stream: Option[ActorRef] = None
var subscriber: Option[lila.common.Tellable] = None
Concurrent.unicast[JsValue](
onStart = channel => {
stream = system.lilaBus.subscribeFun('irwin) {
subscriber = system.lilaBus.subscribeFun(classifier) {
case req: IrwinRequest =>
lila.mon.mod.irwin.streamEventType("request")()
channel.push(requestJson(req))
} some
},
onComplete = onComplete(stream, system)
onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifier) }
) &> stringify
}

Expand Down
10 changes: 5 additions & 5 deletions modules/mod/src/main/ModStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import lila.report.ModId

final class ModStream(system: ActorSystem) {

import lila.common.HttpStream._

private val stringify =
Enumeratee.map[JsValue].apply[String] { js =>
Json.stringify(js) + "\n"
}

private val classifier = 'userSignup

def enumerator: Enumerator[String] = {
var stream: Option[ActorRef] = None
var subscriber: Option[lila.common.Tellable] = None
Concurrent.unicast[JsValue](
onStart = channel => {
stream = system.lilaBus.subscribeFun('userSignup) {
subscriber = system.lilaBus.subscribeFun(classifier) {
case lila.security.Signup(user, email, req, fp) =>
channel push Json.obj(
"t" -> "signup",
Expand All @@ -32,7 +32,7 @@ final class ModStream(system: ActorSystem) {
)
} some
},
onComplete = onComplete(stream, system)
onComplete = subscriber foreach { system.lilaBus.unsubscribe(_, classifier) }
) &> stringify
}
}

0 comments on commit aa3e018

Please sign in to comment.