Skip to content

Commit

Permalink
ENT-2414 Named caches (corda#3848)
Browse files Browse the repository at this point in the history
* Add named caches and apply to NonInvalidingUnboundCache and all usages.

* Add named caches and apply to NonInvalidingCache and all usages.

* Add named caches and apply to NonInvalidingWeightBasedCache and all usages.

* Move NamedCache to core/internal

* Remove type `NamedCache` and `NamedLoadingCache`

* Suppressed 'name not used' warning, added comment, and fixed generic parameters on the buildNamed functions.

* Use `buildNamed` in all caffeine instances in production code. Not using it for caches that are created in test code.

* Add checks for the cache name

* Formatting

* Minor code review revisions
  • Loading branch information
blsemo authored Aug 24, 2018
1 parent 042b918 commit bc330bd
Show file tree
Hide file tree
Showing 29 changed files with 130 additions and 31 deletions.
4 changes: 3 additions & 1 deletion .idea/compiler.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import javafx.collections.FXCollections
import javafx.collections.ObservableList
import net.corda.client.jfx.utils.*
import net.corda.core.identity.AnonymousParty
import net.corda.core.internal.buildNamed
import net.corda.core.node.NodeInfo
import net.corda.core.node.services.NetworkMapCache.MapChange
import java.security.PublicKey
Expand All @@ -30,7 +31,7 @@ class NetworkIdentityModel {
private val rpcProxy by observableValue(NodeMonitorModel::proxyObservable)

private val identityCache = Caffeine.newBuilder()
.build<PublicKey, ObservableValue<NodeInfo?>>({ publicKey ->
.buildNamed<PublicKey, ObservableValue<NodeInfo?>>("NetworkIdentityModel_identity", { publicKey ->
publicKey.let { rpcProxy.map { it?.cordaRPCOps?.nodeInfoFromParty(AnonymousParty(publicKey)) } }
})
val notaries = ChosenList(rpcProxy.map { FXCollections.observableList(it?.cordaRPCOps?.notaryIdentities() ?: emptyList()) }, "notaries")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ import net.corda.client.rpc.internal.serialization.amqp.RpcClientObservableDeSer
import net.corda.core.context.Actor
import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.internal.LazyStickyPool
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.ThreadBox
import net.corda.core.internal.times
import net.corda.core.internal.*
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.serialize
Expand Down Expand Up @@ -162,9 +159,7 @@ class RPCClientProxyHandler(
observablesToReap.locked { observables.add(observableId) }
}
return Caffeine.newBuilder().
weakValues().
removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).
build()
weakValues().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).buildNamed("RpcClientProxyHandler_rpcObservable")
}

private var sessionFactory: ClientSessionFactory? = null
Expand Down
3 changes: 3 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ dependencies {
// Guava: Google utilities library.
testCompile "com.google.guava:guava:$guava_version"

// For caches rather than guava
compile "com.github.ben-manes.caffeine:caffeine:$caffeine_version"

// Smoke tests do NOT have any Node code on the classpath!
smokeTestCompile project(':smoke-test-utils')
smokeTestCompile "org.assertj:assertj-core:${assertj_version}"
Expand Down
41 changes: 41 additions & 0 deletions core/src/main/kotlin/net/corda/core/internal/NamedCache.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package net.corda.core.internal

import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import com.github.benmanes.caffeine.cache.LoadingCache

/**
* Restrict the allowed characters of a cache name - this ensures that each cache has a name, and that
* the name can be used to create a file name or a metric name.
*/
internal fun checkCacheName(name: String) {
require(!name.isBlank())
require(allowedChars.matches(name))
}

private val allowedChars = Regex("^[0-9A-Za-z_.]*\$")

/* buildNamed is the central helper method to build caffeine caches in Corda.
* This allows to easily add tweaks to all caches built in Corda, and also forces
* cache users to give their cache a (meaningful) name that can be used e.g. for
* capturing cache traces etc.
*
* Currently it is not used in this version of CORDA, but there are plans to do so.
*/

fun <K, V> Caffeine<in K, in V>.buildNamed(name: String): Cache<K, V> {
checkCacheName(name)
return this.build<K, V>()
}

fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loadFunc: (K) -> V): LoadingCache<K, V> {
checkCacheName(name)
return this.build<K, V>(loadFunc)
}


fun <K, V> Caffeine<in K, in V>.buildNamed(name: String, loader: CacheLoader<K, V>): LoadingCache<K, V> {
checkCacheName(name)
return this.build<K, V>(loader)
}
24 changes: 24 additions & 0 deletions core/src/test/kotlin/net/corda/core/internal/NamedCacheTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package net.corda.core.internal

import org.junit.Test
import kotlin.test.assertEquals

class NamedCacheTest {
fun checkNameHelper(name: String, throws: Boolean) {
var exceptionThrown = false
try {
checkCacheName(name)
} catch (e: Exception) {
exceptionThrown = true
}
assertEquals(throws, exceptionThrown)
}

@Test
fun TestCheckCacheName() {
checkNameHelper("abc_123.234", false)
checkNameHelper("", true)
checkNameHelper("abc 123", true)
checkNameHelper("abc/323", true)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package net.corda.nodeapi.internal

import com.github.benmanes.caffeine.cache.CacheLoader
import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import java.time.Duration
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong

/**
* A class allowing the deduplication of a strictly incrementing sequence number.
*/
class DeduplicationChecker(cacheExpiry: Duration) {
class DeduplicationChecker(cacheExpiry: Duration, name: String = "DeduplicationChecker") {
// dedupe identity -> watermark cache
private val watermarkCache = Caffeine.newBuilder()
.expireAfterAccess(cacheExpiry.toNanos(), TimeUnit.NANOSECONDS)
.build(WatermarkCacheLoader)
.buildNamed("${name}_watermark", WatermarkCacheLoader)

private object WatermarkCacheLoader : CacheLoader<Any, AtomicLong> {
override fun load(key: Any) = AtomicLong(-1)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package net.corda.nodeapi.internal.persistence

import com.github.benmanes.caffeine.cache.Caffeine
import net.corda.core.internal.buildNamed
import net.corda.core.internal.castIfPossible
import net.corda.core.schemas.MappedSchema
import net.corda.core.utilities.contextLogger
Expand Down Expand Up @@ -57,7 +58,7 @@ class HibernateConfiguration(
}
}

private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).build<Set<MappedSchema>, SessionFactory>()
private val sessionFactories = Caffeine.newBuilder().maximumSize(databaseConfig.mappedSchemaCacheSize).buildNamed<Set<MappedSchema>, SessionFactory>("HibernateConfiguration_sessionFactories")

val sessionFactoryForRegisteredSchemas = schemas.let {
logger.info("Init HibernateConfiguration for schemas: $it")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.github.benmanes.caffeine.cache.Cache
import com.github.benmanes.caffeine.cache.Caffeine
import com.google.common.primitives.Ints
import net.corda.core.context.AuthServiceId
import net.corda.core.internal.buildNamed
import net.corda.core.internal.uncheckedCast
import net.corda.core.utilities.loggerFor
import net.corda.node.internal.DataSourceFactory
Expand Down Expand Up @@ -308,7 +309,7 @@ private class CaffeineCacheManager(val maxSize: Long,
return Caffeine.newBuilder()
.expireAfterWrite(timeToLiveSeconds, TimeUnit.SECONDS)
.maximumSize(maxSize)
.build<K, V>()
.buildNamed<K, V>("RPCSecurityManagerShiroCache_$name")
.toShiroCache()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn

fun createPKMap(): AppendOnlyPersistentMap<SecureHash, PartyAndCertificate, PersistentIdentity, String> {
return AppendOnlyPersistentMap(
"PersistentIdentityService_partyByKey",
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = {
Pair(
Expand All @@ -51,6 +52,7 @@ class PersistentIdentityService : SingletonSerializeAsToken(), IdentityServiceIn

fun createX500Map(): AppendOnlyPersistentMap<CordaX500Name, SecureHash, PersistentIdentityNames, String> {
return AppendOnlyPersistentMap(
"PersistentIdentityService_partyByName",
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = { Pair(CordaX500Name.parse(it.name), SecureHash.parse(it.publicKeyHash)) },
toPersistentEntity = { key: CordaX500Name, value: SecureHash ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class PersistentKeyManagementService(val identityService: PersistentIdentityServ
private companion object {
fun createKeyMap(): AppendOnlyPersistentMap<PublicKey, PrivateKey, PersistentKey, String> {
return AppendOnlyPersistentMap(
"PersistentKeyManagementService_keys",
toPersistentEntityKey = { it.toStringShort() },
fromPersistentEntity = { Pair(Crypto.decodePublicKey(it.publicKey), Crypto.decodePrivateKey(
it.privateKey)) },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class P2PMessageDeduplicator(private val database: CordaPersistence) {

private fun createProcessedMessages(): AppendOnlyPersistentMap<DeduplicationId, MessageMeta, ProcessedMessage, String> {
return AppendOnlyPersistentMap(
"P2PMessageDeduplicator_processedMessages",
toPersistentEntityKey = { it.toString },
fromPersistentEntity = { Pair(DeduplicationId(it.id), MessageMeta(it.insertionTime, it.hash, it.seqNo)) },
toPersistentEntity = { key: DeduplicationId, value: MessageMeta ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import net.corda.core.context.Trace
import net.corda.core.context.Trace.InvocationId
import net.corda.core.identity.CordaX500Name
import net.corda.core.internal.LifeCycle
import net.corda.core.internal.buildNamed
import net.corda.core.messaging.RPCOps
import net.corda.core.serialization.SerializationContext
import net.corda.core.serialization.SerializationDefaults
Expand Down Expand Up @@ -153,7 +154,7 @@ class RPCServer(
log.debug { "Unsubscribing from Observable with id $key because of $cause" }
value!!.subscription.unsubscribe()
}
return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).build()
return Caffeine.newBuilder().removalListener(onObservableRemove).executor(SameThreadExecutor.getExecutor()).buildNamed("RPCServer_observableSubscription")
}

fun start(activeMqServerControl: ActiveMQServerControl) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,

override fun getNodesByLegalIdentityKey(identityKey: PublicKey): List<NodeInfo> = nodesByKeyCache[identityKey]!!

private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(1024) { key ->
private val nodesByKeyCache = NonInvalidatingCache<PublicKey, List<NodeInfo>>(
"PersistentNetworkMap_nodesByKey",
1024) { key ->
database.transaction { queryByIdentityKey(session, key) }
}

Expand All @@ -140,7 +142,9 @@ open class PersistentNetworkMapCache(private val database: CordaPersistence,
return identityByLegalNameCache.get(name)!!.orElse(null)
}

private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(1024) { name ->
private val identityByLegalNameCache = NonInvalidatingCache<CordaX500Name, Optional<PartyAndCertificate>>(
"PersistentNetworkMap_idByLegalName",
1024) { name ->
Optional.ofNullable(database.transaction { queryIdentityByLegalName(session, name) })
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class DBTransactionStorage(cacheSizeBytes: Long, private val database: CordaPers
fun createTransactionsMap(maxSizeInBytes: Long)
: AppendOnlyPersistentMapBase<SecureHash, TxCacheValue, DBTransaction, String> {
return WeightBasedAppendOnlyPersistentMap<SecureHash, TxCacheValue, DBTransaction, String>(
name = "DBTransactionStorage_transactions",
toPersistentEntityKey = { it.toString() },
fromPersistentEntity = {
Pair(SecureHash.parse(it.txId),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class NodeAttachmentService(
// a problem somewhere else or this needs to be revisited.

private val attachmentContentCache = NonInvalidatingWeightBasedCache(
name = "NodeAttachmentService_attachmentContent",
maxWeight = attachmentContentCacheSize,
weigher = Weigher<SecureHash, Optional<Pair<Attachment, ByteArray>>> { key, value -> key.size + if (value.isPresent) value.get().second.size else 0 },
loadFunction = { Optional.ofNullable(loadAttachmentContent(it)) }
Expand All @@ -226,7 +227,9 @@ class NodeAttachmentService(
}
}

private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(attachmentCacheBound) { key ->
private val attachmentCache = NonInvalidatingCache<SecureHash, Optional<Attachment>>(
"NodeAttachmentService_attachemnt",
attachmentCacheBound) { key ->
Optional.ofNullable(createAttachment(key))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class FlowsDrainingModeOperationsImpl(readPhysicalNodeId: () -> String, private
}

internal val map = PersistentMap(
"FlowDrainingMode_nodeProperties",
{ key -> key },
{ entity -> entity.key to entity.value!! },
NodePropertiesPersistentStore::DBNodeProperty,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class BFTNonValidatingNotaryService(

private fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> {
return AppendOnlyPersistentMap(
"BFTNonValidatingNotaryService_transactions",
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
//TODO null check will become obsolete after making DB/JPA columns not nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class PersistentUniquenessProvider(val clock: Clock) : UniquenessProvider, Singl
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, SecureHash, CommittedState, PersistentStateRef> =
AppendOnlyPersistentMap(
"PersistentUniquenessProvider_transactions",
toPersistentEntityKey = { PersistentStateRef(it.txhash.toString(), it.index) },
fromPersistentEntity = {
//TODO null check will become obsolete after making DB/JPA columns not nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class RaftUniquenessProvider(
private val log = contextLogger()
fun createMap(): AppendOnlyPersistentMap<StateRef, Pair<Long, SecureHash>, CommittedState, PersistentStateRef> =
AppendOnlyPersistentMap(
"RaftUniquenessProvider_transactions",
toPersistentEntityKey = { PersistentStateRef(it) },
fromPersistentEntity = {
val txId = it.id.txId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ContractUpgradeServiceImpl : ContractUpgradeService, SingletonSerializeAsT
private companion object {
fun createContractUpgradesMap(): PersistentMap<String, String, DBContractUpgrade, String> {
return PersistentMap(
"ContractUpgradeService_upgrades",
toPersistentEntityKey = { it },
fromPersistentEntity = { Pair(it.stateRef, it.upgradedContractClassName ?: "") },
toPersistentEntity = { key: String, value: String ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ abstract class AppendOnlyPersistentMapBase<K, V, E, out EK>(

// Open for tests to override
open class AppendOnlyPersistentMap<K, V, E, out EK>(
name: String,
toPersistentEntityKey: (K) -> EK,
fromPersistentEntity: (E) -> Pair<K, V>,
toPersistentEntity: (key: K, value: V) -> E,
Expand All @@ -321,6 +322,7 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(
persistentEntityClass) {
//TODO determine cacheBound based on entity class later or with node config allowing tuning, or using some heuristic based on heap size
override val cache = NonInvalidatingCache(
name = name,
bound = cacheBound,
loadFunction = { key: K ->
// This gets called if a value is read and the cache has no Transactional for this key yet.
Expand Down Expand Up @@ -353,6 +355,7 @@ open class AppendOnlyPersistentMap<K, V, E, out EK>(

// Same as above, but with weighted values (e.g. memory footprint sensitive).
class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
name: String,
toPersistentEntityKey: (K) -> EK,
fromPersistentEntity: (E) -> Pair<K, V>,
toPersistentEntity: (key: K, value: V) -> E,
Expand All @@ -365,6 +368,7 @@ class WeightBasedAppendOnlyPersistentMap<K, V, E, out EK>(
toPersistentEntity,
persistentEntityClass) {
override val cache = NonInvalidatingWeightBasedCache(
name,
maxWeight = maxWeight,
weigher = Weigher { key, value -> weighingFunc(key, value) },
loadFunction = { key: K ->
Expand Down
Loading

0 comments on commit bc330bd

Please sign in to comment.