Skip to content

Commit

Permalink
automatically detect relay format - WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ornicar committed Sep 1, 2018
1 parent 6090882 commit 4fc7adc
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 75 deletions.
3 changes: 0 additions & 3 deletions app/views/relay/inForm.scala.html
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
@base.form.group(form("syncUrl"), Html("Source URL")) { field =>
@base.form.input(field, typ = "url", required = true)
}
@base.form.group(form("syncType"), Html("Source format")) { field =>
@base.form.select(field, lila.relay.RelayForm.syncTypes)
}
<div>
@base.form.group(form("startsAt"), Html("Start date <strong>UTC</strong>"), help = Html("Optional, if you know when the event starts").some, half = true) { field =>
@base.form.flatpickr(field)
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ lazy val study = module("study", Seq(
)

lazy val relay = module("relay", Seq(common, study)).settings(
libraryDependencies ++= provided(play.api, reactivemongo.driver)
libraryDependencies ++= Seq(scalaUri) ++ provided(play.api, reactivemongo.driver)
)

lazy val studySearch = module("studySearch", Seq(common, hub, study, search)).settings(
Expand Down
9 changes: 1 addition & 8 deletions modules/relay/src/main/BSONHandlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,7 @@ object BSONHandlers {

import Relay.Sync
import Sync.Upstream
private implicit val SourceUpstreamHandler = new lila.db.BSON[Upstream] {
def reads(r: lila.db.BSON.Reader) = r.str("k") match {
case "dgt-one" => Upstream.DgtOneFile(r str "url")
case "dgt-many" => Upstream.DgtManyFiles(r str "url")
case k => sys error s"Invalid relay source upstream $k"
}
def writes(w: lila.db.BSON.Writer, u: Upstream) = $doc("k" -> u.key, "url" -> u.url)
}
implicit val upstreamHandler = Macros.handler[Upstream]

import SyncLog.Event
implicit val syncLogEventHandler = Macros.handler[Event]
Expand Down
3 changes: 3 additions & 0 deletions modules/relay/src/main/Env.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ final class Env(
config: Config,
db: lila.db.Env,
studyEnv: lila.study.Env,
asyncCache: lila.memo.AsyncCache.Builder,
system: ActorSystem
) {

Expand Down Expand Up @@ -47,6 +48,7 @@ final class Env(
private val fetch = system.actorOf(Props(new RelayFetch(
sync = sync,
api = api,
formatApi = new RelayFormatApi(asyncCache),
chapterRepo = studyEnv.chapterRepo
)))

Expand All @@ -66,6 +68,7 @@ object Env {
db = lila.db.Env.current,
config = lila.common.PlayApp loadConfig "relay",
studyEnv = lila.study.Env.current,
asyncCache = lila.memo.Env.current.asyncCache,
system = lila.common.PlayApp.system
)
}
8 changes: 1 addition & 7 deletions modules/relay/src/main/Relay.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,7 @@ object Relay {
}

object Sync {
sealed abstract class Upstream(val key: String, val url: String, val heavy: Boolean) {
override def toString = s"$key $url"
}
object Upstream {
case class DgtOneFile(fileUrl: String) extends Upstream("dgt-one", fileUrl, false)
case class DgtManyFiles(dirUrl: String) extends Upstream("dgt-many", dirUrl, true)
}
case class Upstream(url: String) extends AnyVal
}

case class WithStudy(relay: Relay, study: Study)
Expand Down
109 changes: 65 additions & 44 deletions modules/relay/src/main/RelayFetch.scala
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package lila.relay

import akka.actor._
import com.github.blemale.scaffeine.{ Cache, Scaffeine }
import io.lemonlabs.uri.Url
import org.joda.time.DateTime
import play.api.libs.json._
import play.api.libs.ws.{ WS, WSResponse }
import play.api.Play.current
import scala.concurrent.duration._

import chess.format.pgn.Tags
import lila.base.LilaException
import lila.study.MultiPgn
import lila.tree.Node.Comments
import Relay.Sync.Upstream

private final class RelayFetch(
sync: RelaySync,
api: RelayApi,
formatApi: RelayFormatApi,
chapterRepo: lila.study.ChapterRepo
) extends Actor {

Expand Down Expand Up @@ -57,7 +62,7 @@ private final class RelayFetch(
// no writing the relay; only reading!
def processRelay(relay: Relay): Fu[Relay] =
if (!relay.sync.playing) fuccess(relay.withSync(_.play))
else RelayFetch(relay) flatMap { games =>
else doProcess(relay) flatMap { games =>
sync(relay, games)
.chronometer.mon(_.relay.sync.duration.each).result
.withTimeout(1 second, SyncResult.Timeout)(context.system) map { res =>
Expand Down Expand Up @@ -95,53 +100,80 @@ private final class RelayFetch(
(if (r.sync.log.alwaysFails) fuccess(60) else (r.sync.delay match {
case Some(delay) => fuccess(delay)
case None => api.getNbViewers(r) map { nb =>
if (r.sync.upstream.heavy) (18 - nb) atLeast 8
else (13 - nb) atLeast 5
(18 - nb) atLeast 8
}
})) map { seconds =>
r.withSync(_.copy(nextAt = DateTime.now plusSeconds {
seconds atLeast { if (r.sync.log.isOk) 5 else 15 }
} some))
}
}

private object RelayFetch {
import RelayFetch.GamesSeenBy

import Relay.Sync.Upstream
case class GamesSeenBy(games: Fu[RelayGames], seenBy: Set[Relay.Id])

def apply(relay: Relay): Fu[RelayGames] =
private def doProcess(relay: Relay): Fu[RelayGames] =
cache getIfPresent relay.sync.upstream match {
case Some(GamesSeenBy(games, seenBy)) if !seenBy(relay.id) =>
cache.put(relay.sync.upstream, GamesSeenBy(games, seenBy + relay.id))
games
case x =>
val games = doFetch(relay.sync.upstream, maxChapters(relay))
val games = doFetch(relay.sync.upstream, RelayFetch.maxChapters(relay))
cache.put(relay.sync.upstream, GamesSeenBy(games, Set(relay.id)))
games
}

def maxChapters(relay: Relay) =
lila.study.Study.maxChapters * (if (relay.official) 2 else 1)
// private def dgtManyFiles(dir: String, max: Int, format: DgtMany): Fu[MultiPgn] = {
// val indexFile = s"$dir/${format.indexFile}"
// httpGet(indexFile) flatMap {
// case res if res.status == 200 => roundReads reads res.json match {
// case JsError(err) => fufail(err.toString)
// case JsSuccess(round, _) => round.pairings.zipWithIndex.map {
// case (pairing, i) =>
// val number = i + 1
// val gameUrl = s"$dir/${format.gameFile(number)}"
// httpGet(gameUrl).flatMap {
// case res if res.status == 200 => fuccess(number -> format.toPgn(res, pairing))
// case res => fufail(s"[${res.status}] $gameUrl")
// }
// }.sequenceFu map { results =>
// MultiPgn(results.sortBy(_._1).map(_._2).toList)
// }
// }
// case res => fufail(s"[${res.status}] $indexFile")
// }
// }

import com.github.blemale.scaffeine.{ Cache, Scaffeine }
private val cache: Cache[Upstream, GamesSeenBy] = Scaffeine()
.expireAfterWrite(30.seconds)
.build[Upstream, GamesSeenBy]

private def doFetch(upstream: Upstream, max: Int): Fu[RelayGames] = (upstream match {
case Upstream.DgtOneFile(file) => dgtOneFile(file, max)
case Upstream.DgtManyFiles(dir) =>
dgtManyFiles(dir, max, DgtMany.RoundPgn) recoverWith {
case _: lila.base.LilaException => dgtManyFiles(dir, max, DgtMany.Indexjson)
private def doFetch(upstream: Upstream, max: Int): Fu[RelayGames] =
formatApi.get(upstream.url) flatMap {
_.fold[Fu[MultiPgn]](fufail("Cannot find any DGT compatible files")) {
case RelayFormat.SingleFile(doc) => httpGet(doc.url) map { body =>
doc.format match {
// all games in a single PGN file
case RelayFormat.DocFormat.Pgn => MultiPgn.split(body, max)
// maybe a single JSON game? Why not
case RelayFormat.DocFormat.Json => MultiPgn(List(RelayFetch.jsonToPgn(body)))
}
}
case f: RelayFormat.ManyFiles => ???
}
}) flatMap multiPgnToGames.apply
} flatMap RelayFetch.multiPgnToGames.apply

private def dgtOneFile(file: String, max: Int): Fu[MultiPgn] =
httpGet(file).flatMap {
case res if res.status == 200 => fuccess(MultiPgn.split(res.body, max))
private def httpGet(url: Url): Fu[String] =
WS.url(url.toString).withRequestTimeout(4.seconds.toMillis).get().flatMap {
case res if res.status == 200 => fuccess(res.body)
case res => fufail(s"[${res.status}]")
}
}

private object RelayFetch {

case class GamesSeenBy(games: Fu[RelayGames], seenBy: Set[Relay.Id])

def maxChapters(relay: Relay) =
lila.study.Study.maxChapters * (if (relay.official) 2 else 1)

private object DgtJson {
case class PairingPlayer(fname: Option[String], mname: Option[String], lname: Option[String], title: Option[String]) {
Expand Down Expand Up @@ -172,7 +204,7 @@ private object RelayFetch {
private sealed abstract class DgtMany(val indexFile: String, val gameFile: Int => String, val toPgn: (WSResponse, RoundJsonPairing) => String)
private object DgtMany {
case object RoundPgn extends DgtMany("round.json", n => s"game-$n.pgn", (r, _) => r.body)
case object Indexjson extends DgtMany("index.json", n => s"game-$n.json", {
case object Indexjson extends DgtMany("index.json", n => s"game-$n.pgn", {
case (res, pairing) => res.json.validate[GameJson] match {
case JsSuccess(game, _) =>
val moves = game.moves.map(_ split ' ') map { move =>
Expand All @@ -187,29 +219,18 @@ private object RelayFetch {
})
}

private def dgtManyFiles(dir: String, max: Int, format: DgtMany): Fu[MultiPgn] = {
val indexFile = s"$dir/${format.indexFile}"
httpGet(indexFile) flatMap {
case res if res.status == 200 => roundReads reads res.json match {
case JsError(err) => fufail(err.toString)
case JsSuccess(round, _) => round.pairings.zipWithIndex.map {
case (pairing, i) =>
val number = i + 1
val gameUrl = s"$dir/${format.gameFile(number)}"
httpGet(gameUrl).flatMap {
case res if res.status == 200 => fuccess(number -> format.toPgn(res, pairing))
case res => fufail(s"[${res.status}] $gameUrl")
}
}.sequenceFu map { results =>
MultiPgn(results.sortBy(_._1).map(_._2).toList)
}
}
case res => fufail(s"[${res.status}] $indexFile")
}
private def jsonToPgn(str: String, extraTags: Tags = Tags.empty) = Json.parse(str).validate[GameJson] match {
case JsSuccess(game, _) =>
val moves = game.moves.map(_ split ' ') map { move =>
chess.format.pgn.Move(
san = ~move.headOption,
secondsLeft = move.lift(1).map(_.takeWhile(_.isDigit)) flatMap parseIntOption
)
} mkString " "
s"${extraTags}\n\n$moves"
case JsError(err) => ""
}

private def httpGet(url: String) = WS.url(url).withRequestTimeout(4.seconds.toMillis).get()

private object multiPgnToGames {

import scala.util.{ Try, Success, Failure }
Expand Down
13 changes: 1 addition & 12 deletions modules/relay/src/main/RelayForm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,10 @@ object RelayForm {

import lila.common.Form.UTCDate._

val syncTypes = List(
"dgt-one" -> "DGT (traditional): all games in a single file",
"dgt-many" -> "DGT (new): one file per game"
)

val form = Form(mapping(
"name" -> nonEmptyText(minLength = 3, maxLength = 80),
"description" -> nonEmptyText(minLength = 3, maxLength = 4000),
"official" -> boolean,
"syncType" -> text.verifying(syncTypes.map(_._1).contains _),
"syncUrl" -> nonEmptyText,
"startsAt" -> optional(utcDate),
"throttle" -> optional(number(min = 0, max = 60))
Expand All @@ -35,7 +29,6 @@ object RelayForm {
name: String,
description: String,
official: Boolean,
syncType: String,
syncUrl: String,
startsAt: Option[DateTime],
throttle: Option[Int]
Expand All @@ -56,10 +49,7 @@ object RelayForm {
)

def makeSync = Relay.Sync(
upstream = syncType match {
case "dgt-one" => Relay.Sync.Upstream.DgtOneFile(cleanUrl)
case _ => Relay.Sync.Upstream.DgtManyFiles(cleanUrl)
},
upstream = Relay.Sync.Upstream(cleanUrl),
until = none,
nextAt = none,
delay = throttle,
Expand Down Expand Up @@ -87,7 +77,6 @@ object RelayForm {
name = relay.name,
description = relay.description,
official = relay.official,
syncType = relay.sync.upstream.key,
syncUrl = relay.sync.upstream.url,
startsAt = relay.startsAt,
throttle = relay.sync.delay
Expand Down
91 changes: 91 additions & 0 deletions modules/relay/src/main/RelayFormat.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package lila.relay

import io.lemonlabs.uri._
import play.api.libs.json._
import play.api.libs.ws.{ WS, WSResponse }
import play.api.Play.current
import scala.concurrent.duration._

import lila.memo.AsyncCache
import lila.study.MultiPgn

private final class RelayFormatApi(
asyncCache: AsyncCache.Builder
) {

import RelayFormat._

def get(url: String): Fu[Option[RelayFormat]] = cache get Url.parse(url.pp).pp

private def guessFormat(url: Url): Fu[Option[RelayFormat]] = {

def guessSingleFile: Fu[Option[RelayFormat]] =
lila.common.Future.find(List(
url.some,
!url.path.parts.contains(mostCommonSingleFileName) option addPart(url, mostCommonSingleFileName)
).flatten.distinct)(looksLikePgn) map2 { (u: Url) =>
SingleFile(pgnDoc(u))
}

def guessManyFiles: Fu[Option[RelayFormat]] =
lila.common.Future.find(
List(url) ::: mostCommonIndexNames.filterNot(url.path.parts.contains).map(addPart(url, _))
)(looksLikeJson) map2 { (index: Url) =>
ManyFiles(index, i => pgnDoc(replaceLastPart(index, s"game-$i.pgn")))
}

guessSingleFile orElse guessManyFiles
} thenPp

private val cache = asyncCache.multi[Url, Option[RelayFormat]](
name = "relayFormat",
f = guessFormat,
expireAfter = _.ExpireAfterAccess(1 hour)
)
}

private sealed trait RelayFormat

private object RelayFormat {

sealed trait DocFormat
object DocFormat {
case object Json extends DocFormat
case object Pgn extends DocFormat
}

case class Doc(url: Url, format: DocFormat)

def jsonDoc(url: Url) = Doc(url, DocFormat.Json)
def pgnDoc(url: Url) = Doc(url, DocFormat.Pgn)

case class SingleFile(doc: Doc) extends RelayFormat

case class ManyFiles(jsonIndex: Url, game: Int => Doc) extends RelayFormat

def httpGet(url: Url): Fu[Option[String]] =
WS.url(url.toString.pp("httpGet")).withRequestTimeout(4.seconds.toMillis).get().map {
case res if res.status == 200 => res.body.some
case _ => none
}

def looksLikePgn(body: String): Boolean = MultiPgn.split(body, 1).value.headOption ?? { pgn =>
lila.study.PgnImport(pgn, Nil).isSuccess
}
def looksLikePgn(url: Url): Fu[Boolean] = httpGet(url).map { _ exists looksLikePgn }

def looksLikeJson(body: String): Boolean = Json.parse(body) != JsNull
def looksLikeJson(url: Url): Fu[Boolean] = httpGet(url).map { _ exists looksLikeJson }

def addPart(url: Url, part: String) = url.withPath(url.path addPart part)
def replaceLastPart(url: Url, withPart: String) =
if (url.path.isEmpty) addPart(url, withPart)
else url.withPath {
url.path.withParts {
url.path.parts.init :+ withPart
}
}

val mostCommonSingleFileName = "games.pgn"
val mostCommonIndexNames = List("round.json", "index.json")
}
Loading

0 comments on commit 4fc7adc

Please sign in to comment.