Skip to content

Commit

Permalink
Merge pull request akka#21372 from drewhk/wip-21203-cached-actorref-l…
Browse files Browse the repository at this point in the history
…ookup-drewhk

akka#21203: Cache/memoize ActorRef resolution
  • Loading branch information
drewhk authored Sep 8, 2016
2 parents 9019390 + 11fceb4 commit 06681ae
Show file tree
Hide file tree
Showing 7 changed files with 575 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class CodecBenchmark {

val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))

Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map { _ =>
Expand Down Expand Up @@ -208,7 +208,7 @@ class CodecBenchmark {

val decoder: Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
resolveActorRefWithLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))
uniqueLocalAddress, NoInboundCompressions, envelopePool, inboundEnvelopePool))

Source.fromGraph(new BenchTestSourceSameElement(N, "elem"))
.map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
Expand Down
78 changes: 78 additions & 0 deletions akka-bench-jmh/src/main/scala/akka/util/LruBoundedCacheBench.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.util

import java.util
import java.util.concurrent.TimeUnit

import akka.remote.artery.LruBoundedCache
import org.openjdk.jmh.annotations.{ Param, _ }

import scala.util.Random

@State(Scope.Benchmark)
@Measurement(timeUnit = TimeUnit.MICROSECONDS)
class LruBoundedCacheBench {

var javaHashMap: java.util.HashMap[String, String] = _

@Param(Array("1024", "8192"))
var count = 0

@Param(Array("128", "256"))
var stringSize = 0
var lruCache: LruBoundedCache[String, String] = _

@Param(Array("90", "99"))
var loadFactor: Int = _

var toAdd: String = _
var toRemove: String = _
var toGet: String = _

@Setup
def setup(): Unit = {
val loadF: Double = loadFactor / 100.0
val threshold = (loadF * count).toInt

val random = Random
javaHashMap = new util.HashMap[String, String](count)
lruCache = new LruBoundedCache[String, String](count, threshold) {
override protected def compute(k: String): String = k
override protected def hash(k: String): Int = k.hashCode
override protected def isCacheable(v: String): Boolean = true
}

// Loading
for (i <- 1 to threshold) {
val value = random.nextString(stringSize)
if (i == 1) toGet = value
toRemove = value
javaHashMap.put(value, value)
lruCache.get(value)
}

toAdd = random.nextString(stringSize)

}

@Benchmark
def addOne_lruCache(): String = {
lruCache.getOrCompute(toAdd)
}

@Benchmark
def addOne_hashMap(): String = {
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}

@Benchmark
def addOne_hashMap_remove_put_get(): String = {
javaHashMap.remove(toRemove)
javaHashMap.put(toAdd, toAdd)
javaHashMap.get(toAdd)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -914,10 +914,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}

def createDecoder(compression: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
val resolveActorRefWithLocalAddress: String InternalActorRef = {
recipient provider.resolveActorRefWithLocalAddress(recipient, localAddress.address)
}
Flow.fromGraph(new Decoder(this, system, resolveActorRefWithLocalAddress, compression, bufferPool,
Flow.fromGraph(new Decoder(this, system, localAddress, compression, bufferPool,
inboundEnvelopePool))
}

Expand Down
22 changes: 20 additions & 2 deletions akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.remote.artery
import java.nio.charset.Charset
import java.nio.{ ByteBuffer, ByteOrder }

import akka.actor.{ ActorRef, Address }
import akka.actor.{ ActorPath, ChildActorPath, ActorRef, Address }
import akka.remote.artery.compress.CompressionProtocol._
import akka.remote.artery.compress.{ CompressionTable, InboundCompressions }
import akka.serialization.Serialization
Expand Down Expand Up @@ -147,13 +147,31 @@ private[remote] sealed trait HeaderBuilder {
def manifest(originUid: Long): String
}

/**
* INTERNAL API
*/
private[remote] final class SerializationFormatCache
extends LruBoundedCache[ActorRef, String](capacity = 1024, evictAgeThreshold = 600) {

override protected def compute(ref: ActorRef): String = ref.path.toSerializationFormat

// Not calling ref.hashCode since it does a path.hashCode if ActorCell.undefinedUid is encountered.
// Refs with ActorCell.undefinedUid will now collide all the time, but this is not a usual scenario anyway.
override protected def hash(ref: ActorRef): Int = ref.path.uid

override protected def isCacheable(v: String): Boolean = true
}

/**
* INTERNAL API
*/
private[remote] final class HeaderBuilderImpl(
inboundCompression: InboundCompressions,
var _outboundActorRefCompression: CompressionTable[ActorRef],
var _outboundClassManifestCompression: CompressionTable[String]) extends HeaderBuilder {

private[this] val toSerializationFormat: SerializationFormatCache = new SerializationFormatCache

// Fields only available for EnvelopeBuffer
var _version: Int = _
var _uid: Long = _
Expand Down Expand Up @@ -215,7 +233,7 @@ private[remote] final class HeaderBuilderImpl(
def setRecipientActorRef(ref: ActorRef): Unit = {
_recipientActorRefIdx = outboundActorRefCompression.compress(ref)
if (_recipientActorRefIdx == -1) {
_recipientActorRef = ref.path.toSerializationFormat
_recipientActorRef = toSerializationFormat.getOrCompute(ref)
}
}
def recipientActorRef(originUid: Long): OptionVal[ActorRef] =
Expand Down
38 changes: 28 additions & 10 deletions akka-remote/src/main/scala/akka/remote/artery/Codecs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ package akka.remote.artery
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.actor._
import akka.remote.{ MessageSerializer, OversizedPayloadException, UniqueAddress }
import akka.remote.{ MessageSerializer, OversizedPayloadException, RemoteActorRefProvider, UniqueAddress }
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.serialization.{ Serialization, SerializationExtension }
import akka.stream._
Expand All @@ -16,10 +16,12 @@ import akka.actor.EmptyLocalActorRef
import akka.remote.artery.compress.InboundCompressions
import akka.stream.stage.TimerGraphStageLogic
import java.util.concurrent.TimeUnit

import scala.concurrent.Future
import akka.remote.artery.compress.CompressionTable
import akka.Done
import akka.stream.stage.GraphStageWithMaterializedValue

import scala.concurrent.Promise
import java.util.concurrent.atomic.AtomicInteger

Expand Down Expand Up @@ -194,16 +196,30 @@ private[remote] object Decoder {
private object Tick
}

/**
* INTERNAL API
*/
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider, localAddress: UniqueAddress)
extends LruBoundedCache[String, InternalActorRef](capacity = 1024, evictAgeThreshold = 600) {

override protected def compute(k: String): InternalActorRef =
provider.resolveActorRefWithLocalAddress(k, localAddress.address)

override protected def hash(k: String): Int = FastHash.ofString(k)

override protected def isCacheable(v: InternalActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}

/**
* INTERNAL API
*/
private[remote] class Decoder(
inboundContext: InboundContext,
system: ExtendedActorSystem,
resolveActorRefWithLocalAddress: String InternalActorRef,
compression: InboundCompressions,
bufferPool: EnvelopeBufferPool,
inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
inboundContext: InboundContext,
system: ExtendedActorSystem,
uniqueLocalAddress: UniqueAddress,
compression: InboundCompressions,
bufferPool: EnvelopeBufferPool,
inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStage[FlowShape[EnvelopeBuffer, InboundEnvelope]] {
import Decoder.Tick
val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in")
val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out")
Expand All @@ -214,6 +230,8 @@ private[remote] class Decoder(
import Decoder.RetryResolveRemoteDeployedRecipient
private val localAddress = inboundContext.localAddress.address
private val headerBuilder = HeaderBuilder.in(compression)
private val actorRefResolver: ActorRefResolveCache =
new ActorRefResolveCache(system.provider.asInstanceOf[RemoteActorRefProvider], uniqueLocalAddress)

private val retryResolveRemoteDeployedRecipientInterval = 50.millis
private val retryResolveRemoteDeployedRecipientAttempts = 20
Expand Down Expand Up @@ -260,7 +278,7 @@ private[remote] class Decoder(
case OptionVal.Some(ref)
OptionVal(ref.asInstanceOf[InternalActorRef])
case OptionVal.None if headerBuilder.senderActorRefPath.isDefined
OptionVal(resolveActorRefWithLocalAddress(headerBuilder.senderActorRefPath.get))
OptionVal(actorRefResolver.getOrCompute(headerBuilder.senderActorRefPath.get))
case _
OptionVal.None
}
Expand Down Expand Up @@ -315,7 +333,7 @@ private[remote] class Decoder(
}

private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
resolveActorRefWithLocalAddress(path) match {
actorRefResolver.getOrCompute(path) match {
case empty: EmptyLocalActorRef
val pathElements = empty.path.elements
// FIXME remote deployment corner case, please fix @patriknw (see also below, in onTimer)
Expand Down Expand Up @@ -354,7 +372,7 @@ private[remote] class Decoder(
attemptsLeft - 1,
recipientPath, inboundEnvelope), retryResolveRemoteDeployedRecipientInterval)
else {
val recipient = resolveActorRefWithLocalAddress(recipientPath)
val recipient = actorRefResolver.getOrCompute(recipientPath)
// FIXME only retry for the first message, need to keep them in a cache
push(out, inboundEnvelope.withRecipient(recipient))
}
Expand Down
Loading

0 comments on commit 06681ae

Please sign in to comment.