Skip to content

Commit

Permalink
Retry projection fixes and exposed additional templates for sparql queries (BlueBrain#1160)
Browse files Browse the repository at this point in the history

* Exposed {resource_project} and {resource_type} patterns to SPARQL queries

* Made the EventStream[F] lazy in computing the stream of events based on Ref
  • Loading branch information
bogdanromanx authored May 12, 2020
1 parent a6fc0d5 commit ff17ac3
Show file tree
Hide file tree
Showing 13 changed files with 137 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ object EventStreamClient {
/**
 * Decodes a string into an [[Event]].
 *
 * @param str the string to decode
 * @return the decoded event on the right; on failure a SerializationError on the left, built from
 *         the decoder's error message, the "NexusAPIEvent" label and the original input string
 */
private def decodeEvent(str: String): Either[ClientError, Event] =
decode[Event](str).leftMap(err => SerializationError(err.getMessage, "NexusAPIEvent", Some(str)))

private def buildStream(uri: Uri, lastEventIdCache: Ref[F, Option[Offset]]): F[EventStream[F]] =
lastEventIdCache.get
private def buildStream(uri: Uri, lastEventIdCache: Ref[F, Option[Offset]]): F[EventStream[F]] = {
val streamF = lastEventIdCache.get
.map { lastEventId =>
val lastEventIdH = lastEventId.map[Header](id => `Last-Event-Id`(EventId(id.asString)))
val req = Request[F](uri = uri, headers = Headers(lastEventIdH.toList ++ env.authorizationHeader.toList))
Expand All @@ -93,7 +93,8 @@ object EventStreamClient {
resultT.value
}
}
.map(stream => EventStream(stream, lastEventIdCache))
F.delay(EventStream(streamF, lastEventIdCache))
}

/**
 * Creates an [[EventStream]] for the uri configured in `env.eventsUri`.
 *
 * A new `Ref` is seeded with the provided value and handed to `buildStream`.
 *
 * @param lastEventId the optional offset used as the initial value of the `Ref`
 */
def apply(lastEventId: Option[Offset]): F[EventStream[F]] =
Ref.of(lastEventId).flatMap(ref => buildStream(env.eventsUri, ref))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import java.time.Instant
import java.util.concurrent.TimeUnit.SECONDS

import ch.epfl.bluebrain.nexus.cli.config.influx.TypeConfig
import ch.epfl.bluebrain.nexus.cli.sse.{OrgLabel, ProjectLabel}
import fs2.Chunk
import org.http4s.headers.`Content-Type`
import org.http4s.{EntityEncoder, MediaType}
Expand Down Expand Up @@ -41,24 +40,18 @@ object InfluxPoint {
/**
* Create a series of [[InfluxPoint]] from [[SparqlResults]].
*
* @param results SPARQL query results
* @param organization the organization used as a tag
* @param project the project used as a tag
* @param results SPARQL query results
*/
def fromSparqlResults(
results: SparqlResults,
organization: OrgLabel,
project: ProjectLabel,
tc: TypeConfig
): List[InfluxPoint] =
results.results.bindings.flatMap { bindings =>
val values = tc.values.flatMap(value => bindings.get(value).map(value -> _.value)).toMap
Option.when(values.nonEmpty) {
val tags = bindings.view
.filterKeys(key => !tc.values(key) && key != tc.timestamp)
.mapValues(_.value) ++ Seq(
"project" -> s"${organization.value}/${project.value}"
)
.mapValues(_.value)
val timestamp = bindings.get(tc.timestamp).flatMap(binding => Try(Instant.parse(binding.value)).toOption)
InfluxPoint(tc.measurement, tags.toMap, values, timestamp)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import ch.epfl.bluebrain.nexus.cli.ProjectionPipes._
import ch.epfl.bluebrain.nexus.cli.clients._
import ch.epfl.bluebrain.nexus.cli.config.AppConfig
import ch.epfl.bluebrain.nexus.cli.config.influx.{InfluxConfig, TypeConfig}
import ch.epfl.bluebrain.nexus.cli.sse.{EventStream, Offset, OrgLabel, ProjectLabel}
import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, Console}
import ch.epfl.bluebrain.nexus.cli.sse.{EventStream, Offset}
import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, ClientErrOr, Console}
import fs2.Stream
import retry.CatsEffect._
import retry.RetryPolicy
import retry.{RetryDetails, RetryPolicy}
import retry.syntax.all._

class InfluxProjection[F[_]: ContextShift](
Expand All @@ -39,50 +39,54 @@ class InfluxProjection[F[_]: ContextShift](
} yield ()

private def executeStream(eventStream: EventStream[F]): F[Unit] = {
implicit def logOnError[A] = logRetryErrors[F, A]("fetching SSE")
def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _
val compiledStream = eventStream.value
.map {
case Right((ev, org, proj)) =>
Right(
ic.projects
.get((org, proj))
.flatMap(pc =>
pc.types.collectFirst {
case tc if ev.resourceTypes.exists(_.toString == tc.tpe) => (pc, tc, ev, org, proj)
}
)
)
case Left(err) => Left(err)
}
.through(printEventProgress(console))
.evalMap {
case (pc, tc, ev, org, proj) =>
val query = tc.query
.replaceAllLiterally("{resource_id}", ev.resourceId.renderString)
.replaceAllLiterally("{event_rev}", ev.rev.toString)
spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(tc, res, org, proj))
}
.through(printProjectionProgress(console))
.attempt
.map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ()))
.compile
.lastOrError
implicit def logOnError[A]: (ClientErrOr[A], RetryDetails) => F[Unit] =
logRetryErrors[F, A]("executing the projection")
def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _

val compiledStream = eventStream.value.flatMap { stream =>
stream
.map {
case Right((ev, org, proj)) =>
Right(
ic.projects
.get((org, proj))
.flatMap(pc =>
pc.types.collectFirst {
case tc if ev.resourceTypes.exists(_.toString == tc.tpe) => (pc, tc, ev, org, proj)
}
)
)
case Left(err) => Left(err)
}
.through(printEventProgress(console))
.evalMap {
case (pc, tc, ev, org, proj) =>
val query = tc.query
.replaceAllLiterally("{resource_id}", ev.resourceId.renderString)
.replaceAllLiterally("{resource_type}", tc.tpe)
.replaceAllLiterally("{resource_project}", s"${org.show}/${proj.show}")
.replaceAllLiterally("{event_rev}", ev.rev.toString)
spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(tc, res))
}
.through(printProjectionProgress(console))
.attempt
.map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ()))
.compile
.lastOrError
}

