Skip to content

Commit

Permalink
Improve error reporting in case of failure of flaky tests (corda#2828)
Browse files Browse the repository at this point in the history
* Improve error reporting in case of failure of flaky tests

Also eliminate warnings and other minor changes.

* Address code review comments raised by @exFalso

Also improve output for stack traces.
  • Loading branch information
vkolomeyko authored Mar 16, 2018
1 parent 9f80cfa commit a2d65f1
Showing 1 changed file with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import net.corda.nodeapi.RPCApi
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.testThreadFactory
import net.corda.testing.node.internal.*
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration
import org.apache.activemq.artemis.api.core.SimpleString
import org.junit.After
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -46,15 +47,15 @@ class RPCStabilityTests {
override val protocolVersion = 0
}

private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Int {
val values = ConcurrentLinkedQueue<Int>()
private fun waitUntilNumberOfThreadsStable(executorService: ScheduledExecutorService): Map<Thread, List<StackTraceElement>> {
val values = ConcurrentLinkedQueue<Map<Thread, List<StackTraceElement>>>()
return poll(executorService, "number of threads to become stable", 250.millis) {
values.add(Thread.activeCount())
values.add(Thread.getAllStackTraces().mapValues { it.value.toList() })
if (values.size > 5) {
values.poll()
}
val first = values.peek()
if (values.size == 5 && values.all { it == first }) {
if (values.size == 5 && values.all { it.keys.size == first.keys.size }) {
first
} else {
null
Expand All @@ -64,30 +65,39 @@ class RPCStabilityTests {

@Test
fun `client and server dont leak threads`() {
val executor = Executors.newScheduledThreadPool(1)
fun startAndStop() {
rpcDriver {
val server = startRpcServer<RPCOps>(ops = DummyOps).get()
startRpcClient<RPCOps>(server.broker.hostAndPort!!).get()
}
}
repeat(5) {
startAndStop()
}
val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
repeat(5) {
startAndStop()

runBlockAndCheckThreads(::startAndStop)
}

private fun runBlockAndCheckThreads(block: () -> Unit) {
val executor = Executors.newScheduledThreadPool(1)

try {
// Warm-up so that all the thread pools & co. created
block()

val threadsBefore = waitUntilNumberOfThreadsStable(executor)
repeat(5) {
block()
}
val threadsAfter = waitUntilNumberOfThreadsStable(executor)
// This is a less than check because threads from other tests may be shutting down while this test is running.
// This is therefore a "best effort" check. When this test is run on its own this should be a strict equality.
// In case of failure we output the threads along with their stacktraces to get an idea what was running at a time.
assert(threadsBefore.keys.size >= threadsAfter.keys.size, { "threadsBefore: $threadsBefore\nthreadsAfter: $threadsAfter" })
} finally {
executor.shutdownNow()
}
val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)
// This is a less than check because threads from other tests may be shutting down while this test is running.
// This is therefore a "best effort" check. When this test is run on its own this should be a strict equality.
assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter)
executor.shutdownNow()
}

@Test
fun `client doesnt leak threads when it fails to start`() {
val executor = Executors.newScheduledThreadPool(1)
fun startAndStop() {
rpcDriver {
Try.on { startRpcClient<RPCOps>(NetworkHostAndPort("localhost", 9999)).get() }
Expand All @@ -100,20 +110,10 @@ class RPCStabilityTests {
}
}
}
repeat(5) {
startAndStop()
}
val numberOfThreadsBefore = waitUntilNumberOfThreadsStable(executor)
repeat(5) {
startAndStop()
}
val numberOfThreadsAfter = waitUntilNumberOfThreadsStable(executor)

assertTrue(numberOfThreadsBefore >= numberOfThreadsAfter)
executor.shutdownNow()
runBlockAndCheckThreads(::startAndStop)
}

fun RpcBrokerHandle.getStats(): Map<String, Any> {
private fun RpcBrokerHandle.getStats(): Map<String, Any> {
return serverControl.run {
mapOf(
"connections" to listConnectionIDs().toSet(),
Expand Down Expand Up @@ -211,14 +211,14 @@ class RPCStabilityTests {
val server = startRpcServer<LeakObservableOps>(ops = leakObservableOpsImpl)
val proxy = startRpcClient<LeakObservableOps>(server.get().broker.hostAndPort!!).get()
// Leak many observables
val N = 200
(1..N).map {
val count = 200
(1..count).map {
pool.fork { proxy.leakObservable(); Unit }
}.transpose().getOrThrow()
// In a loop force GC and check whether the server is notified
while (true) {
System.gc()
if (leakObservableOpsImpl.leakedUnsubscribedCount.get() == N) break
if (leakObservableOpsImpl.leakedUnsubscribedCount.get() == count) break
Thread.sleep(100)
}
}
Expand Down Expand Up @@ -361,11 +361,11 @@ class RPCStabilityTests {
clients[0].destroyForcibly()
pollUntilClientNumber(server, numberOfClients - 1)
// Kill the rest
(1..numberOfClients - 1).forEach {
(1 until numberOfClients).forEach {
clients[it].destroyForcibly()
}
pollUntilClientNumber(server, 0)
// Now poll until the server detects the disconnects and unsubscribes from all obserables.
// Now poll until the server detects the disconnects and un-subscribes from all observables.
pollUntilTrue("number of times subscribe() has been called") { trackSubscriberOpsImpl.subscriberCount.get() == 0 }.get()
}
}
Expand All @@ -391,7 +391,7 @@ class RPCStabilityTests {
// Construct an RPC session manually so that we can hang in the message handler
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, myQueue)
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue)
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
consumer.setMessageHandler {
Thread.sleep(50) // 5x slower than the server producer
Expand Down Expand Up @@ -428,7 +428,7 @@ class RPCStabilityTests {
// Construct an RPC client session manually
val myQueue = "${RPCApi.RPC_CLIENT_QUEUE_NAME_PREFIX}.test.${random63BitValue()}"
val session = startArtemisSession(server.broker.hostAndPort!!)
session.createTemporaryQueue(myQueue, myQueue)
session.createTemporaryQueue(myQueue, ActiveMQDefaultConfiguration.getDefaultRoutingType(), myQueue)
val consumer = session.createConsumer(myQueue, null, -1, -1, false)
val replies = ArrayList<Any>()
consumer.setMessageHandler {
Expand Down

0 comments on commit a2d65f1

Please sign in to comment.