Skip to content

Commit

Permalink
CORDA-1124 Fix thread leak in generateAndSaveNodeInfo (corda#2659)
Browse files Browse the repository at this point in the history
  • Loading branch information
andr3ej authored Feb 27, 2018
1 parent 0ff37c0 commit 4d4253a
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 32 deletions.
46 changes: 21 additions & 25 deletions node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -175,18 +175,19 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
initCertificate()
val schemaService = NodeSchemaService(cordappLoader.cordappSchemas)
val (identity, identityKeyPair) = obtainIdentity(notaryConfig = null)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)) { database ->
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like
// a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
return initialiseDatabasePersistence(schemaService, makeIdentityService(identity.certificate)).use {
it.transaction {
// TODO The fact that we need to specify an empty list of notaries just to generate our node info looks like a code smell.
val persistentNetworkMapCache = PersistentNetworkMapCache(database, notaries = emptyList())
persistentNetworkMapCache.start()
val (keyPairs, nodeInfo) = initNodeInfo(persistentNetworkMapCache, identity, identityKeyPair)
val signedNodeInfo = nodeInfo.sign { publicKey, serialised ->
val privateKey = keyPairs.single { it.public == publicKey }.private
privateKey.sign(serialised.bytes)
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
nodeInfo
}
NodeInfoWatcher.saveToFile(configuration.baseDirectory, signedNodeInfo)
nodeInfo
}
}

Expand All @@ -204,7 +205,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
"Node's platform version is lower than network's required minimumPlatformVersion"
}
// Do all of this in a database transaction so anything that might need a connection has one.
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService) { database ->
val (startedImpl, schedulerService) = initialiseDatabasePersistence(schemaService, identityService).transaction {
val networkMapCache = NetworkMapCacheImpl(PersistentNetworkMapCache(database, networkParameters.notaries).start(), identityService)
val (keyPairs, nodeInfo) = initNodeInfo(networkMapCache, identity, identityKeyPair)
identityService.loadIdentities(nodeInfo.legalIdentitiesAndCerts)
Expand Down Expand Up @@ -258,7 +259,7 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
registerCordappFlows(smm)
_services.rpcFlows += cordappLoader.cordapps.flatMap { it.rpcFlows }
startShell(rpcOps)
Pair(StartedNodeImpl(this, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
Pair(StartedNodeImpl(this@AbstractNode, _services, nodeInfo, checkpointStorage, smm, attachments, network, database, rpcOps, flowStarter, notaryService), schedulerService)
}
networkMapUpdater = NetworkMapUpdater(services.networkMapCache,
NodeInfoWatcher(configuration.baseDirectory, getRxIoScheduler(), Duration.ofMillis(configuration.additionalNodeInfoPollingFrequencyMsec)),
Expand Down Expand Up @@ -630,19 +631,14 @@ abstract class AbstractNode(val configuration: NodeConfiguration,
// Specific class so that MockNode can catch it.
class DatabaseConfigurationException(msg: String) : CordaException(msg)

protected open fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T {
protected open fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
val props = configuration.dataSourceProperties
if (props.isNotEmpty()) {
val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
runOnStop += database::close
return database.transaction {
insideTransaction(database)
}
} else {
throw DatabaseConfigurationException("There must be a database configured.")
}
if (props.isEmpty()) throw DatabaseConfigurationException("There must be a database configured.")
val database = configureDatabase(props, configuration.database, identityService, schemaService)
// Now log the vendor string as this will also cause a connection to be tested eagerly.
logVendorString(database, log)
runOnStop += database::close
return database
}

private fun makeNotaryService(tokenizableServices: MutableList<Any>, database: CordaPersistence): NotaryService? {
Expand Down
4 changes: 2 additions & 2 deletions node/src/main/kotlin/net/corda/node/internal/Node.kt
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ open class Node(configuration: NodeConfiguration,
* This is not using the H2 "automatic mixed mode" directly but leans on many of the underpinnings. For more details
* on H2 URLs and configuration see: http://www.h2database.com/html/features.html#database_url
*/
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T {
override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
val databaseUrl = configuration.dataSourceProperties.getProperty("dataSource.url")
val h2Prefix = "jdbc:h2:file:"
if (databaseUrl != null && databaseUrl.startsWith(h2Prefix)) {
Expand All @@ -330,7 +330,7 @@ open class Node(configuration: NodeConfiguration,
printBasicNodeInfo("Database connection url is", "jdbc:h2:$url/node")
}
}
return super.initialiseDatabasePersistence(schemaService, identityService, insideTransaction)
return super.initialiseDatabasePersistence(schemaService, identityService)
}

private val _startupComplete = openFuture<Unit>()
Expand Down
81 changes: 81 additions & 0 deletions node/src/test/kotlin/net/corda/node/internal/NodeTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package net.corda.node.internal

import com.nhaarman.mockito_kotlin.doReturn
import com.nhaarman.mockito_kotlin.whenever
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.readObject
import net.corda.core.node.NodeInfo
import net.corda.core.serialization.deserialize
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.VersionInfo
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.network.PersistentNetworkMapCache
import net.corda.nodeapi.internal.SignedNodeInfo
import net.corda.nodeapi.internal.network.NodeInfoFilesCopier.Companion.NODE_INFO_FILE_NAME_PREFIX
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.internal.rigorousMock
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.junit.Rule
import org.junit.Test
import org.junit.rules.TemporaryFolder
import java.nio.file.Files
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals
import kotlin.test.assertNull

class NodeTest {
private abstract class AbstractNodeConfiguration : NodeConfiguration

@Rule
@JvmField
val temporaryFolder = TemporaryFolder()
@Rule
@JvmField
val testSerialization = SerializationEnvironmentRule()

private fun nodeInfoFile() = temporaryFolder.root.listFiles().singleOrNull { it.name.startsWith(NODE_INFO_FILE_NAME_PREFIX) }
private fun AbstractNode.generateNodeInfo(): NodeInfo {
assertNull(nodeInfoFile())
generateAndSaveNodeInfo()
val path = nodeInfoFile()!!.toPath()
val nodeInfo = path.readObject<SignedNodeInfo>().raw.deserialize()
Files.delete(path)
return nodeInfo
}

@Test
fun `generateAndSaveNodeInfo works`() {
val nodeAddress = NetworkHostAndPort("0.1.2.3", 456)
val nodeName = CordaX500Name("Manx Blockchain Corp", "Douglas", "IM")
val platformVersion = 789
val dataSourceProperties = makeTestDataSourceProperties()
val databaseConfig = DatabaseConfig()
val configuration = rigorousMock<AbstractNodeConfiguration>().also {
doReturn(nodeAddress).whenever(it).p2pAddress
doReturn(nodeName).whenever(it).myLegalName
doReturn(null).whenever(it).notary // Don't add notary identity.
doReturn(dataSourceProperties).whenever(it).dataSourceProperties
doReturn(databaseConfig).whenever(it).database
doReturn(temporaryFolder.root.toPath()).whenever(it).baseDirectory
doReturn(true).whenever(it).devMode // Needed for identity cert.
doReturn("tsp").whenever(it).trustStorePassword
doReturn("ksp").whenever(it).keyStorePassword
}
configureDatabase(dataSourceProperties, databaseConfig, rigorousMock()).use { database ->
val node = Node(configuration, rigorousMock<VersionInfo>().also {
doReturn(platformVersion).whenever(it).platformVersion
}, initialiseSerialization = false)
val nodeInfo = node.generateNodeInfo()
assertEquals(listOf(nodeAddress), nodeInfo.addresses)
assertEquals(listOf(nodeName), nodeInfo.legalIdentitiesAndCerts.map { it.name })
assertEquals(platformVersion, nodeInfo.platformVersion)
node.generateNodeInfo().let {
assertNotEquals(nodeInfo, it) // Different serial.
assertEquals(nodeInfo, it.copy(serial = nodeInfo.serial))
}
PersistentNetworkMapCache(database, emptyList()).addNode(nodeInfo)
assertEquals(nodeInfo, node.generateNodeInfo())
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,8 @@ open class InternalMockNetwork(private val cordappPackages: List<String>,
override val serializationWhitelists: List<SerializationWhitelist>
get() = _serializationWhitelists
private var dbCloser: (() -> Any?)? = null
override fun <T> initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService, insideTransaction: (CordaPersistence) -> T): T {
return super.initialiseDatabasePersistence(schemaService, identityService) { database ->
dbCloser = database::close
insideTransaction(database)
}
override fun initialiseDatabasePersistence(schemaService: SchemaService, identityService: IdentityService): CordaPersistence {
return super.initialiseDatabasePersistence(schemaService, identityService).also { dbCloser = it::close }
}

fun disableDBCloseOnStop() {
Expand Down

0 comments on commit 4d4253a

Please sign in to comment.