Skip to content

Commit

Permalink
log slow blazegraph queries (BlueBrain#3823)
Browse files Browse the repository at this point in the history
* WIP

* add very basic store test

* add BlazegraphSlowQueryLoggerTest

* add config

* refactors

* scalafmt

* replace view id and project with viewref

* log subject as json, NOT NULL

* move schema to schema.ddl

* rename blazegraph_slow_queries to blazegraph_queries

* log failed queries also

* change config, add logging

* dont log the query

* rename logSlowQueries to apply

* read record back from store test

* dont use try.get, oops

* add drop table for blazegraph_queries

* make BlazegraphSlowQueryStore impl a private class

* remove copied class, oops

* split out slow query model class

* change distage construction

* move BlazegraphQueryContext

* move BlazegraphSlowQuery

* record whether outcome was successful

* fix logging statement

* add test for store failure

* scalafmt

* add deletion capability to store

* remove boilerplate from tests

* scalafmt

* don't use arraylist

* rename was_error to failed

* make SlowQueriesConfig

* make config match changes to scala config

* add docs, remove Impl class and just use private class

* scalafmt

* pr feedback

* some pr feedback

* assert timestamp is epoch

* more pr

* scalafmt
  • Loading branch information
shinyhappydan authored Apr 26, 2023
1 parent b7feed0 commit 898fa65
Show file tree
Hide file tree
Showing 18 changed files with 576 additions and 9 deletions.
8 changes: 8 additions & 0 deletions delta/plugins/blazegraph/src/main/resources/blazegraph.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,14 @@ plugins.blazegraph {

# Blazegraph query timeout
query-timeout = "1 minute"
# Configuration of the slow query log
slow-queries {
# How long a query takes before it is considered slow (and hence, logged).
# Queries taking at least this long are recorded, whether they succeeded or failed.
slow-query-threshold = 30 seconds
# How long to keep logged slow queries; entries older than this are deleted
log-ttl = 30 days
# How often to delete expired logs
delete-expired-logs-every = 1 hour
}
# the blazegraph event log configuration
event-log = ${app.defaults.event-log}
# the Blazegraph pagination config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewReje
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.{contexts, schema => viewsSchemaId, BlazegraphView, BlazegraphViewCommand, BlazegraphViewEvent, BlazegraphViewRejection, BlazegraphViewState, BlazegraphViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.routes.BlazegraphViewsRoutes
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.{BlazegraphSlowQueryDeleter, BlazegraphSlowQueryLogger, BlazegraphSlowQueryStore}
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.api.JsonLdApi
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
Expand Down Expand Up @@ -56,8 +57,32 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
HttpClient()(cfg.indexingClient, as.classicSystem, sc)
}

// Store persisting slow query records through the database transactors
make[BlazegraphSlowQueryStore].from { (xas: Transactors) =>
BlazegraphSlowQueryStore(
xas
)
}

// Starts the periodic deletion of expired slow query records:
// `logTtl` is the retention period, `deleteExpiredLogsEvery` the interval between two deletion passes
make[BlazegraphSlowQueryDeleter].fromEffect {
(supervisor: Supervisor, store: BlazegraphSlowQueryStore, cfg: BlazegraphViewsConfig) =>
BlazegraphSlowQueryDeleter.start(
supervisor,
store,
cfg.slowQueries.logTtl,
cfg.slowQueries.deleteExpiredLogsEvery
)
}

// Logger recording in the store the queries which take at least `slowQueryThreshold`
make[BlazegraphSlowQueryLogger].from { (cfg: BlazegraphViewsConfig, store: BlazegraphSlowQueryStore) =>
BlazegraphSlowQueryLogger(store, cfg.slowQueries.slowQueryThreshold)
}

make[BlazegraphClient].named("blazegraph-indexing-client").from {
(cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-indexing-client"), as: ActorSystem[Nothing]) =>
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-indexing-client"),
as: ActorSystem[Nothing]
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
}

Expand All @@ -67,7 +92,11 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
}

make[BlazegraphClient].named("blazegraph-query-client").from {
(cfg: BlazegraphViewsConfig, client: HttpClient @Id("http-query-client"), as: ActorSystem[Nothing]) =>
(
cfg: BlazegraphViewsConfig,
client: HttpClient @Id("http-query-client"),
as: ActorSystem[Nothing]
) =>
BlazegraphClient(client, cfg.base, cfg.credentials, cfg.queryTimeout)(as.classicSystem)
}

Expand Down Expand Up @@ -136,10 +165,19 @@ class BlazegraphPluginModule(priority: Int) extends ModuleDef {
fetchContext: FetchContext[ContextRejection],
views: BlazegraphViews,
client: BlazegraphClient @Id("blazegraph-query-client"),
slowQueryLogger: BlazegraphSlowQueryLogger,
cfg: BlazegraphViewsConfig,
xas: Transactors
) =>
BlazegraphViewsQuery(aclCheck, fetchContext.mapRejection(ProjectContextRejection), views, client, cfg.prefix, xas)
BlazegraphViewsQuery(
aclCheck,
fetchContext.mapRejection(ProjectContextRejection),
views,
client,
slowQueryLogger,
cfg.prefix,
xas
)
}

make[BlazegraphViewsRoutes].from {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewReje
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.BlazegraphViewValue.{AggregateBlazegraphViewValue, IndexingBlazegraphViewValue}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model.SparqlLink.{SparqlExternalLink, SparqlResourceLink}
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.model._
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.BlazegraphSlowQueryLogger
import ch.epfl.bluebrain.nexus.delta.rdf.IriOrBNode.Iri
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery
import ch.epfl.bluebrain.nexus.delta.sdk.acls.AclCheck
Expand All @@ -24,6 +25,7 @@ import ch.epfl.bluebrain.nexus.delta.sdk.projects.FetchContext
import ch.epfl.bluebrain.nexus.delta.sdk.projects.model.{ApiMappings, ProjectBase}
import ch.epfl.bluebrain.nexus.delta.sdk.views.View.{AggregateView, IndexingView}
import ch.epfl.bluebrain.nexus.delta.sdk.views.{ViewRef, ViewsStore}
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Subject
import ch.epfl.bluebrain.nexus.delta.sourcing.model.ProjectRef
import monix.bio.{IO, Task}

Expand Down Expand Up @@ -94,6 +96,7 @@ object BlazegraphViewsQuery {
fetchContext: FetchContext[BlazegraphViewRejection],
views: BlazegraphViews,
client: SparqlQueryClient,
logSlowQueries: BlazegraphSlowQueryLogger,
prefix: String,
xas: Transactors
): Task[BlazegraphViewsQuery] = {
Expand Down Expand Up @@ -169,6 +172,8 @@ object BlazegraphViewsQuery {
)(implicit caller: Caller): IO[BlazegraphViewRejection, R] =
for {
view <- viewsStore.fetch(id, project)
p <- fetchContext.onRead(project)
iri <- expandIri(id, p)
indices <- view match {
case i: IndexingView =>
aclCheck
Expand All @@ -181,7 +186,10 @@ object BlazegraphViewsQuery {
_.index
)
}
qr <- client.query(indices, query, responseType).mapError(WrappedBlazegraphClientError)
qr <- logSlowQueries(
BlazegraphQueryContext(ViewRef.apply(project, iri), query, caller.subject),
client.query(indices, query, responseType).mapError(WrappedBlazegraphClientError)
)
} yield qr

private def toSparqlLinks(sparqlResults: SparqlResults, mappings: ApiMappings, projectBase: ProjectBase)(implicit
Expand All @@ -200,4 +208,6 @@ object BlazegraphViewsQuery {
}
}
}

/**
* Information about a query, handed to the slow query logger alongside the query to run.
*
* @param view
* the reference (project and id) of the view the query targets
* @param query
* the SPARQL query
* @param subject
* the subject who issued the query
*/
final case class BlazegraphQueryContext(view: ViewRef, query: SparqlQuery, subject: Subject)
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import scala.concurrent.duration._
* configuration of the query Blazegraph client
* @param queryTimeout
* the Blazegraph query timeout
* @param slowQueries
* configuration of slow queries
* @param eventLog
* configuration of the event log
* @param pagination
Expand All @@ -48,6 +50,7 @@ final case class BlazegraphViewsConfig(
indexingClient: HttpClientConfig,
queryClient: HttpClientConfig,
queryTimeout: Duration,
slowQueries: SlowQueriesConfig,
eventLog: EventLogConfig,
pagination: PaginationConfig,
batch: BatchConfig,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.config

import pureconfig.ConfigReader
import pureconfig.generic.semiauto.deriveReader

import scala.concurrent.duration._

/**
  * Configuration for the Blazegraph slow queries logging.
  *
  * @param slowQueryThreshold
  *   how long a query takes before it is considered slow (and hence, logged)
  * @param logTtl
  *   how long to keep logged slow queries
  * @param deleteExpiredLogsEvery
  *   how often to delete expired logs
  */
final case class SlowQueriesConfig(
    slowQueryThreshold: Duration,
    logTtl: FiniteDuration,
    deleteExpiredLogsEvery: FiniteDuration
)

object SlowQueriesConfig {

  /**
    * Reader deriving a [[SlowQueriesConfig]] from the `slow-queries` configuration section.
    */
  implicit final val slowQueriesConfigReader: ConfigReader[SlowQueriesConfig] =
    deriveReader[SlowQueriesConfig]

  /**
    * Legacy name of [[slowQueriesConfigReader]], kept so that explicit references keep compiling. It is deliberately
    * not implicit: a second implicit of the same type in this companion would make resolution ambiguous.
    */
  def eventLogConfig: ConfigReader[SlowQueriesConfig] = slowQueriesConfigReader
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries

import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.{CompiledProjection, ExecutionStrategy, ProjectionMetadata, Supervisor}
import fs2.Stream
import monix.bio.Task

import scala.concurrent.duration.FiniteDuration

/**
  * Deletes the slow query records which have outlived their retention period.
  *
  * @param store
  *   the store holding the slow query records
  * @param deletionThreshold
  *   how old a record must be before it gets removed
  */
class BlazegraphSlowQueryDeleter(store: BlazegraphSlowQueryStore, deletionThreshold: FiniteDuration) {

  /**
    * Removes from the store every record older than the current instant minus the deletion threshold.
    */
  def deleteOldQueries: Task[Unit] =
    IOUtils.instant
      .map(now => now.minusMillis(deletionThreshold.toMillis))
      .flatMap(cutoff => store.removeQueriesOlderThan(cutoff))
}

object BlazegraphSlowQueryDeleter {

  private val projectionMetadata: ProjectionMetadata =
    ProjectionMetadata("system", "blazegraph-slow-query-log-deletion", None, None)

  /**
    * Creates a deleter and hands the supervisor a projection which triggers it at a fixed rate.
    *
    * @param supervisor
    *   the supervisor running the deletion projection
    * @param store
    *   the store holding the slow query records
    * @param deletionThreshold
    *   how old a record must be before it gets removed
    * @param deletionCheckInterval
    *   the period between two deletion passes
    * @return
    *   the deleter, once the projection has been submitted to the supervisor
    */
  def start(
      supervisor: Supervisor,
      store: BlazegraphSlowQueryStore,
      deletionThreshold: FiniteDuration,
      deletionCheckInterval: FiniteDuration
  ): Task[BlazegraphSlowQueryDeleter] = {
    val deleter = new BlazegraphSlowQueryDeleter(store, deletionThreshold)

    // One deletion pass per tick; the stream emits nothing downstream
    val periodicDeletion =
      Stream.fixedRate[Task](deletionCheckInterval).evalMap { _ => deleter.deleteOldQueries }.drain

    val projection = CompiledProjection.fromStream(
      projectionMetadata,
      ExecutionStrategy.TransientSingleNode,
      _ => periodicDeletion
    )

    for {
      _ <- supervisor.run(projection)
    } yield deleter
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries

import cats.effect.Clock
import ch.epfl.bluebrain.nexus.delta.kernel.utils.IOUtils
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphViewsQuery.BlazegraphQueryContext
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.model.BlazegraphSlowQuery
import com.typesafe.scalalogging.Logger
import monix.bio.{IO, UIO}

import scala.concurrent.duration.{Duration, FiniteDuration}

/**
* Logs slow queries in order to help us determine problematic queries
*/
trait BlazegraphSlowQueryLogger {

/**
* Runs the given query and, when it is slow, records this with context.
*
* @param context
* information about the query (view, query and subject) which can be used in logs
* @param query
* the query which should be timed, and logged if it is too slow
* @return
* an effect yielding the outcome of the supplied query; the implementation provided by the companion object returns
* the query's own result or error unchanged
*/
def apply[E, A](context: BlazegraphQueryContext, query: IO[E, A]): IO[E, A]
}

object BlazegraphSlowQueryLogger {

  private val logger = Logger[BlazegraphSlowQueryLogger]

  /**
    * Builds a logger which times every query and saves the slow ones to the given store.
    *
    * @param sink
    *   the store where slow queries get saved
    * @param longQueryThreshold
    *   the duration from which a query is considered slow
    */
  def apply(sink: BlazegraphSlowQueryStore, longQueryThreshold: Duration)(implicit
      clock: Clock[UIO]
  ): BlazegraphSlowQueryLogger = new BlazegraphSlowQueryLogger {

    def apply[E, A](context: BlazegraphQueryContext, query: IO[E, A]): IO[E, A] =
      // The error is materialised so that failed queries are timed and recorded too
      query.attempt.timed.flatMap { case (elapsed, outcome) =>
        val recordIfSlow: UIO[Unit] =
          if (elapsed >= longQueryThreshold) logSlowQuery(context, outcome.isLeft, elapsed)
          else UIO.unit
        // Hand back the original outcome, whether the query succeeded or failed
        recordIfSlow.flatMap(_ => IO.fromEither(outcome))
      }

    /**
      * Emits a warning and saves the slow query; a failure to save is logged and never propagated.
      */
    private def logSlowQuery(
        context: BlazegraphQueryContext,
        isError: Boolean,
        duration: FiniteDuration
    ): UIO[Unit] = {
      val warnSlowQuery: UIO[Unit] =
        UIO.delay(logger.warn(s"Slow blazegraph query recorded: duration '$duration', view '${context.view}'"))

      val reportSaveFailure: Throwable => UIO[Unit] =
        e => UIO.delay(logger.error("error logging blazegraph slow query", e))

      IOUtils.instant.flatMap { now =>
        val record = BlazegraphSlowQuery(context.view, context.query, isError, duration, now, context.subject)
        warnSlowQuery.flatMap(_ => sink.save(record).onErrorHandleWith(reportSaveFailure))
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries

import ch.epfl.bluebrain.nexus.delta.kernel.database.Transactors
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.slowqueries.model.BlazegraphSlowQuery
import ch.epfl.bluebrain.nexus.delta.sdk.views.ViewRef
import doobie.implicits._
import doobie.postgres.implicits._
import ch.epfl.bluebrain.nexus.delta.sourcing.implicits._
import io.circe.syntax.EncoderOps
import monix.bio.Task
import ch.epfl.bluebrain.nexus.delta.sourcing.model.Identity.Database._

import java.time.Instant

/**
* Persistence operations for slow query logs
*/
trait BlazegraphSlowQueryStore {

/**
* Saves a slow query record.
*/
def save(query: BlazegraphSlowQuery): Task[Unit]

/**
* Deletes the records whose instant is strictly before the given one.
*/
def removeQueriesOlderThan(instant: Instant): Task[Unit]

/**
* Lists the records saved for the given view. As its name states, meant to be used by tests only.
*/
def listForTestingOnly(view: ViewRef): Task[List[BlazegraphSlowQuery]]
}

object BlazegraphSlowQueryStore {

  /**
    * Builds a store persisting slow queries in the `public.blazegraph_queries` table.
    *
    * @param xas
    *   the transactors: writes and deletions go through `xas.write`, reads through `xas.read`
    */
  def apply(xas: Transactors): BlazegraphSlowQueryStore = {
    new BlazegraphSlowQueryStore {

      // The table is schema-qualified here, as in the SELECT and DELETE statements below, so that all three
      // statements target the same table whatever the connection's search path is.
      override def save(query: BlazegraphSlowQuery): Task[Unit] = {
        sql""" INSERT INTO public.blazegraph_queries(project, view_id, instant, duration, subject, query, failed)
             | VALUES(${query.view.project}, ${query.view.viewId}, ${query.instant}, ${query.duration}, ${query.subject.asJson}, ${query.query.value}, ${query.failed})
        """.stripMargin.update.run
          .transact(xas.write)
          .void
      }

      override def listForTestingOnly(view: ViewRef): Task[List[BlazegraphSlowQuery]] = {
        sql""" SELECT project, view_id, instant, duration, subject, query, failed FROM public.blazegraph_queries
             |WHERE view_id = ${view.viewId} AND project = ${view.project}
        """.stripMargin
          .query[BlazegraphSlowQuery]
          .stream
          .transact(xas.read)
          .compile
          .toList
      }

      // Strict comparison: a record stamped exactly at `instant` is kept
      override def removeQueriesOlderThan(instant: Instant): Task[Unit] = {
        sql""" DELETE FROM public.blazegraph_queries
             |WHERE instant < $instant
        """.stripMargin.update.run
          .transact(xas.write)
          .void
      }
    }
  }
}
Loading

0 comments on commit 898fa65

Please sign in to comment.