Skip to content

Commit

Permalink
ENT-2431 Tidy up buildNamed and CacheFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
rick-r3 authored Oct 18, 2018
1 parent cc75a65 commit 55731ef
Show file tree
Hide file tree
Showing 59 changed files with 268 additions and 196 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import javafx.collections.FXCollections
import javafx.collections.ObservableList
import net.corda.client.jfx.utils.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.internal.buildNamed
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import java.security.PublicKey
Expand All @@ -32,7 +31,7 @@ class NetworkIdentityModel {
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)

private val identityCache = Caffeine.newBuilder()
.buildNamed<PublicKey, ObservableValue<NodeInfo?>>("NetworkIdentityModel_identity", CacheLoader { publicKey: PublicKey ->
.build<PublicKey, ObservableValue<NodeInfo?>>(CacheLoader { publicKey: PublicKey ->
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
})
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package net.corda.client.rpc.internal

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import net.corda.core.internal.NamedCacheFactory

class ClientCacheFactory : NamedCacheFactory {
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
checkCacheName(name)
return caffeine.build<K, V>()
}

override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
checkCacheName(name)
return caffeine.build<K, V>(loader)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class RPCClientProxyHandler(
private val sessionId: Trace.SessionId,
private val externalTrace: Trace?,
private val impersonatedActor: Actor?,
private val targetLegalIdentity: CordaX500Name?
private val targetLegalIdentity: CordaX500Name?,
private val cacheFactory: NamedCacheFactory = ClientCacheFactory()
) : InvocationHandler {

private enum class State {
Expand Down Expand Up @@ -169,8 +170,7 @@ class RPCClientProxyHandler(
}
observablesToReap.locked { observables.add(observableId) }
}
return Caffeine.newBuilder().
weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).buildNamed("RpcClientProxyHandler_rpcObservable")
return cacheFactory.buildNamed(Caffeine.newBuilder().weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()), "RpcClientProxyHandler_rpcObservable")
}

private var sessionFactory: ClientSessionFactory? = null
Expand All @@ -179,7 +179,7 @@ class RPCClientProxyHandler(
private var rpcProducer: ClientProducer? = null
private var rpcConsumer: ClientConsumer? = null

private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry)
private val deduplicationChecker = DeduplicationChecker(rpcConfiguration.deduplicationCacheExpiry, cacheFactory = cacheFactory)
private val deduplicationSequenceNumber = AtomicLong(0)

private val sendingEnabled = AtomicBoolean(true)
Expand Down
35 changes: 13 additions & 22 deletions core/src/main/kotlin/net/corda/core/internal/NamedCache.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,21 @@ import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache

/**
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
* the name can be used to create a file name or a metric name.
* Allow extra functionality to be injected to our caches.
*/
internal fun checkCacheName(name: String) {
require(!name.isBlank())
require(allowedChars.matches(name))
}

private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")
interface NamedCacheFactory {
/**
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
* the name can be used to create a file name or a metric name.
*/
fun checkCacheName(name: String) {
require(!name.isBlank())
require(allowedChars.matches(name))
}

/* buildNamed is the central helper method to build caffeine caches in Corda.
* This allows to easily add tweaks to all caches built in Corda, and also forces
* cache users to give their cache a (meaningful) name that can be used e.g. for
* capturing cache traces etc.
*
* Currently it is not used in this version of CORDA, but there are plans to do so.
*/
fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V>

fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
checkCacheName(name)
return this.build<K, V>()
fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V>
}

fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
checkCacheName(name)
return this.build<K, V>(loader)
}
private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")
14 changes: 13 additions & 1 deletion core/src/test/kotlin/net/corda/core/internal/NamedCacheTest.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package net.corda.core.internal

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache
import org.junit.Test
import kotlin.test.assertEquals

