Skip to content

Commit

Permalink
Serialization tokenization support for large objects (e.g. entries in…
Browse files Browse the repository at this point in the history
… ServiceHub)
  • Loading branch information
rick-r3 committed May 18, 2016
1 parent e8ae3be commit 041aab5
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 1 deletion.
3 changes: 3 additions & 0 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ dependencies {
// Thread safety annotations
compile "com.google.code.findbugs:jsr305:3.0.1"

// AssertJ: for fluent assertions for testing
testCompile "org.assertj:assertj-core:3.4.1"

// SLF4J: Logging framework.
compile "org.slf4j:slf4j-jdk14:1.7.13"

Expand Down
3 changes: 2 additions & 1 deletion core/src/main/kotlin/core/node/ServiceHub.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package core.node

import core.*
import core.contracts.*
import core.crypto.SecureHash
import core.messaging.MessagingService
Expand All @@ -14,6 +13,8 @@ import java.time.Clock
* mocked out. This class is useful to pass to chunks of pluggable code that might have need of many different kinds of
* functionality and you don't want to hard-code which types in the interface.
*
* All services exposed to protocols (public view) need to implement [SerializeAsToken] or similar to avoid being serialized in checkpoints.
*
* TODO: Split into a public (to contracts etc) and private (to node) view
*/
interface ServiceHub {
Expand Down
98 changes: 98 additions & 0 deletions core/src/main/kotlin/core/serialization/SerializationToken.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package core.serialization

import com.esotericsoftware.kryo.DefaultSerializer
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.KryoException
import com.esotericsoftware.kryo.Serializer
import com.esotericsoftware.kryo.io.Input
import com.esotericsoftware.kryo.io.Output
import java.lang.ref.WeakReference
import java.util.*

/**
* The interfaces and classes in this file allow large, singleton style classes to
* mark themselves as needing converting to some form of token representation in the serialised form
* and converting back again when deserialising.
*
* Typically these classes would be used for node services and subsystems that might become reachable from
* Fibers and thus sucked into serialization when they are checkpointed.
*/

/**
* This interface should be implemented by classes that want to substitute a token representation of themselves if
* they are serialized because they have a lot of internal state that does not serialize (well).
*
* This models a similar pattern to the readReplace/writeReplace methods in Java serialization.
*
* With Kryo serialisation, these classes should also annotate themselves with <code>@DefaultSerializer</code>. See below.
*
*/
interface SerializeAsToken {
val token: SerializationToken
}

/**
* This represents a token in the serialized stream for an instance of a type that implements [SerializeAsToken]
*/
interface SerializationToken {
fun fromToken(): Any
}

/**
* A Kryo serializer for [SerializeAsToken] implementations.
*
* Annotate the [SerializeAsToken] with <code>@DefaultSerializer(SerializeAsTokenSerializer::class)</code>
*/
class SerializeAsTokenSerializer<T : SerializeAsToken> : Serializer<T>() {
override fun write(kryo: Kryo, output: Output, obj: T) {
kryo.writeClassAndObject(output, obj.token)
}

override fun read(kryo: Kryo, input: Input, type: Class<T>): T {
val token = (kryo.readClassAndObject(input) as? SerializationToken) ?: throw KryoException("Non-token read for tokenized type: ${type.name}")
val fromToken = token.fromToken()
if (type.isAssignableFrom(fromToken.javaClass)) {
return type.cast(fromToken)
} else {
throw KryoException("Token read did not return tokenized type: ${type.name}")
}
}
}

/**
* A class representing a [SerializationToken] for some object that is not serializable but can be re-created or looked up
* (when deserialized) via a [String] key.
*/
private data class SerializationStringToken(private val key: String, private val className: String) : SerializationToken {

constructor(key: String, toBeProxied: SerializeAsStringToken) : this(key, toBeProxied.javaClass.name) {
tokenized.put(this, WeakReference(toBeProxied))
}

companion object {
val tokenized = Collections.synchronizedMap(WeakHashMap<SerializationStringToken, WeakReference<SerializeAsStringToken>>())
}

override fun fromToken(): Any = tokenized.get(this)?.get() ?:
throw IllegalStateException("Unable to find tokenized instance of ${className} for key $key")
}

/**
* A base class for implementing large objects / components / services that need to serialize themselves to a string token
* to indicate which instance the token is a serialized form of.
*
* This class will also double check that the class is annotated for Kryo serialization. Note it does this on every
* instance constructed but given this is designed to represent heavyweight services or components, this should not be significant.
*/
abstract class SerializeAsStringToken(val key: String) : SerializeAsToken {

init {
// Verify we have the annotation
val annotation = javaClass.getAnnotation(DefaultSerializer::class.java)
if (annotation == null || annotation.value.java.name != SerializeAsTokenSerializer::class.java.name) {
throw IllegalStateException("${this.javaClass.name} is not annotated with @${DefaultSerializer::class.java.simpleName} set to ${SerializeAsTokenSerializer::class.java.simpleName}")
}
}

override val token: SerializationToken = SerializationStringToken(key, this)
}
79 changes: 79 additions & 0 deletions core/src/test/kotlin/core/serialization/SerializationTokenTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package core.serialization

import com.esotericsoftware.kryo.DefaultSerializer
import org.assertj.core.api.Assertions.assertThat
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.assertNotEquals

class SerializationTokenTest {

// Large tokenizable object so we can tell from the smaller number of serialized bytes it was actually tokenized
@DefaultSerializer(SerializeAsTokenSerializer::class)
private class LargeTokenizable(size: Int) : SerializeAsStringToken(size.toString()) {
val bytes = OpaqueBytes(ByteArray(size))

override fun hashCode() = bytes.bits.size

override fun equals(other: Any?) = other is LargeTokenizable && other.bytes.bits.size == this.bytes.bits.size
}

@Test
fun `write token and read tokenizable`() {
val numBytes = 1024
val tokenizableBefore = LargeTokenizable(numBytes)
val serializedBytes = tokenizableBefore.serialize()
assertThat(serializedBytes.size).isLessThan(numBytes)
val tokenizableAfter = serializedBytes.deserialize()
assertEquals(tokenizableBefore, tokenizableAfter)
}

@Test
fun `check same sized tokenizable equal`() {
val tokenizableBefore = LargeTokenizable(1024)
val tokenizableAfter = LargeTokenizable(1024)
assertEquals(tokenizableBefore, tokenizableAfter)
}

@Test
fun `check different sized tokenizable not equal`() {
val tokenizableBefore = LargeTokenizable(1024)
val tokenizableAfter = LargeTokenizable(1025)
assertNotEquals(tokenizableBefore, tokenizableAfter)
}

@DefaultSerializer(SerializeAsTokenSerializer::class)
private class IntegerSerializeAsKeyedToken(val value: Int) : SerializeAsStringToken(value.toString())

@Test
fun `write and read keyed`() {
val tokenizableBefore1 = IntegerSerializeAsKeyedToken(123)
val tokenizableBefore2 = IntegerSerializeAsKeyedToken(456)

val serializedBytes1 = tokenizableBefore1.serialize()
val tokenizableAfter1 = serializedBytes1.deserialize()
val serializedBytes2 = tokenizableBefore2.serialize()
val tokenizableAfter2 = serializedBytes2.deserialize()

assertThat(tokenizableAfter1).isSameAs(tokenizableBefore1)
assertThat(tokenizableAfter2).isSameAs(tokenizableBefore2)
}

@DefaultSerializer(SerializeAsTokenSerializer::class)
private class UnitSerializeAsSingletonToken : SerializeAsStringToken("Unit0")

@Test
fun `write and read singleton`() {
val tokenizableBefore = UnitSerializeAsSingletonToken()
val serializedBytes = tokenizableBefore.serialize()
val tokenizableAfter = serializedBytes.deserialize()
assertThat(tokenizableAfter).isSameAs(tokenizableBefore)
}

private class UnannotatedSerializeAsSingletonToken : SerializeAsStringToken("Unannotated0")

@Test(expected = IllegalStateException::class)
fun `unannotated throws`() {
val tokenizableBefore = UnannotatedSerializeAsSingletonToken()
}
}

0 comments on commit 041aab5

Please sign in to comment.