Skip to content

Commit

Permalink
[FLINK-11328] [cep] Snapshots of NFA-related serializers should be a …
Browse files Browse the repository at this point in the history
…CompositeTypeSerializerSnapshot
  • Loading branch information
tzulitai committed Jan 23, 2019
1 parent edf6d59 commit ade21d1
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -196,8 +195,6 @@ public static class DeweyNumberSerializer extends TypeSerializerSingleton<DeweyN

private static final long serialVersionUID = -5086792497034943656L;

private final IntSerializer elemSerializer = IntSerializer.INSTANCE;

public static final DeweyNumberSerializer INSTANCE = new DeweyNumberSerializer();

private DeweyNumberSerializer() {}
Expand Down Expand Up @@ -232,7 +229,7 @@ public void serialize(DeweyNumber record, DataOutputView target) throws IOExcept
final int size = record.length();
target.writeInt(size);
for (int i = 0; i < size; i++) {
elemSerializer.serialize(record.deweyNumber[i], target);
target.writeInt(record.deweyNumber[i]);
}
}

Expand All @@ -241,7 +238,7 @@ public DeweyNumber deserialize(DataInputView source) throws IOException {
final int size = source.readInt();
int[] number = new int[size];
for (int i = 0; i < size; i++) {
number[i] = elemSerializer.deserialize(source);
number[i] = source.readInt();
}
return new DeweyNumber(number);
}
Expand All @@ -256,7 +253,7 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
final int size = source.readInt();
target.writeInt(size);
for (int i = 0; i < size; i++) {
elemSerializer.copy(source, target);
target.writeInt(source.readInt());
}
}

Expand All @@ -270,11 +267,6 @@ public boolean canEqual(Object obj) {
return true;
}

@Override
public int hashCode() {
return elemSerializer.hashCode();
}