class NamedCacheTest {
class NamedCacheTest : NamedCacheFactory {
override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String): Cache<K, V> {
throw IllegalStateException("Should not be called")
}

override fun <K, V> buildNamed(caffeine: Caffeine<in K, in V>, name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
throw IllegalStateException("Should not be called")
}

fun checkNameHelper(name: String, throws: Boolean) {
var exceptionThrown = false
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import net.corda.core.contracts.TimeWindow
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.NotarisationRequestSignature
import net.corda.core.identity.Party
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.notary.NotaryInternalException
import net.corda.core.internal.notary.UniquenessProvider
import net.corda.core.schemas.PersistentStateRef
Expand All @@ -27,7 +28,6 @@ import net.corda.core.utilities.contextLogger
import net.corda.core.utilities.debug
import net.corda.node.services.config.RaftConfig
import net.corda.node.utilities.AppendOnlyPersistentMap
import net.corda.node.utilities.NamedCacheFactory
import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.persistence.CordaPersistence
import net.corda.nodeapi.internal.persistence.NODE_DATABASE_PREFIX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import net.corda.core.internal.concurrent.asCordaFuture
import net.corda.core.internal.concurrent.transpose
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.getOrThrow
import net.corda.node.internal.configureDatabase
import net.corda.node.services.schema.NodeSchemaService
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.nodeapi.internal.persistence.CordaPersistence
Expand All @@ -23,6 +22,7 @@ import net.corda.testing.core.ALICE_NAME
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.hamcrest.Matchers.instanceOf
import org.junit.After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package net.corda.nodeapi.internal

import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import net.corda.core.internal.NamedCacheFactory
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

/**
* A class allowing the deduplication of a strictly incrementing sequence number.
*/
class DeduplicationChecker(cacheExpiry: Duration, name: String = "DeduplicationChecker") {
class DeduplicationChecker(cacheExpiry: Duration, name: String = "DeduplicationChecker", cacheFactory: NamedCacheFactory) {
// dedupe identity -> watermark cache
private val watermarkCache = Caffeine.newBuilder()
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
.buildNamed("${name}_watermark", WatermarkCacheLoader)
private val watermarkCache = cacheFactory.buildNamed(Caffeine.newBuilder()
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS), "${name}_watermark", WatermarkCacheLoader)

private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong> {
override fun load(key: Any) = AtomicLong(-1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.persistence

import co.paralleluniverse.strands.Strand
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
import rx.Observable
Expand Down Expand Up @@ -52,6 +53,7 @@ class CordaPersistence(
databaseConfig: DatabaseConfig,
schemas: Set<MappedSchema>,
val jdbcUrl: String,
cacheFactory: NamedCacheFactory,
attributeConverters: Collection<AttributeConverter<*, *>> = emptySet()
) : Closeable {
companion object {
Expand All @@ -61,7 +63,7 @@ class CordaPersistence(
private val defaultIsolationLevel = databaseConfig.transactionIsolationLevel
val hibernateConfig: HibernateConfiguration by lazy {
transaction {
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl)
HibernateConfiguration(schemas, databaseConfig, attributeConverters, jdbcUrl, cacheFactory)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package net.corda.nodeapi.internal.persistence

import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.castIfPossible
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
Expand Down Expand Up @@ -31,6 +31,7 @@ class HibernateConfiguration(
private val databaseConfig: DatabaseConfig,
private val attributeConverters: Collection<AttributeConverter<*, *>>,
private val jdbcUrl: String,
cacheFactory: NamedCacheFactory,
val cordappClassLoader: ClassLoader? = null
) {
companion object {
Expand Down Expand Up @@ -58,7 +59,7 @@ class HibernateConfiguration(
}
}

private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).buildNamed<Set<MappedSchema>, SessionFactory>("HibernateConfiguration_sessionFactories")
private val sessionFactories = cacheFactory.buildNamed<Set<MappedSchema>, SessionFactory>(Caffeine.newBuilder(), "HibernateConfiguration_sessionFactories")

val sessionFactoryForRegisteredSchemas = schemas.let {
logger.info("Init HibernateConfiguration for schemas: $it")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import net.corda.core.crypto.generateKeyPair
import net.corda.core.internal.div
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.core.utilities.seconds
import net.corda.node.internal.configureDatabase
import net.corda.node.services.config.FlowTimeoutConfiguration
import net.corda.node.services.config.NodeConfiguration
import net.corda.node.services.config.configureWithDevSSLCertificate
Expand All @@ -21,11 +20,12 @@ import net.corda.testing.core.MAX_MESSAGE_SIZE
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.LogHelper
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.internal.rigorousMock
import net.corda.testing.internal.stubs.CertificateStoreStubs
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.node.internal.MOCK_VERSION_INFO
import net.corda.testing.internal.TestingNamedCacheFactory
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatThrownBy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package net.corda.node.services.network

import net.corda.core.node.NodeInfo
import net.corda.core.utilities.NetworkHostAndPort
import net.corda.node.internal.configureDatabase
import net.corda.node.internal.schemas.NodeInfoSchemaV1
import net.corda.node.services.identity.InMemoryIdentityService
import net.corda.nodeapi.internal.DEV_ROOT_CA
import net.corda.nodeapi.internal.persistence.DatabaseConfig
import net.corda.testing.core.*
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.configureDatabase
import net.corda.testing.node.MockServices.Companion.makeTestDataSourceProperties
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatIllegalArgumentException
import org.junit.After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import net.corda.nodeapi.internal.config.MutualSslConfiguration
import net.corda.nodeapi.internal.config.User
import net.corda.testing.core.SerializationEnvironmentRule
import net.corda.testing.driver.PortAllocation
import net.corda.testing.internal.TestingNamedCacheFactory
import net.corda.testing.internal.fromUserList
import net.corda.testing.internal.p2pSslOptions
import org.apache.activemq.artemis.api.core.ActiveMQConnectionTimedOutException
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl
Expand Down Expand Up @@ -128,7 +130,7 @@ class ArtemisRpcTests {

private fun <OPS : RPCOps> InternalRPCMessagingClient.start(ops: OPS, securityManager: RPCSecurityManager, brokerControl: ActiveMQServerControl) {
apply {
init(ops, securityManager)
init(ops, securityManager, TestingNamedCacheFactory())
start(brokerControl)
}
}
Expand Down
28 changes: 9 additions & 19 deletions node/src/main/kotlin/net/corda/node/internal/AbstractNode.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import net.corda.core.identity.CordaX500Name
import net.corda.core.identity.Party
import net.corda.core.identity.PartyAndCertificate
import net.corda.core.internal.FlowStateMachine
import net.corda.core.internal.NamedCacheFactory
import net.corda.core.internal.VisibleForTesting
import net.corda.core.internal.concurrent.map
import net.corda.core.internal.concurrent.openFuture
Expand Down Expand Up @@ -115,7 +116,7 @@ import net.corda.core.crypto.generateKeyPair as cryptoGenerateKeyPair
// TODO Log warning if this node is a notary but not one of the ones specified in the network parameters, both for core and custom
abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val platformClock: CordaClock,
cacheFactoryPrototype: NamedCacheFactory,
cacheFactoryPrototype: BindableNamedCacheFactory,
protected val versionInfo: VersionInfo,
protected val serverThread: AffinityExecutor.ServiceAffinityExecutor,
private val busyNodeLatch: ReusableLatch = ReusableLatch()) : SingletonSerializeAsToken() {
Expand Down Expand Up @@ -150,8 +151,8 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
identityService::wellKnownPartyFromX500Name,
identityService::wellKnownPartyFromAnonymous,
schemaService,
configuration.dataSourceProperties
)
configuration.dataSourceProperties,
cacheFactory)
init {
// TODO Break cyclic dependency
identityService.database = database
Expand All @@ -169,7 +170,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
val servicesForResolution = ServicesForResolutionImpl(identityService, attachments, cordappProvider, transactionStorage)
@Suppress("LeakingThis")
val vaultService = makeVaultService(keyManagementService, servicesForResolution, database).tokenize()
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database)
val nodeProperties = NodePropertiesPersistentStore(StubbedNodeUniqueIdProvider::value, database, cacheFactory)
val flowLogicRefFactory = FlowLogicRefFactoryImpl(cordappLoader.appClassLoader)
val networkMapUpdater = NetworkMapUpdater(
networkMapCache,
Expand All @@ -185,7 +186,7 @@ abstract class AbstractNode<S>(val configuration: NodeConfiguration,
).closeOnStop()
@Suppress("LeakingThis")
val transactionVerifierService = InMemoryTransactionVerifierService(transactionVerifierWorkerCount).tokenize()
val contractUpgradeService = ContractUpgradeServiceImpl().tokenize()
val contractUpgradeService = ContractUpgradeServiceImpl(cacheFactory).tokenize()
val auditService = DummyAuditService().tokenize()
@Suppress("LeakingThis")
protected val network: MessagingService = makeMessagingService().tokenize()
Expand Down Expand Up @@ -999,31 +1000,20 @@ class FlowStarterImpl(private val smm: StateMachineManager, private val flowLogi

class ConfigurationException(message: String) : CordaException(message)

// TODO This is no longer used by AbstractNode and can be moved elsewhere
fun configureDatabase(hikariProperties: Properties,
databaseConfig: DatabaseConfig,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
schemaService: SchemaService = NodeSchemaService(),
internalSchemas: Set<MappedSchema> = NodeSchemaService().internalSchemas()): CordaPersistence {
val persistence = createCordaPersistence(databaseConfig, wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous, schemaService, hikariProperties)
persistence.startHikariPool(hikariProperties, databaseConfig, internalSchemas)
return persistence
}

fun createCordaPersistence(databaseConfig: DatabaseConfig,
wellKnownPartyFromX500Name: (CordaX500Name) -> Party?,
wellKnownPartyFromAnonymous: (AbstractParty) -> Party?,
schemaService: SchemaService,
hikariProperties: Properties): CordaPersistence {
hikariProperties: Properties,
cacheFactory: NamedCacheFactory): CordaPersistence {
// Register the AbstractPartyDescriptor so Hibernate doesn't warn when encountering AbstractParty. Unfortunately
// Hibernate warns about not being able to find a descriptor if we don't provide one, but won't use it by default
// so we end up providing both descriptor and converter. We should re-examine this in later versions to see if
// either Hibernate can be convinced to stop warning, use the descriptor by default, or something else.
JavaTypeDescriptorRegistry.INSTANCE.addDescriptor(AbstractPartyDescriptor(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val attributeConverters = listOf(AbstractPartyToX500NameAsStringConverter(wellKnownPartyFromX500Name, wellKnownPartyFromAnonymous))
val jdbcUrl = hikariProperties.getProperty("dataSource.url", "")
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, attributeConverters)
return CordaPersistence(databaseConfig, schemaService.schemaOptions.keys, jdbcUrl, cacheFactory, attributeConverters)
}

fun CordaPersistence.startHikariPool(hikariProperties: Properties, databaseConfig: DatabaseConfig, schemas: Set<MappedSchema>, metricRegistry: MetricRegistry? = null) {
Expand Down
Loading

0 comments on commit 55731ef

Please sign in to comment.