[PIO-90] Improve /batch/events.json endpoint performance
Closes apache#386
takezoe authored and marevol committed Jun 12, 2017
1 parent a36fbac commit 4b172f5
Showing 5 changed files with 215 additions and 51 deletions.
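
For context on the endpoint being optimized, here is a hedged, standalone sketch of a batch call against a locally running event server; the port 7070, the access key, and the sample events are illustrative placeholders, not part of this commit. The response carries one status object per submitted event, in request order, matching the result maps built in the route code below.

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}

object BatchEventsExample extends App {
  // Two events posted in a single request instead of two separate calls.
  val body =
    """[
      |  {"event": "rate", "entityType": "user", "entityId": "u1",
      |   "targetEntityType": "item", "targetEntityId": "i1",
      |   "properties": {"rating": 5}},
      |  {"event": "$set", "entityType": "user", "entityId": "u2"}
      |]""".stripMargin

  val request = HttpRequest.newBuilder(
      URI.create("http://localhost:7070/batch/events.json?accessKey=ACCESS_KEY"))
    .header("Content-Type", "application/json")
    .POST(HttpRequest.BodyPublishers.ofString(body))
    .build()

  val response = HttpClient.newHttpClient()
    .send(request, HttpResponse.BodyHandlers.ofString())

  // Expected shape: [{"status":201,"eventId":"..."},{"status":201,"eventId":"..."}]
  println(response.body())
}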
@@ -384,50 +384,70 @@ class EventServiceActor(
val appId = authData.appId
val channelId = authData.channelId
val allowedEvents = authData.events
val handleEvent: PartialFunction[Try[Event], Future[Map[String, Any]]] = {
case Success(event) => {
if (allowedEvents.isEmpty || allowedEvents.contains(event.event)) {
pluginContext.inputBlockers.values.foreach(
_.process(EventInfo(
appId = appId,
channelId = channelId,
event = event), pluginContext))
val data = eventClient.futureInsert(event, appId, channelId).map { id =>
pluginsActorRef ! EventInfo(
appId = appId,
channelId = channelId,
event = event)
val status = StatusCodes.Created
val result = Map(
"status" -> status.intValue,
"eventId" -> s"${id}")
if (config.stats) {
statsActorRef ! Bookkeeping(appId, status, event)

entity(as[Seq[Try[Event]]]) { events =>
complete {
if (events.length <= MaxNumberOfEventsPerBatchRequest) {
val eventWithIndex = events.zipWithIndex

val taggedEvents = eventWithIndex.collect { case (Success(event), i) =>
if(allowedEvents.isEmpty || allowedEvents.contains(event.event)){
(Right(event), i)
} else {
(Left(event), i)
}
result
}.recover { case exception =>
}

val insertEvents = taggedEvents.collect { case (Right(event), i) =>
(event, i)
}

insertEvents.foreach { case (event, i) =>
pluginContext.inputBlockers.values.foreach(
_.process(EventInfo(
appId = appId,
channelId = channelId,
event = event), pluginContext))
}

val f: Future[Seq[Map[String, Any]]] = eventClient.futureInsertBatch(
insertEvents.map(_._1), appId, channelId).map { insertResults =>
val results = insertResults.zip(insertEvents).map { case (id, (event, i)) =>
pluginsActorRef ! EventInfo(
appId = appId,
channelId = channelId,
event = event)
val status = StatusCodes.Created
if (config.stats) {
statsActorRef ! Bookkeeping(appId, status, event)
}
(Map(
"status" -> status.intValue,
"eventId" -> s"${id}"), i)
} ++
// Results of denied events
taggedEvents.collect { case (Left(event), i) =>
(Map(
"status" -> StatusCodes.Forbidden.intValue,
"message" -> s"${event.event} events are not allowed"), i)
} ++
// Results of events that failed to deserialize
eventWithIndex.collect { case (Failure(exception), i) =>
(Map(
"status" -> StatusCodes.BadRequest.intValue,
"message" -> s"${exception.getMessage()}"), i)
}

// Restore original order
results.sortBy { case (_, i) => i }.map { case (data, _) => data }
}

f.recover { case exception =>
Map(
"status" -> StatusCodes.InternalServerError.intValue,
"message" -> s"${exception.getMessage()}")
}
data
} else {
Future.successful(Map(
"status" -> StatusCodes.Forbidden.intValue,
"message" -> s"${event.event} events are not allowed"))
}
}
case Failure(exception) => {
Future.successful(Map(
"status" -> StatusCodes.BadRequest.intValue,
"message" -> s"${exception.getMessage()}"))
}
}

entity(as[Seq[Try[Event]]]) { events =>
complete {
if (events.length <= MaxNumberOfEventsPerBatchRequest) {
Future.traverse(events)(handleEvent)
} else {
(StatusCodes.BadRequest,
Map("message" -> (s"Batch request must have less than or equal to " +
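The reworked route above tags each parsed event with its position, pushes the allowed ones through a single futureInsertBatch call, turns denied and unparsable entries into per-index error maps, and finally sorts by index to restore request order. A minimal, self-contained sketch of that tag-with-index / restore-order pattern, independent of any PredictionIO types:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

// Bulk-process only the items that parsed, build error results for the rest,
// and use the original indices to put everything back in submission order.
def processBatch[A, B](items: Seq[Try[A]])(bulk: Seq[A] => Future[Seq[B]])(
    onFailure: Throwable => B): Future[Seq[B]] = {
  val indexed = items.zipWithIndex
  val valid   = indexed.collect { case (Success(a), i) => (a, i) }
  val failed  = indexed.collect { case (Failure(e), i) => (onFailure(e), i) }

  bulk(valid.map(_._1)).map { results =>
    val succeeded = results.zip(valid.map(_._2))   // pair each result with its index
    (succeeded ++ failed).sortBy(_._2).map(_._1)   // restore original order
  }
}

The final sort is what keeps the response aligned with the request: once denied and malformed events are filtered out before the bulk insert, the indices are the only remaining link between a bulk result and the position the client submitted it at.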
@@ -90,6 +90,27 @@ trait LEvents {
def futureInsert(
event: Event, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext): Future[String]

/** :: DeveloperApi ::
* Insert [[Event]]s in a non-blocking fashion.
*
* The default implementation of this method calls
* [[LEvents.futureInsert(Event, Int, Option[Int])]] once per event.
* Override it in the storage implementation if the storage has
* a more efficient way to insert multiple events at once.
*
* @param events [[Event]]s to be inserted
* @param appId App ID for the [[Event]]s to be inserted to
* @param channelId Optional channel ID for the [[Event]]s to be inserted to
*/
@DeveloperApi
def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])
(implicit ec: ExecutionContext): Future[Seq[String]] = {
val seq = events.map { event =>
futureInsert(event, appId, channelId)
}
Future.sequence(seq)
}

/** :: DeveloperApi ::
* Get an [[Event]] in a non-blocking fashion.
*
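One point the scaladoc above leaves implicit, and that the route code relies on when it zips insert results back onto events, is that an override must return exactly one ID per input event, in input order. A self-contained toy sketch of that contract, with Event shrunk to a String and storage to an in-memory map (nothing here is PredictionIO API):

import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

trait ToyEvents {
  def futureInsert(event: String)(implicit ec: ExecutionContext): Future[String]

  // Default: one round trip per event, mirroring LEvents.futureInsertBatch above.
  def futureInsertBatch(events: Seq[String])(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    Future.sequence(events.map(e => futureInsert(e)))
}

class ToyStore extends ToyEvents {
  private val data = TrieMap.empty[String, String]

  def futureInsert(event: String)(implicit ec: ExecutionContext): Future[String] =
    Future {
      val id = UUID.randomUUID().toString
      data.put(id, event)
      id
    }

  // "Bulk" override: generate all IDs up front, write once, return IDs in order.
  override def futureInsertBatch(events: Seq[String])(
      implicit ec: ExecutionContext): Future[Seq[String]] =
    Future {
      val ids = events.map(_ => UUID.randomUUID().toString)
      data ++= ids.zip(events)   // one bulk write instead of N round trips
      ids                        // must line up one-to-one with `events`
    }
}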
@@ -22,8 +22,7 @@ import java.io.IOException
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

import org.apache.http.entity.ContentType
import org.apache.http.entity.{ContentType, StringEntity}
import org.apache.http.nio.entity.NStringEntity
import org.apache.http.util.EntityUtils
import org.apache.predictionio.data.storage.Event
@@ -34,13 +33,11 @@ import org.joda.time.DateTime
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization.read
import org.json4s.native.Serialization.write
import org.json4s.ext.JodaTimeSerializers

import grizzled.slf4j.Logging
import org.elasticsearch.client.ResponseException
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader

class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: String)
extends LEvents with Logging {
@@ -130,7 +127,7 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
("prId" -> event.prId) ~
("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
("properties" -> write(event.properties.toJObject))
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON);
val entity = new NStringEntity(compact(render(json)), ContentType.APPLICATION_JSON)
val response = restClient.performRequest(
"POST",
s"/$index/$estype/$id",
@@ -153,6 +150,73 @@ class ESLEvents(val client: ESClient, config: StorageClientConfig, val index: St
}
}

override def futureInsertBatch(
events: Seq[Event],
appId: Int,
channelId: Option[Int])(implicit ec: ExecutionContext): Future[Seq[String]] = {
Future {
val estype = getEsType(appId, channelId)
try {
val ids = events.map { event =>
event.eventId.getOrElse(ESEventsUtil.getBase64UUID)
}

val json = events.zip(ids).map { case (event, id) =>
val commandJson =
("index" -> (
("_index" -> index) ~
("_type" -> estype) ~
("_id" -> id)
))

val documentJson =
("eventId" -> id) ~
("event" -> event.event) ~
("entityType" -> event.entityType) ~
("entityId" -> event.entityId) ~
("targetEntityType" -> event.targetEntityType) ~
("targetEntityId" -> event.targetEntityId) ~
("eventTime" -> ESUtils.formatUTCDateTime(event.eventTime)) ~
("tags" -> event.tags) ~
("prId" -> event.prId) ~
("creationTime" -> ESUtils.formatUTCDateTime(event.creationTime)) ~
("properties" -> write(event.properties.toJObject))

compact(render(commandJson)) + "\n" + compact(render(documentJson))

}.mkString("", "\n", "\n")

val entity = new StringEntity(json)
val response = restClient.performRequest(
"POST",
"/_bulk",
Map("refresh" -> ESUtils.getEventDataRefresh(config)).asJava,
entity,
new BasicHeader("Content-Type", "application/x-ndjson"))

val responseJson = parse(EntityUtils.toString(response.getEntity))
val items = (responseJson \ "items").asInstanceOf[JArray]

items.arr.map { case value: JObject =>
val result = (value \ "index" \ "result").extract[String]
val id = (value \ "index" \ "_id").extract[String]

result match {
case "created" => id
case "updated" => id
case _ =>
error(s"[$result] Failed to update $index/$estype/$id")
""
}
}
} catch {
case e: IOException =>
error(s"Failed to update $index/$estype/<id>", e)
Nil
}
}
}

private def exists(restClient: RestClient, estype: String, id: Int): Boolean = {
try {
restClient.performRequest(
@@ -109,6 +109,20 @@ class HBLEvents(val client: HBClient, config: StorageClientConfig, val namespace
}
}

override
def futureInsertBatch(
events: Seq[Event], appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
Future[Seq[String]] = {
Future {
val table = getTable(appId, channelId)
val (puts, rowKeys) = events.map { event => HBEventsUtil.eventToPut(event, appId) }.unzip
table.put(puts)
table.flushCommits()
table.close()
rowKeys.map(_.toString)
}
}

override
def futureGet(
eventId: String, appId: Int, channelId: Option[Int])(implicit ec: ExecutionContext):
@@ -40,7 +40,7 @@ class JDBCLEvents(
namespace: String) extends LEvents with Logging {
implicit private val formats = org.json4s.DefaultFormats

def init(appId: Int, channelId: Option[Int] = None): Boolean = {
override def init(appId: Int, channelId: Option[Int] = None): Boolean = {

// To use index, it must be varchar less than 255 characters on a VARCHAR column
val useIndex = config.properties.contains("INDEX") &&
@@ -91,17 +91,17 @@
}
}

def remove(appId: Int, channelId: Option[Int] = None): Boolean =
override def remove(appId: Int, channelId: Option[Int] = None): Boolean =
DB autoCommit { implicit session =>
SQL(s"""
drop table ${JDBCUtils.eventTableName(namespace, appId, channelId)}
""").execute().apply()
true
}

def close(): Unit = ConnectionPool.closeAll()
override def close(): Unit = ConnectionPool.closeAll()

def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
override def futureInsert(event: Event, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[String] = Future {
DB localTx { implicit session =>
val id = event.eventId.getOrElse(JDBCUtils.generateId)
@@ -127,7 +127,52 @@
}
}

def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
override def futureInsertBatch(events: Seq[Event], appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[Seq[String]] = Future {
DB localTx { implicit session =>
val ids = events.map(_.eventId.getOrElse(JDBCUtils.generateId))
val params = events.zip(ids).map { case (event, id) =>
Seq(
'id -> id,
'event -> event.event,
'entityType -> event.entityType,
'entityId -> event.entityId,
'targetEntityType -> event.targetEntityType,
'targetEntityId -> event.targetEntityId,
'properties -> write(event.properties.toJObject),
'eventTime -> event.eventTime,
'eventTimeZone -> event.eventTime.getZone.getID,
'tags -> (if(event.tags.nonEmpty) Some(event.tags.mkString(",")) else None),
'prId -> event.prId,
'creationTime -> event.creationTime,
'creationTimeZone -> event.creationTime.getZone.getID
)
}

val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
sql"""
insert into $tableName values(
{id},
{event},
{entityType},
{entityId},
{targetEntityType},
{targetEntityId},
{properties},
{eventTime},
{eventTimeZone},
{tags},
{prId},
{creationTime},
{creationTimeZone}
)
""".batchByName(params: _*).apply()

ids
}
}

override def futureGet(eventId: String, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[Option[Event]] = Future {
DB readOnly { implicit session =>
val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -152,7 +197,7 @@
}
}

def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
override def futureDelete(eventId: String, appId: Int, channelId: Option[Int])(
implicit ec: ExecutionContext): Future[Boolean] = Future {
DB localTx { implicit session =>
val tableName = sqls.createUnsafely(JDBCUtils.eventTableName(namespace, appId, channelId))
@@ -163,7 +208,7 @@
}
}

def futureFind(
override def futureFind(
appId: Int,
channelId: Option[Int] = None,
startTime: Option[DateTime] = None,