From 6290f4ff407cc64508736d76bc0b4dd33a958082 Mon Sep 17 00:00:00 2001 From: wellecks Date: Fri, 16 Oct 2015 13:49:32 -0500 Subject: [PATCH 1/2] Adjust execute request handler futures such that the status message is sent after processing. --- .../v5/handler/ExecuteRequestHandler.scala | 3 +- .../handler/ExecuteRequestHandlerSpec.scala | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+), 2 deletions(-) diff --git a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandler.scala b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandler.scala index 0f4d135c..c3fbcdb7 100644 --- a/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandler.scala +++ b/kernel/src/main/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandler.scala @@ -74,7 +74,7 @@ class ExecuteRequestHandler( (executeRequest, km, outputStream) ).mapTo[(ExecuteReply, ExecuteResult)] - executeFuture.onComplete { + executeFuture andThen { case Success(tuple) => val (executeReply, executeResult) = updateCount(tuple, executionCount) @@ -102,7 +102,6 @@ class ExecuteRequestHandler( Option(error.getStackTrace.map(_.toString).toList)) relayErrorMessages(relayActor, replyError, skeletonBuilder) } - executeFuture } def parseErrorHandler(invalid: Seq[(JsPath, Seq[ValidationError])]) = { diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala index 409bf214..6cf02ee4 100644 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala +++ b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala @@ -17,6 +17,7 @@ package com.ibm.spark.kernel.protocol.v5.handler import java.io.OutputStream +import java.util.concurrent.atomic.AtomicInteger import akka.actor._ import akka.testkit.{ImplicitSender, TestKit, TestProbe} @@ -32,6 +33,8 @@ import play.api.libs.json.Json import org.mockito.Mockito._ import org.mockito.Matchers._ import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent._ class ExecuteRequestHandlerSpec extends TestKit( ActorSystem("ExecuteRequestHandlerSpec") @@ -142,6 +145,42 @@ class ExecuteRequestHandlerSpec extends TestKit( } } + it("should send a status idle message after the reply and result") { + handlerActor ! MockExecuteRequestKernelMessage + replyToHandlerWithOkAndResult() + + val msgCount = new AtomicInteger(0) + var statusMsgNum = -1 + var statusReceived = false + + val f1 = future { + kernelMessageRelayProbe.fishForMessage(200.milliseconds) { + case KernelMessage(_, _, header, _, _, _) => + if (header.msg_type == ExecuteResult.toTypeString && + !statusReceived) + msgCount.incrementAndGet() + else if (header.msg_type == ExecuteReply.toTypeString && + !statusReceived) + msgCount.incrementAndGet() + statusReceived + } + } + + val f2 = future { + statusDispatchProbe.fishForMessage(200.milliseconds) { + case (status, header) => + if (status == KernelStatusIdle.toString) + statusReceived = true + statusMsgNum = msgCount.get() + statusReceived + } + } + val fs = (f1 zip f2) + Await.ready(fs, 400.milliseconds) + + statusMsgNum should equal(2) + } + it("should send an execute input message") { handlerActor ! MockExecuteRequestKernelMessage kernelMessageRelayProbe.fishForMessage(200.milliseconds) { From c48974ee1abe561d0bd75aebfca33ad730f0d876 Mon Sep 17 00:00:00 2001 From: wellecks Date: Fri, 16 Oct 2015 14:17:36 -0500 Subject: [PATCH 2/2] Adjust test timeout and stopping condition. --- .../v5/handler/ExecuteRequestHandlerSpec.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala index 6cf02ee4..d48926df 100644 --- a/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala +++ b/kernel/src/test/scala/com/ibm/spark/kernel/protocol/v5/handler/ExecuteRequestHandlerSpec.scala @@ -154,7 +154,7 @@ class ExecuteRequestHandlerSpec extends TestKit( var statusReceived = false val f1 = future { - kernelMessageRelayProbe.fishForMessage(200.milliseconds) { + kernelMessageRelayProbe.fishForMessage(4.seconds) { case KernelMessage(_, _, header, _, _, _) => if (header.msg_type == ExecuteResult.toTypeString && !statusReceived) @@ -162,21 +162,21 @@ class ExecuteRequestHandlerSpec extends TestKit( else if (header.msg_type == ExecuteReply.toTypeString && !statusReceived) msgCount.incrementAndGet() - statusReceived + statusReceived || (msgCount.get() >= 2) } } val f2 = future { - statusDispatchProbe.fishForMessage(200.milliseconds) { + statusDispatchProbe.fishForMessage(4.seconds) { case (status, header) => if (status == KernelStatusIdle.toString) statusReceived = true statusMsgNum = msgCount.get() - statusReceived + statusReceived || (msgCount.get() >= 2) } } val fs = (f1 zip f2) - Await.ready(fs, 400.milliseconds) + Await.ready(fs, 5.seconds) statusMsgNum should equal(2) }