// -----------------------------------------------------------------------------------

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public int getLength() {
private static final StringSerializer STATE_NAME_SERIALIZER = StringSerializer.INSTANCE;
private static final LongSerializer TIMESTAMP_SERIALIZER = LongSerializer.INSTANCE;
private static final DeweyNumber.DeweyNumberSerializer VERSION_SERIALIZER = DeweyNumber.DeweyNumberSerializer.INSTANCE;
private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = NodeId.NodeIdSerializer.INSTANCE;
private static final NodeId.NodeIdSerializer NODE_ID_SERIALIZER = new NodeId.NodeIdSerializer();
private static final EventId.EventIdSerializer EVENT_ID_SERIALIZER = EventId.EventIdSerializer.INSTANCE;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -117,19 +115,19 @@ public EventId copy(EventId from, EventId reuse) {

@Override
public int getLength() {
return 2 * LongSerializer.INSTANCE.getLength();
return Integer.BYTES + Long.BYTES;
}

@Override
public void serialize(EventId record, DataOutputView target) throws IOException {
IntSerializer.INSTANCE.serialize(record.id, target);
LongSerializer.INSTANCE.serialize(record.timestamp, target);
target.writeInt(record.id);
target.writeLong(record.timestamp);
}

@Override
public EventId deserialize(DataInputView source) throws IOException {
int id = IntSerializer.INSTANCE.deserialize(source);
long timestamp = LongSerializer.INSTANCE.deserialize(source);
int id = source.readInt();
long timestamp = source.readLong();

return new EventId(id, timestamp);
}
Expand All @@ -141,8 +139,8 @@ public EventId deserialize(EventId reuse, DataInputView source) throws IOExcepti

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
IntSerializer.INSTANCE.copy(source, target);
LongSerializer.INSTANCE.copy(source, target);
target.writeInt(source.readInt());
target.writeLong(source.readLong());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.StringValue;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Unique identifier for {@link SharedBufferNode}.
*/
Expand Down Expand Up @@ -81,9 +84,19 @@ public static class NodeIdSerializer extends TypeSerializerSingleton<NodeId> {

private static final long serialVersionUID = 9209498028181378582L;

public static final NodeIdSerializer INSTANCE = new NodeIdSerializer();
/**
* NOTE: this field should actually be final.
* The reason that it isn't final is due to backward compatible deserialization
* paths. See {@link #readObject(ObjectInputStream)}.
*/
private TypeSerializer<EventId> eventIdSerializer;

public NodeIdSerializer() {
this(EventId.EventIdSerializer.INSTANCE);
}

private NodeIdSerializer() {
private NodeIdSerializer(TypeSerializer<EventId> eventIdSerializer) {
this.eventIdSerializer = checkNotNull(eventIdSerializer);
}

@Override
Expand Down Expand Up @@ -115,8 +128,8 @@ public int getLength() {
public void serialize(NodeId record, DataOutputView target) throws IOException {
if (record != null) {
target.writeByte(1);
EventId.EventIdSerializer.INSTANCE.serialize(record.eventId, target);
StringSerializer.INSTANCE.serialize(record.pageName, target);
eventIdSerializer.serialize(record.eventId, target);
StringValue.writeString(record.pageName, target);
} else {
target.writeByte(0);
}
Expand All @@ -129,8 +142,8 @@ public NodeId deserialize(DataInputView source) throws IOException {
return null;
}

EventId eventId = EventId.EventIdSerializer.INSTANCE.deserialize(source);
String pageName = StringSerializer.INSTANCE.deserialize(source);
EventId eventId = eventIdSerializer.deserialize(source);
String pageName = StringValue.readString(source);
return new NodeId(eventId, pageName);
}

Expand All @@ -143,9 +156,8 @@ public NodeId deserialize(NodeId reuse, DataInputView source) throws IOException
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.writeByte(source.readByte());

LongSerializer.INSTANCE.copy(source, target); // eventId
LongSerializer.INSTANCE.copy(source, target); // timestamp
StringSerializer.INSTANCE.copy(source, target); // pageName
eventIdSerializer.copy(source, target);
StringValue.copyString(source, target);
}

@Override
Expand All @@ -157,17 +169,50 @@ public boolean canEqual(Object obj) {

@Override
public TypeSerializerSnapshot<NodeId> snapshotConfiguration() {
return new NodeIdSerializerSnapshot();
return new NodeIdSerializerSnapshot(this);
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class NodeIdSerializerSnapshot extends SimpleTypeSerializerSnapshot<NodeId> {
public static final class NodeIdSerializerSnapshot extends CompositeTypeSerializerSnapshot<NodeId, NodeIdSerializer> {

private static final int VERSION = 1;

public NodeIdSerializerSnapshot() {
super(() -> INSTANCE);
super(NodeIdSerializer.class);
}

public NodeIdSerializerSnapshot(NodeIdSerializer nodeIdSerializer) {
super(nodeIdSerializer);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected NodeIdSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
return new NodeIdSerializer((EventId.EventIdSerializer) nestedSerializers[0]);
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(NodeIdSerializer outerSerializer) {
return new TypeSerializer<?>[]{ outerSerializer.eventIdSerializer };
}
}

// ------------------------------------------------------------------------

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();

if (eventIdSerializer == null) {
// the nested serializer will be null if this was read from a savepoint taken with versions
// lower than Flink 1.7; in this case, we explicitly create instance for the nested serializer.
this.eventIdSerializer = EventId.EventIdSerializer.INSTANCE;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerialize
this.entries = stateStore.getMapState(
new MapStateDescriptor<>(
entriesStateName,
NodeId.NodeIdSerializer.INSTANCE,
new NodeId.NodeIdSerializer(),
new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));

this.eventsCount = stateStore.getMapState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,18 @@

package org.apache.flink.cep.nfa.sharedbuffer;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.cep.nfa.DeweyNumber;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.io.ObjectInputStream;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Versioned edge in {@link SharedBuffer} that allows retrieving predecessors.
Expand Down Expand Up @@ -67,9 +71,24 @@ public static class SharedBufferEdgeSerializer extends TypeSerializerSingleton<S

private static final long serialVersionUID = -5122474955050663979L;

static final SharedBufferEdgeSerializer INSTANCE = new SharedBufferEdgeSerializer();
/**
* NOTE: these serializer fields should actually be final.
* The reason that it isn't final is due to backward compatible deserialization
* paths. See {@link #readObject(ObjectInputStream)}.
*/
private TypeSerializer<NodeId> nodeIdSerializer;
private TypeSerializer<DeweyNumber> deweyNumberSerializer;

public SharedBufferEdgeSerializer() {
this(new NodeId.NodeIdSerializer(), DeweyNumber.DeweyNumberSerializer.INSTANCE);
}

private SharedBufferEdgeSerializer() {}
private SharedBufferEdgeSerializer(
TypeSerializer<NodeId> nodeIdSerializer,
TypeSerializer<DeweyNumber> deweyNumberSerializer) {
this.nodeIdSerializer = checkNotNull(nodeIdSerializer);
this.deweyNumberSerializer = checkNotNull(deweyNumberSerializer);
}

@Override
public boolean isImmutableType() {
Expand Down Expand Up @@ -98,14 +117,14 @@ public int getLength() {

@Override
public void serialize(SharedBufferEdge record, DataOutputView target) throws IOException {
NodeId.NodeIdSerializer.INSTANCE.serialize(record.target, target);
DeweyNumber.DeweyNumberSerializer.INSTANCE.serialize(record.deweyNumber, target);
nodeIdSerializer.serialize(record.target, target);
deweyNumberSerializer.serialize(record.deweyNumber, target);
}

@Override
public SharedBufferEdge deserialize(DataInputView source) throws IOException {
NodeId target = NodeId.NodeIdSerializer.INSTANCE.deserialize(source);
DeweyNumber deweyNumber = DeweyNumber.DeweyNumberSerializer.INSTANCE.deserialize(source);
NodeId target = nodeIdSerializer.deserialize(source);
DeweyNumber deweyNumber = deweyNumberSerializer.deserialize(source);
return new SharedBufferEdge(target, deweyNumber);
}

Expand All @@ -116,8 +135,8 @@ public SharedBufferEdge deserialize(SharedBufferEdge reuse, DataInputView source

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
NodeId.NodeIdSerializer.INSTANCE.copy(source, target);
DeweyNumber.DeweyNumberSerializer.INSTANCE.copy(source, target);
nodeIdSerializer.copy(source, target);
deweyNumberSerializer.copy(source, target);
}

@Override
Expand All @@ -129,17 +148,54 @@ public boolean canEqual(Object obj) {

@Override
public TypeSerializerSnapshot<SharedBufferEdge> snapshotConfiguration() {
return new SharedBufferEdgeSerializerSnapshot();
return new SharedBufferEdgeSerializerSnapshot(this);
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class SharedBufferEdgeSerializerSnapshot extends SimpleTypeSerializerSnapshot<SharedBufferEdge> {
public static final class SharedBufferEdgeSerializerSnapshot
extends CompositeTypeSerializerSnapshot<SharedBufferEdge, SharedBufferEdgeSerializer> {

private static final int VERSION = 1;

public SharedBufferEdgeSerializerSnapshot() {
super(() -> INSTANCE);
super(SharedBufferEdgeSerializer.class);
}

public SharedBufferEdgeSerializerSnapshot(SharedBufferEdgeSerializer sharedBufferEdgeSerializer) {
super(sharedBufferEdgeSerializer);
}

@Override
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected SharedBufferEdgeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
return new SharedBufferEdgeSerializer(
(NodeId.NodeIdSerializer) nestedSerializers[0],
(DeweyNumber.DeweyNumberSerializer) nestedSerializers[1]);
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(SharedBufferEdgeSerializer outerSerializer) {
return new TypeSerializer<?>[] { outerSerializer.nodeIdSerializer, outerSerializer.deweyNumberSerializer };
}
}

// ------------------------------------------------------------------------

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();

if (nodeIdSerializer == null) {
// the nested serializers will be null if this was read from a savepoint taken with versions
// lower than Flink 1.7; in this case, we explicitly create instances for the nested serializers
this.nodeIdSerializer = new NodeId.NodeIdSerializer();
this.deweyNumberSerializer = DeweyNumber.DeweyNumberSerializer.INSTANCE;
}
}
}
Expand Down
Loading

0 comments on commit ade21d1

Please sign in to comment.