Skip to content

Commit

Permalink
Fix tests that check ports are bound/unbound (corda#756)
Browse files Browse the repository at this point in the history
* Specifically, DriverTests and WebserverDriverTests
* RPCDriver.startRpcBroker now waits for port to be unbound, as was probably intended
* Explicitly drop network map future while ensuring the error is logged
  • Loading branch information
andr3ej authored May 31, 2017
1 parent 39fdb35 commit 4bd38d3
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 12 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ crashlytics-build.properties
docs/virtualenv/

# bft-smart
config/currentView
**/config/currentView

# vim
*.swp
Expand Down
1 change: 1 addition & 0 deletions core/src/main/kotlin/net/corda/core/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ fun <T> ListenableFuture<T>.failure(executor: Executor, body: (Throwable) -> Uni
infix fun <T> ListenableFuture<T>.then(body: () -> Unit): ListenableFuture<T> = apply { then(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.success(body: (T) -> Unit): ListenableFuture<T> = apply { success(RunOnCallerThread, body) }
infix fun <T> ListenableFuture<T>.failure(body: (Throwable) -> Unit): ListenableFuture<T> = apply { failure(RunOnCallerThread, body) }
fun <T> ListenableFuture<T>.andForget(log: Logger) = failure(RunOnCallerThread) { log.error("Background task failed:", it) }
@Suppress("UNCHECKED_CAST") // We need the awkward cast because otherwise F cannot be nullable, even though it's safe.
infix fun <F, T> ListenableFuture<F>.map(mapper: (F) -> T): ListenableFuture<T> = Futures.transform(this, { (mapper as (F?) -> T)(it) })
infix fun <F, T> ListenableFuture<F>.flatMap(mapper: (F) -> ListenableFuture<T>): ListenableFuture<T> = Futures.transformAsync(this) { mapper(it!!) }
Expand Down
23 changes: 22 additions & 1 deletion core/src/test/kotlin/net/corda/core/UtilsTest.kt
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
package net.corda.core

import com.google.common.util.concurrent.MoreExecutors
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.same
import com.nhaarman.mockito_kotlin.verify
import org.assertj.core.api.Assertions.*
import org.junit.Test
import org.mockito.ArgumentMatchers.anyString
import org.slf4j.Logger
import rx.subjects.PublishSubject
import java.util.*
import java.util.concurrent.CancellationException
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

class UtilsTest {
@Test
Expand Down Expand Up @@ -57,4 +65,17 @@ class UtilsTest {
future.get()
}
}
}

@Test
fun `andForget works`() {
val log = mock<Logger>()
val throwable = Exception("Boom")
val executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())
executor.submit { throw throwable }.andForget(log)
executor.shutdown()
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
// Do nothing.
}
verify(log).error(anyString(), same(throwable))
}
}
20 changes: 14 additions & 6 deletions node/src/main/kotlin/net/corda/node/driver/Driver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,11 @@ class ListenProcessDeathException(message: String) : Exception(message)
/**
* @throws ListenProcessDeathException if [listenProcess] dies before the check succeeds, i.e. the check can't succeed as intended.
*/
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process): ListenableFuture<Unit> {
fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process) {
addressMustBeBoundFuture(executorService, hostAndPort, listenProcess).getOrThrow()
}

fun addressMustBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort, listenProcess: Process): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to bind") {
if (!listenProcess.isAlive) {
throw ListenProcessDeathException("The process that was expected to listen on $hostAndPort has died with status: ${listenProcess.exitValue()}")
Expand All @@ -275,7 +279,11 @@ fun addressMustBeBound(executorService: ScheduledExecutorService, hostAndPort: H
}
}

fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
fun addressMustNotBeBound(executorService: ScheduledExecutorService, hostAndPort: HostAndPort) {
addressMustNotBeBoundFuture(executorService, hostAndPort).getOrThrow()
}

fun addressMustNotBeBoundFuture(executorService: ScheduledExecutorService, hostAndPort: HostAndPort): ListenableFuture<Unit> {
return poll(executorService, "address $hostAndPort to unbind") {
try {
Socket(hostAndPort.host, hostAndPort.port).close()
Expand Down Expand Up @@ -608,7 +616,7 @@ class DriverDSL(
)
_shutdownManager = ShutdownManager(executorService)
if (networkMapStartStrategy.startDedicated) {
startDedicatedNetworkMapService()
startDedicatedNetworkMapService().andForget(log) // Allow it to start concurrently with other nodes.
}
}

Expand All @@ -634,7 +642,7 @@ class DriverDSL(
log.info("Starting network-map-service")
val startNode = startNode(executorService, config.parseAs<FullNodeConfiguration>(), config, quasarJarPath, debugPort, systemProperties)
registerProcess(startNode)
return startNode.flatMap { addressMustBeBound(executorService, dedicatedNetworkMapAddress, it) }
return startNode.flatMap { addressMustBeBoundFuture(executorService, dedicatedNetworkMapAddress, it) }
}

override fun <A> pollUntilNonNull(pollName: String, pollInterval: Duration, warnCount: Int, check: () -> A?): ListenableFuture<A> {
Expand Down Expand Up @@ -693,7 +701,7 @@ class DriverDSL(
errorLogPath = nodeConf.baseDirectory / LOGS_DIRECTORY_NAME / "error.log",
workingDirectory = nodeConf.baseDirectory
)
}.flatMap { process -> addressMustBeBound(executorService, nodeConf.p2pAddress, process).map { process } }
}.flatMap { process -> addressMustBeBoundFuture(executorService, nodeConf.p2pAddress, process).map { process } }
}

private fun startWebserver(
Expand All @@ -713,7 +721,7 @@ class DriverDSL(
),
errorLogPath = Paths.get("error.$className.log")
)
}.flatMap { process -> addressMustBeBound(executorService, handle.webAddress, process).map { process } }
}.flatMap { process -> addressMustBeBoundFuture(executorService, handle.webAddress, process).map { process } }
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion test-utils/src/main/kotlin/net/corda/testing/RPCDriver.kt
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ data class RPCDriverDSL(
server.start()
driverDSL.shutdownManager.registerShutdown {
server.stop()
addressMustNotBeBound(driverDSL.executorService, hostAndPort).get()
addressMustNotBeBound(driverDSL.executorService, hostAndPort)
}
RpcBrokerHandle(
hostAndPort = hostAndPort,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import net.corda.core.node.services.ServiceType
import net.corda.core.utilities.DUMMY_CA
import net.corda.core.utilities.DUMMY_MAP
import net.corda.core.utilities.WHITESPACE
import net.corda.node.driver.addressMustNotBeBound
import net.corda.node.driver.addressMustNotBeBoundFuture
import net.corda.node.internal.Node
import net.corda.node.services.config.ConfigHelper
import net.corda.node.services.config.FullNodeConfiguration
Expand Down Expand Up @@ -62,8 +62,8 @@ abstract class NodeBasedTest {
// Wait until ports are released
val portNotBoundChecks = nodes.flatMap {
listOf(
it.configuration.p2pAddress.let { addressMustNotBeBound(shutdownExecutor, it) },
it.configuration.rpcAddress?.let { addressMustNotBeBound(shutdownExecutor, it) }
it.configuration.p2pAddress.let { addressMustNotBeBoundFuture(shutdownExecutor, it) },
it.configuration.rpcAddress?.let { addressMustNotBeBoundFuture(shutdownExecutor, it) }
)
}.filterNotNull()
nodes.clear()
Expand Down

0 comments on commit 4bd38d3

Please sign in to comment.