diff --git a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClient.scala b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClient.scala index 959baef060..2c6785e4fc 100644 --- a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClient.scala +++ b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClient.scala @@ -65,8 +65,8 @@ object EventStreamClient { private def decodeEvent(str: String): Either[ClientError, Event] = decode[Event](str).leftMap(err => SerializationError(err.getMessage, "NexusAPIEvent", Some(str))) - private def buildStream(uri: Uri, lastEventIdCache: Ref[F, Option[Offset]]): F[EventStream[F]] = - lastEventIdCache.get + private def buildStream(uri: Uri, lastEventIdCache: Ref[F, Option[Offset]]): F[EventStream[F]] = { + val streamF = lastEventIdCache.get .map { lastEventId => val lastEventIdH = lastEventId.map[Header](id => `Last-Event-Id`(EventId(id.asString))) val req = Request[F](uri = uri, headers = Headers(lastEventIdH.toList ++ env.authorizationHeader.toList)) @@ -93,7 +93,8 @@ object EventStreamClient { resultT.value } } - .map(stream => EventStream(stream, lastEventIdCache)) + F.delay(EventStream(streamF, lastEventIdCache)) + } def apply(lastEventId: Option[Offset]): F[EventStream[F]] = Ref.of(lastEventId).flatMap(ref => buildStream(env.eventsUri, ref)) diff --git a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPoint.scala b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPoint.scala index 438e2deb8a..e1754ba9f5 100644 --- a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPoint.scala +++ b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPoint.scala @@ -4,7 +4,6 @@ import java.time.Instant import java.util.concurrent.TimeUnit.SECONDS import ch.epfl.bluebrain.nexus.cli.config.influx.TypeConfig -import ch.epfl.bluebrain.nexus.cli.sse.{OrgLabel, ProjectLabel} import fs2.Chunk import org.http4s.headers.`Content-Type` import org.http4s.{EntityEncoder, MediaType} @@ -41,14 +40,10 @@ object InfluxPoint { /** * Create a series of [[InfluxPoint]] from [[SparqlResults]]. * - * @param results SPARQL query results - * @param organization the organization used as a tag - * @param project the project used as a tag + * @param results SPARQL query results */ def fromSparqlResults( results: SparqlResults, - organization: OrgLabel, - project: ProjectLabel, tc: TypeConfig ): List[InfluxPoint] = results.results.bindings.flatMap { bindings => @@ -56,9 +51,7 @@ object InfluxPoint { Option.when(values.nonEmpty) { val tags = bindings.view .filterKeys(key => !tc.values(key) && key != tc.timestamp) - .mapValues(_.value) ++ Seq( - "project" -> s"${organization.value}/${project.value}" - ) + .mapValues(_.value) val timestamp = bindings.get(tc.timestamp).flatMap(binding => Try(Instant.parse(binding.value)).toOption) InfluxPoint(tc.measurement, tags.toMap, values, timestamp) } diff --git a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/influx/InfluxProjection.scala b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/influx/InfluxProjection.scala index 4a568aaadb..9017db4c59 100644 --- a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/influx/InfluxProjection.scala +++ b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/influx/InfluxProjection.scala @@ -8,11 +8,11 @@ import ch.epfl.bluebrain.nexus.cli.ProjectionPipes._ import ch.epfl.bluebrain.nexus.cli.clients._ import ch.epfl.bluebrain.nexus.cli.config.AppConfig import ch.epfl.bluebrain.nexus.cli.config.influx.{InfluxConfig, TypeConfig} -import ch.epfl.bluebrain.nexus.cli.sse.{EventStream, Offset, OrgLabel, ProjectLabel} -import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, Console} +import ch.epfl.bluebrain.nexus.cli.sse.{EventStream, Offset} +import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, ClientErrOr, Console} import fs2.Stream import retry.CatsEffect._ -import retry.RetryPolicy +import retry.{RetryDetails, RetryPolicy} import retry.syntax.all._ class InfluxProjection[F[_]: ContextShift]( @@ -39,50 +39,54 @@ class InfluxProjection[F[_]: ContextShift]( } yield () private def executeStream(eventStream: EventStream[F]): F[Unit] = { - implicit def logOnError[A] = logRetryErrors[F, A]("fetching SSE") - def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _ - val compiledStream = eventStream.value - .map { - case Right((ev, org, proj)) => - Right( - ic.projects - .get((org, proj)) - .flatMap(pc => - pc.types.collectFirst { - case tc if ev.resourceTypes.exists(_.toString == tc.tpe) => (pc, tc, ev, org, proj) - } - ) - ) - case Left(err) => Left(err) - } - .through(printEventProgress(console)) - .evalMap { - case (pc, tc, ev, org, proj) => - val query = tc.query - .replaceAllLiterally("{resource_id}", ev.resourceId.renderString) - .replaceAllLiterally("{event_rev}", ev.rev.toString) - spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(tc, res, org, proj)) - } - .through(printProjectionProgress(console)) - .attempt - .map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ())) - .compile - .lastOrError + implicit def logOnError[A]: (ClientErrOr[A], RetryDetails) => F[Unit] = + logRetryErrors[F, A]("executing the projection") + def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _ + + val compiledStream = eventStream.value.flatMap { stream => + stream + .map { + case Right((ev, org, proj)) => + Right( + ic.projects + .get((org, proj)) + .flatMap(pc => + pc.types.collectFirst { + case tc if ev.resourceTypes.exists(_.toString == tc.tpe) => (pc, tc, ev, org, proj) + } + ) + ) + case Left(err) => Left(err) + } + .through(printEventProgress(console)) + .evalMap { + case (pc, tc, ev, org, proj) => + val query = tc.query + .replaceAllLiterally("{resource_id}", ev.resourceId.renderString) + .replaceAllLiterally("{resource_type}", tc.tpe) + .replaceAllLiterally("{resource_project}", s"${org.show}/${proj.show}") + .replaceAllLiterally("{event_rev}", ev.rev.toString) + spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(tc, res)) + } + .through(printProjectionProgress(console)) + .attempt + .map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ())) + .compile + .lastOrError + } compiledStream.retryingM(successCondition) >> F.unit } private def insert( tc: TypeConfig, - res: Either[ClientError, SparqlResults], - org: OrgLabel, - proj: ProjectLabel + res: Either[ClientError, SparqlResults] ): F[Either[ClientError, Unit]] = res match { case Left(err) => F.pure(Left(err)) case Right(results) => InfluxPoint - .fromSparqlResults(results, org, proj, tc) + .fromSparqlResults(results, tc) .traverse { point => inc.write(point) } .map(_.foldM(())((_, r) => r)) } diff --git a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/postgres/PostgresProjection.scala b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/postgres/PostgresProjection.scala index 8b0df74885..70ed3ccc28 100644 --- a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/postgres/PostgresProjection.scala +++ b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/modules/postgres/PostgresProjection.scala @@ -6,7 +6,7 @@ import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Timer} import cats.implicits._ import ch.epfl.bluebrain.nexus.cli.CliError.ClientError import ch.epfl.bluebrain.nexus.cli.CliError.ClientError.Unexpected -import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, Console} +import ch.epfl.bluebrain.nexus.cli.{logRetryErrors, ClientErrOr, Console} import ch.epfl.bluebrain.nexus.cli.ProjectionPipes._ import ch.epfl.bluebrain.nexus.cli.clients.SparqlResults.{Binding, Literal} import ch.epfl.bluebrain.nexus.cli.clients.{EventStreamClient, SparqlClient, SparqlResults} @@ -49,10 +49,12 @@ class PostgresProjection[F[_]: ContextShift]( } yield () private def executeStream(eventStream: EventStream[F]): F[Unit] = { - implicit def logOnError[A] = logRetryErrors[F, A]("fetching SSE") - def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _ - val compiledStream = - eventStream.value + implicit def logOnError[A]: (ClientErrOr[A], RetryDetails) => F[Unit] = + logRetryErrors[F, A]("executing the projection") + def successCondition[A] = cfg.env.httpClient.retry.condition.notRetryFromEither[A] _ + + val compiledStream = eventStream.value.flatMap { stream => + stream .map { case Right((ev, org, proj)) => Right( @@ -73,6 +75,8 @@ class PostgresProjection[F[_]: ContextShift]( .map { qc => val query = qc.query .replaceAllLiterally("{resource_id}", ev.resourceId.renderString) + .replaceAllLiterally("{resource_type}", tc.tpe) + .replaceAllLiterally("{resource_project}", s"${org.show}/${proj.show}") .replaceAllLiterally("{event_rev}", ev.rev.toString) spc.query(org, proj, pc.sparqlView, query).flatMap(res => insert(qc, res)) } @@ -84,6 +88,7 @@ class PostgresProjection[F[_]: ContextShift]( .map(_.leftMap(err => Unexpected(Option(err.getMessage).getOrElse("").take(30))).map(_ => ())) .compile .lastOrError + } compiledStream.retryingM(successCondition) >> F.unit } diff --git a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/sse/EventStream.scala b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/sse/EventStream.scala index 814646e373..32181535e0 100644 --- a/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/sse/EventStream.scala +++ b/cli/src/main/scala/ch/epfl/bluebrain/nexus/cli/sse/EventStream.scala @@ -12,7 +12,7 @@ trait EventStream[F[_]] { /** * The Stream of events. */ - def value: Stream[F, ClientErrOr[LabeledEvent]] + def value: F[Stream[F, ClientErrOr[LabeledEvent]]] /** * The eventId for the last consumed event. @@ -22,10 +22,10 @@ trait EventStream[F[_]] { object EventStream { final def apply[F[_]]( - stream: Stream[F, ClientErrOr[LabeledEvent]], + stream: F[Stream[F, ClientErrOr[LabeledEvent]]], ref: Ref[F, Option[Offset]] ): EventStream[F] = new EventStream[F] { - override def value: Stream[F, ClientErrOr[LabeledEvent]] = stream - override def currentEventId(): F[Option[Offset]] = ref.get + override def value: F[Stream[F, ClientErrOr[LabeledEvent]]] = stream + override def currentEventId(): F[Option[Offset]] = ref.get } } diff --git a/cli/src/test/resources/influx.conf b/cli/src/test/resources/influx.conf index f2594dfe53..7929933f56 100644 --- a/cli/src/test/resources/influx.conf +++ b/cli/src/test/resources/influx.conf @@ -8,7 +8,7 @@ influx { PREFIX schema: PREFIX nxv: - SELECT ?created ?updated ?deprecated ?bytes WHERE { + SELECT ?project ?typeUri ?created ?updated ?deprecated ?bytes WHERE { <{resource_id}> nxv:createdAt ?created . <{resource_id}> nxv:updatedAt ?updated . <{resource_id}> nxv:deprecated ?deprecated . @@ -18,6 +18,8 @@ influx { ?d schema:contentSize ?cs . ?cs schema:value ?bytes . } + BIND("{resource_project}" AS ?project) + BIND("{resource_type}" AS ?typeUri) } """ diff --git a/cli/src/test/resources/postgres.conf b/cli/src/test/resources/postgres.conf index df144884f7..e3405bbbb3 100644 --- a/cli/src/test/resources/postgres.conf +++ b/cli/src/test/resources/postgres.conf @@ -24,21 +24,25 @@ postgres { ddl = """ CREATE TABLE IF NOT EXISTS schemas ( - id VARCHAR NOT NULL UNIQUE, - rev INT NOT NULL, - last_updated TIMESTAMP WITH TIME ZONE NOT NULL + id VARCHAR NOT NULL UNIQUE, + rev INT NOT NULL, + last_updated TIMESTAMP WITH TIME ZONE NOT NULL, + project VARCHAR NOT NULL, + typeUri VARCHAR NOT NULL ); """ query = """ PREFIX nxv: PREFIX owl: - SELECT ?id ?rev ?last_updated + SELECT ?id ?rev ?last_updated ?project ?typeUri WHERE { - <{resource_id}> a nxv:Schema . - <{resource_id}> nxv:rev ?rev . + <{resource_id}> a nxv:Schema . + <{resource_id}> nxv:rev ?rev . <{resource_id}> nxv:updatedAt ?last_updated . BIND("{resource_id}" as ?id) . + BIND("{resource_project}" AS ?project) . + BIND("{resource_type}" AS ?typeUri) . FILTER(?rev >= {event_rev}) } LIMIT 1 @@ -59,9 +63,9 @@ postgres { PREFIX owl: SELECT ?id ?import WHERE { - <{resource_id}> a nxv:Schema . - <{resource_id}> owl:imports ?import . - <{resource_id}> nxv:rev ?rev . + <{resource_id}> a nxv:Schema . + <{resource_id}> owl:imports ?import . + <{resource_id}> nxv:rev ?rev . BIND("{resource_id}" as ?id) . FILTER(?rev >= {event_rev}) } diff --git a/cli/src/test/resources/templates/influxdb-results-brain-parcelation.json b/cli/src/test/resources/templates/influxdb-results-brain-parcelation.json index e891abba37..3b4072efba 100644 --- a/cli/src/test/resources/templates/influxdb-results-brain-parcelation.json +++ b/cli/src/test/resources/templates/influxdb-results-brain-parcelation.json @@ -10,7 +10,8 @@ "bytes", "created", "deprecated", - "project" + "project", + "typeUri" ], "values" : [ [ @@ -18,21 +19,24 @@ 18111712, "2020-02-24T15:36:41.484672Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/BrainParcellationDataLayer" ], [ "2020-02-24T15:43:49.216073Z", 1891712, "2020-02-24T15:43:49.216073Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/BrainParcellationDataLayer" ], [ "2020-02-25T08:48:33.808426Z", 13007853, "2020-02-25T08:48:33.808426Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/BrainParcellationDataLayer" ] ] } diff --git a/cli/src/test/resources/templates/influxdb-results-cell-record.json b/cli/src/test/resources/templates/influxdb-results-cell-record.json index 10f579ec47..1e69ac59b4 100644 --- a/cli/src/test/resources/templates/influxdb-results-cell-record.json +++ b/cli/src/test/resources/templates/influxdb-results-cell-record.json @@ -10,7 +10,8 @@ "bytes", "created", "deprecated", - "project" + "project", + "typeUri" ], "values" : [ [ @@ -18,21 +19,24 @@ 13848730, "2019-11-11T14:59:49.241511Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/CellRecordSeries" ], [ "2019-12-03T12:53:56.682642Z", 2884, "2019-12-03T12:53:56.682642Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/CellRecordSeries" ], [ "2019-12-03T12:55:27.949226Z", 4046, "2019-12-03T12:55:27.949226Z", "false", - "bbp/atlas" + "bbp/atlas", + "https://neuroshapes.org/CellRecordSeries" ] ] } diff --git a/cli/src/test/resources/templates/sparql-results-influx.json b/cli/src/test/resources/templates/sparql-results-influx.json index eada503110..d69a46fc60 100644 --- a/cli/src/test/resources/templates/sparql-results-influx.json +++ b/cli/src/test/resources/templates/sparql-results-influx.json @@ -29,6 +29,11 @@ "datatype": "http://www.w3.org/2001/XMLSchema#dateTime", "type": "literal", "value": "{updated}" + }, + "project": { + "datatype": "http://www.w3.org/2001/XMLSchema#string", + "type": "literal", + "value": "{project}" } } ] diff --git a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClientSpec.scala b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClientSpec.scala index 231a1c3288..d9f7b32fbb 100644 --- a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClientSpec.scala +++ b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/EventStreamClientSpec.scala @@ -78,7 +78,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(None) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events } yield () } @@ -86,7 +87,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(offset) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events.drop(3) } yield () } @@ -94,7 +96,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(orgLabel, None) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events } yield () } @@ -102,7 +105,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(orgLabel, offset) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events.drop(3) } yield () } @@ -110,7 +114,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(orgLabel, projectLabel, None) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events } yield () } @@ -118,7 +123,8 @@ class EventStreamClientSpec extends AbstractCliSpec with Http4sExtras with Optio val ec = EventStreamClient[IO](client, pc, env) for { eventStream <- ec.apply(orgLabel, projectLabel, offset) - eventList <- eventStream.value.collect { case Right((event, _, _)) => event }.compile.toList + stream <- eventStream.value + eventList <- stream.collect { case Right((event, _, _)) => event }.compile.toList _ = eventList shouldEqual events.drop(3) } yield () } diff --git a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPointSpec.scala b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPointSpec.scala index 1c5daefe12..bdaa959d9e 100644 --- a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPointSpec.scala +++ b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/clients/InfluxPointSpec.scala @@ -6,7 +6,6 @@ import java.util.regex.Pattern.quote import cats.effect.IO import cats.implicits._ import ch.epfl.bluebrain.nexus.cli.config.influx.TypeConfig -import ch.epfl.bluebrain.nexus.cli.sse.{OrgLabel, ProjectLabel} import ch.epfl.bluebrain.nexus.cli.utils.{Resources, TimeTransformation} import fs2._ import fs2.text._ @@ -41,7 +40,8 @@ class InfluxPointSpec extends AnyWordSpecLike with Matchers with Resources with Map( quote("{created}") -> created.toString, quote("{updated}") -> updated.toString, - quote("{bytes}") -> 1234.toString + quote("{bytes}") -> 1234.toString, + quote("{project}") -> "myorg/myproject" ) ).as[SparqlResults].getOrElse(throw new IllegalArgumentException) @@ -54,7 +54,7 @@ class InfluxPointSpec extends AnyWordSpecLike with Matchers with Resources with Some(updated) ) - InfluxPoint.fromSparqlResults(sparqlResults, OrgLabel("myorg"), ProjectLabel("myproject"), typeConfig) shouldEqual + InfluxPoint.fromSparqlResults(sparqlResults, typeConfig) shouldEqual List(expected) } diff --git a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/dummies/TestEventStreamClient.scala b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/dummies/TestEventStreamClient.scala index 0c4a02f729..22d8626754 100644 --- a/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/dummies/TestEventStreamClient.scala +++ b/cli/src/test/scala/ch/epfl/bluebrain/nexus/cli/dummies/TestEventStreamClient.scala @@ -5,9 +5,9 @@ import java.util.UUID import cats.effect.Sync import cats.effect.concurrent.Ref import cats.implicits._ -import ch.epfl.bluebrain.nexus.cli.{ClientErrOr, LabeledEvent} import ch.epfl.bluebrain.nexus.cli.clients.{EventStreamClient, ProjectClient} import ch.epfl.bluebrain.nexus.cli.sse._ +import ch.epfl.bluebrain.nexus.cli.{ClientErrOr, LabeledEvent} import fs2.{Pipe, Stream} class TestEventStreamClient[F[_]](events: List[Event], projectClient: ProjectClient[F])(implicit F: Sync[F]) @@ -33,32 +33,32 @@ class TestEventStreamClient[F[_]](events: List[Event], projectClient: ProjectCli projectClient.labels(event.organization, event.project).map(_.map { case (org, proj) => (event, org, proj) }) override def apply(lastEventId: Option[Offset]): F[EventStream[F]] = - for { - ref <- Ref.of(lastEventId) - events <- eventsFrom(ref) - } yield EventStream(Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels), ref) + Ref.of(lastEventId).flatMap { ref => + val stream = eventsFrom(ref).map { events => + Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels) + } + F.delay(EventStream(stream, ref)) + } override def apply(organization: OrgLabel, lastEventId: Option[Offset]): F[EventStream[F]] = - for { - ref <- Ref.of(lastEventId) - events <- eventsFrom(ref) - } yield EventStream( - Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels).filter { - case Right((_, org, _)) => org == organization - case Left(_) => true - }, - ref - ) + Ref.of(lastEventId).flatMap { ref => + val stream = eventsFrom(ref).map { events => + Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels).filter { + case Right((_, org, _)) => org == organization + case Left(_) => true + } + } + F.delay(EventStream(stream, ref)) + } override def apply(organization: OrgLabel, project: ProjectLabel, lastEventId: Option[Offset]): F[EventStream[F]] = - for { - ref <- Ref.of(lastEventId) - events <- eventsFrom(ref) - } yield EventStream( - Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels).filter { - case Right((_, org, proj)) => org == organization && proj == project - case Left(_) => true - }, - ref - ) + Ref.of(lastEventId).flatMap { ref => + val stream = eventsFrom(ref).map { events => + Stream.fromIterator[F](events.iterator).through(saveOffset(ref)).evalMap(eventAndLabels).filter { + case Right((_, org, proj)) => org == organization && proj == project + case Left(_) => true + } + } + F.delay(EventStream(stream, ref)) + } }