Skip to content

Commit

Permalink
Merge pull request #164 from ibm-et/FixStatusMessageOrder
Browse files Browse the repository at this point in the history
Send the idle status message after sending the execute reply and result.
  • Loading branch information
chipsenkbeil committed Oct 16, 2015
2 parents 8817f8c + c48974e commit 95dd59e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ExecuteRequestHandler(
(executeRequest, km, outputStream)
).mapTo[(ExecuteReply, ExecuteResult)]

executeFuture.onComplete {
executeFuture andThen {
case Success(tuple) =>
val (executeReply, executeResult) = updateCount(tuple, executionCount)

Expand Down Expand Up @@ -102,7 +102,6 @@ class ExecuteRequestHandler(
Option(error.getStackTrace.map(_.toString).toList))
relayErrorMessages(relayActor, replyError, skeletonBuilder)
}
executeFuture
}

def parseErrorHandler(invalid: Seq[(JsPath, Seq[ValidationError])]) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.ibm.spark.kernel.protocol.v5.handler

import java.io.OutputStream
import java.util.concurrent.atomic.AtomicInteger

import akka.actor._
import akka.testkit.{ImplicitSender, TestKit, TestProbe}
Expand All @@ -32,6 +33,8 @@ import play.api.libs.json.Json
import org.mockito.Mockito._
import org.mockito.Matchers._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._

class ExecuteRequestHandlerSpec extends TestKit(
ActorSystem("ExecuteRequestHandlerSpec")
Expand Down Expand Up @@ -142,6 +145,42 @@ class ExecuteRequestHandlerSpec extends TestKit(
}
}

it("should send a status idle message after the reply and result") {
handlerActor ! MockExecuteRequestKernelMessage
replyToHandlerWithOkAndResult()

val msgCount = new AtomicInteger(0)
var statusMsgNum = -1
var statusReceived = false

val f1 = future {
kernelMessageRelayProbe.fishForMessage(4.seconds) {
case KernelMessage(_, _, header, _, _, _) =>
if (header.msg_type == ExecuteResult.toTypeString &&
!statusReceived)
msgCount.incrementAndGet()
else if (header.msg_type == ExecuteReply.toTypeString &&
!statusReceived)
msgCount.incrementAndGet()
statusReceived || (msgCount.get() >= 2)
}
}

val f2 = future {
statusDispatchProbe.fishForMessage(4.seconds) {
case (status, header) =>
if (status == KernelStatusIdle.toString)
statusReceived = true
statusMsgNum = msgCount.get()
statusReceived || (msgCount.get() >= 2)
}
}
val fs = (f1 zip f2)
Await.ready(fs, 5.seconds)

statusMsgNum should equal(2)
}

it("should send an execute input message") {
handlerActor ! MockExecuteRequestKernelMessage
kernelMessageRelayProbe.fishForMessage(200.milliseconds) {
Expand Down

0 comments on commit 95dd59e

Please sign in to comment.