compiledStream.retryingM(successCondition) >> F.unit
}

private def insert(
tc: TypeConfig,
res: Either[ClientError, SparqlResults],
org: OrgLabel,
proj: ProjectLabel
res: Either[ClientError, SparqlResults]
): F[Either[ClientError, Unit]] =
res match {
case Left(err) => F.pure(Left(err))
case Right(results) =>
InfluxPoint
.fromSparqlResults(results, org, proj, tc)
.fromSparqlResults(results, tc)
.traverse { point => inc.write(point) }
.map(_.foldM(())((_, r) => r))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer}
import cats.implicits._
import ch.epfl.bluebrain.nexus.cli.CliError.ClientError
import ch.epfl.bluebrain.nexus.cli.CliError.ClientError.Unexpected
import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, Console}
import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, ClientErrOr, Console}
import ch.epfl.bluebrain.nexus.cli.ProjectionPipes._
import ch.epfl.bluebrain.nexus.cli.clients.SparqlResults.{Binding, Literal}
import ch.epfl.bluebrain.nexus.cli.clients.{EventStreamClient, SparqlClient, SparqlResults}
Expand Down Expand Up @@ -49,10 +49,12 @@ class PostgresProjection[F[_]: ContextShift](
} yield ()

private def executeStream(eventStream: EventStream[F]): F[Unit] = {
implicit def logOnError[A] = logRetryErrors[F, A]("fetching SSE")
def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _
val compiledStream =
eventStream.value
implicit def logOnError[A]: (ClientErrOr[A], RetryDetails) => F[Unit] =
logRetryErrors[F, A]("executing the projection")
def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _

val compiledStream = eventStream.value.flatMap { stream =>
stream
.map {
case Right((ev, org, proj)) =>
Right(
Expand All @@ -73,6 +75,8 @@ class PostgresProjection[F[_]: ContextShift](
.map { qc =>
val query = qc.query
.replaceAllLiterally("{resource_id}", ev.resourceId.renderString)
.replaceAllLiterally("{resource_type}", tc.tpe)
.replaceAllLiterally("{resource_project}", s"${org.show}/${proj.show}")
.replaceAllLiterally("{event_rev}", ev.rev.toString)
spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(qc, res))
}
Expand All @@ -84,6 +88,7 @@ class PostgresProjection[F[_]: ContextShift](
.map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ()))
.compile
.lastOrError
}
compiledStream.retryingM(successCondition) >> F.unit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ trait EventStream[F[_]] {
/**
* The Stream of events.
*/
def value: Stream[F, ClientErrOr[LabeledEvent]]
def value: F[Stream[F, ClientErrOr[LabeledEvent]]]

/**
* The eventId for the last consumed event.
Expand All @@ -22,10 +22,10 @@ trait EventStream[F[_]] {

object EventStream {
final def apply[F[_]](
stream: Stream[F, ClientErrOr[LabeledEvent]],
stream: F[Stream[F, ClientErrOr[LabeledEvent]]],
ref: Ref[F, Option[Offset]]
): EventStream[F] = new EventStream[F] {
override def value: Stream[F, ClientErrOr[LabeledEvent]] = stream
override def currentEventId(): F[Option[Offset]] = ref.get
override def value: F[Stream[F, ClientErrOr[LabeledEvent]]] = stream
override def currentEventId(): F[Option[Offset]] = ref.get
}
}
4 changes: 3 additions & 1 deletion cli/src/test/resources/influx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ influx {
PREFIX schema:<http://schema.org/>
PREFIX nxv:<https://bluebrain.github.io/nexus/vocabulary/>

SELECT ?created ?updated ?deprecated ?bytes WHERE {
SELECT ?project ?typeUri ?created ?updated ?deprecated ?bytes WHERE {
<{resource_id}> nxv:createdAt ?created .
<{resource_id}> nxv:updatedAt ?updated .
<{resource_id}> nxv:deprecated ?deprecated .
Expand All @@ -18,6 +18,8 @@ influx {
?d schema:contentSize ?cs .
?cs schema:value ?bytes .
}
BIND("{resource_project}" AS ?project)
BIND("{resource_type}" AS ?typeUri)
}
"""

Expand Down
22 changes: 13 additions & 9 deletions cli/src/test/resources/postgres.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,25 @@ postgres {
ddl =
"""
CREATE TABLE IF NOT EXISTS schemas (
id VARCHAR NOT NULL UNIQUE,
rev INT NOT NULL,
last_updated TIMESTAMP WITH TIME ZONE NOT NULL
id VARCHAR NOT NULL UNIQUE,
rev INT NOT NULL,
last_updated TIMESTAMP WITH TIME ZONE NOT NULL,
project VARCHAR NOT NULL,
typeUri VARCHAR NOT NULL
);
"""
query =
"""
PREFIX nxv:<https://bluebrain.github.io/nexus/vocabulary/>
PREFIX owl:<http://www.w3.org/2002/07/owl#>
SELECT ?id ?rev ?last_updated
SELECT ?id ?rev ?last_updated ?project ?typeUri
WHERE {
<{resource_id}> a nxv:Schema .
<{resource_id}> nxv:rev ?rev .
<{resource_id}> a nxv:Schema .
<{resource_id}> nxv:rev ?rev .
<{resource_id}> nxv:updatedAt ?last_updated .
BIND("{resource_id}" as ?id) .
BIND("{resource_project}" AS ?project) .
BIND("{resource_type}" AS ?typeUri) .
FILTER(?rev >= {event_rev})
}
LIMIT 1
Expand All @@ -59,9 +63,9 @@ postgres {
PREFIX owl:<http://www.w3.org/2002/07/owl#>
SELECT ?id ?import
WHERE {
<{resource_id}> a nxv:Schema .
<{resource_id}> owl:imports ?import .
<{resource_id}> nxv:rev ?rev .
<{resource_id}> a nxv:Schema .
<{resource_id}> owl:imports ?import .
<{resource_id}> nxv:rev ?rev .
BIND("{resource_id}" as ?id) .
FILTER(?rev >= {event_rev})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,33 @@
"bytes",
"created",
"deprecated",
"project"
"project",
"typeUri"
],
"values" : [
[
"2020-02-24T15:39:17.745809Z",
18111712,
"2020-02-24T15:36:41.484672Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/BrainParcellationDataLayer"
],
[
"2020-02-24T15:43:49.216073Z",
1891712,
"2020-02-24T15:43:49.216073Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/BrainParcellationDataLayer"
],
[
"2020-02-25T08:48:33.808426Z",
13007853,
"2020-02-25T08:48:33.808426Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/BrainParcellationDataLayer"
]
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,33 @@
"bytes",
"created",
"deprecated",
"project"
"project",
"typeUri"
],
"values" : [
[
"2019-11-11T14:59:49.241511Z",
13848730,
"2019-11-11T14:59:49.241511Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/CellRecordSeries"
],
[
"2019-12-03T12:53:56.682642Z",
2884,
"2019-12-03T12:53:56.682642Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/CellRecordSeries"
],
[
"2019-12-03T12:55:27.949226Z",
4046,
"2019-12-03T12:55:27.949226Z",
"false",
"bbp/atlas"
"bbp/atlas",
"https://neuroshapes.org/CellRecordSeries"
]
]
}
Expand Down
5 changes: 5 additions & 0 deletions cli/src/test/resources/templates/sparql-results-influx.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
"datatype": "http://www.w3.org/2001/XMLSchema#dateTime",
"type": "literal",
"value": "{updated}"
},
"project": {
"datatype": "http://www.w3.org/2001/XMLSchema#string",
"type": "literal",
"value": "{project}"
}
}
]
Expand Down
Loading

0 comments on commit ff17ac3

Please sign in to comment.