Skip to content

Commit

Permalink
[LIVY-513][LIVY-517][TEST] Fix SessionHeartbeatSpec flakiness, occasi…
Browse files Browse the repository at this point in the history
…onal NPE

## What changes were proposed in this pull request?

This PR aims to fix the following issues in the test suite `SessionHeartbeatSpec`:

 * Since there is no implementation provided for `stop()` in `mock[TestSession]`, Mockito returns `null` by default and this causes `SessionManager#delete` to throw
   `NullPointerException` when executing `map()` on the `Future[Unit]` object it expects.
   This can be observed in the unit test output (`server/target/unit-tests.log`):

```
19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 0
19/03/25 18:20:30.123 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Registering new session 1
19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec INFO SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Session 0 expired. Last heartbeat is at null.
19/03/25 18:20:30.124 ScalaTest-main-running-SessionHeartbeatSpec WARN SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1: Exception was thrown when deleting expired session 0
java.lang.NullPointerException
        at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:123)
        at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:113)
        at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$$anonfun$deleteExpiredSessions$1.apply(SessionHeartbeat.scala:110)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
        at org.apache.livy.server.interactive.SessionHeartbeatWatchdog$class.deleteExpiredSessions(SessionHeartbeat.scala:110)
        at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$TestWatchdog$1.deleteExpiredSessions(SessionHeartbeatSpec.scala:60)
```

 * In some rare cases, the SessionManager GC thread may run `collectGarbage()` after the test sessions are registered but before the test is finished. The GC thread will also attempt to delete the `mock[TestSession]` objects because the mocked session's `lastActivity()` function returns 0 by default which means`currentTime - session.lastActivity > sessionTimeout` will always be true. This causes a race condition that can have the following outcomes:

     1. The `stop()` method may be called twice (both by `SessionHeartbeatWatchdog#deleteExpiredSessions` and `SessionManager#collectGarbage`), resulting in a `TooManyActualInvocations` exception (LIVY-513):

		```
		  org.mockito.exceptions.verification.TooManyActualInvocations: testSession$1.stop();
		Wanted 1 time:
		-> at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93)
		But was 2 times. Undesired invocation:
		-> at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124)
		  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply$mcV$sp(SessionHeartbeatSpec.scala:93)
		  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70)
		  at org.apache.livy.server.interactive.SessionHeartbeatSpec$$anonfun$2$$anonfun$apply$mcV$sp$5.apply(SessionHeartbeatSpec.scala:70)
		  at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
		  at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
		  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
		  at org.scalatest.Transformer.apply(Transformer.scala:22)
		  at org.scalatest.Transformer.apply(Transformer.scala:20)
		  at org.scalatest.FunSpecLike$$anon$1.apply(FunSpecLike.scala:422)
		  at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
		  ...
		```

	 2. The `stop()` method may be called by `collectGarbage()` after `deleteExpiredSessions()`, but before the test is finished, which causes the following error message to appear on the console (LIVY-517).

		```
		Deleting Mock for TestSession$1, hashCode: 1666638753 because it was inactive for more than 3600000.0 ms.
		Exception in thread "session gc thread" java.lang.NullPointerException
			at org.apache.livy.sessions.SessionManager.delete(SessionManager.scala:124)
			at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:171)
			at org.apache.livy.sessions.SessionManager$$anonfun$collectGarbage$2.apply(SessionManager.scala:168)
			at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
			at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
			at scala.collection.immutable.List.foreach(List.scala:392)
			at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
			at scala.collection.immutable.List.map(List.scala:296)
			at org.apache.livy.sessions.SessionManager.collectGarbage(SessionManager.scala:168)
			at org.apache.livy.sessions.SessionManager$GarbageCollector.run(SessionManager.scala:201)
		```

These race conditions can be triggered easily by inserting `Thread.sleep(x)` (where x is between 10 to 50) before the loop in `org.apache.livy.sessions.SessionManager.GarbageCollector#run` and rerunning the test.

With this change applied, the SessionManager GC should not attempt to collect the `mock[TestSession]` objects created by the test, so the race condition should be eliminated and the flakiness of the test should be fixed.

## How was this patch tested?

By running the existing UTs.

Author: Gyorgy Gal <[email protected]>

Closes apache#163 from gyogal/LIVY-513.
  • Loading branch information
gyogal authored and Marcelo Vanzin committed Mar 27, 2019
1 parent 1c914a1 commit 8c7f03b
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.livy.server.interactive

import scala.concurrent.duration._
import scala.concurrent.Future
import scala.language.postfixOps

import org.mockito.Mockito.{never, verify, when}
Expand Down Expand Up @@ -71,11 +72,15 @@ class SessionHeartbeatSpec extends FunSpec with Matchers {
when(expiredSession.id).thenReturn(0)
when(expiredSession.name).thenReturn(None)
when(expiredSession.heartbeatExpired).thenReturn(true)
when(expiredSession.stop()).thenReturn(Future.successful(()))
when(expiredSession.lastActivity).thenReturn(System.nanoTime())

val nonExpiredSession: TestSession = mock[TestSession]
when(nonExpiredSession.id).thenReturn(1)
when(nonExpiredSession.name).thenReturn(None)
when(nonExpiredSession.heartbeatExpired).thenReturn(false)
when(nonExpiredSession.stop()).thenReturn(Future.successful(()))
when(nonExpiredSession.lastActivity).thenReturn(System.nanoTime())

val n = new TestWatchdog(new LivyConf())

Expand Down

0 comments on commit 8c7f03b

Please sign in to comment.