Skip to content

Commit

Permalink
[FLINK-2432] Custom serializer support
Browse files Browse the repository at this point in the history
This closes apache#962
  • Loading branch information
zentol committed Nov 13, 2015
1 parent 946e8f6 commit 30647a2
Show file tree
Hide file tree
Showing 12 changed files with 215 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.flink.python.api.streaming.Sender.TYPE_SHORT;
import static org.apache.flink.python.api.streaming.Sender.TYPE_STRING;
import static org.apache.flink.python.api.streaming.Sender.TYPE_TUPLE;
import org.apache.flink.python.api.types.CustomTypeWrapper;
import org.apache.flink.util.Collector;

/**
Expand Down Expand Up @@ -192,7 +193,7 @@ private Object receiveField(boolean normalized) throws IOException {
case TYPE_NULL:
return null;
default:
throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
return new CustomTypeDeserializer(type).deserialize();
}
}

Expand Down Expand Up @@ -245,14 +246,29 @@ private Deserializer<?> getDeserializer(byte type) {
case TYPE_NULL:
return new NullDeserializer();
default:
throw new IllegalArgumentException("Unknown TypeID encountered: " + type);
return new CustomTypeDeserializer(type);

}
}

/**
 * Contract for objects that produce a single value of type {@code T} each time
 * {@link #deserialize()} is called.
 *
 * @param <T> type of the value handed back by {@link #deserialize()}
 */
private interface Deserializer<T> {
    /**
     * Produces the next value.
     *
     * @return the deserialized value
     */
    T deserialize();
}

/**
 * Deserializer used for type IDs that have no dedicated case. The payload is not
 * interpreted; it is handed back as raw bytes inside a {@link CustomTypeWrapper}
 * together with the type ID this deserializer was created for.
 */
private class CustomTypeDeserializer implements Deserializer<CustomTypeWrapper> {
    private final byte typeId;

    /**
     * @param type type ID to attach to every wrapper produced by this deserializer
     */
    public CustomTypeDeserializer(byte type) {
        this.typeId = type;
    }

    /**
     * Reads an int length from {@code fileBuffer}, then that many bytes.
     *
     * @return wrapper holding the type ID and the bytes just read
     */
    @Override
    public CustomTypeWrapper deserialize() {
        // Length prefix first, payload second; both come from the enclosing class's fileBuffer.
        final byte[] payload = new byte[fileBuffer.getInt()];
        fileBuffer.get(payload);
        return new CustomTypeWrapper(typeId, payload);
    }
}

private class BooleanDeserializer implements Deserializer<Boolean> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
import org.apache.flink.python.api.types.CustomTypeWrapper;

/**
* General-purpose class to write data to memory-mapped files.
Expand Down Expand Up @@ -180,7 +181,7 @@ public int sendBuffer(Iterator i, int group) throws IOException {
}

private enum SupportedTypes {
TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL
TUPLE, BOOLEAN, BYTE, BYTES, CHARACTER, SHORT, INTEGER, LONG, FLOAT, DOUBLE, STRING, OTHER, NULL, CUSTOMTYPEWRAPPER
}

//=====Serializer===================================================================================================
Expand Down Expand Up @@ -231,6 +232,9 @@ private Serializer getSerializer(Object value) throws IOException {
case NULL:
fileBuffer.put(TYPE_NULL);
return new NullSerializer();
case CUSTOMTYPEWRAPPER:
fileBuffer.put(((CustomTypeWrapper) value).getType());
return new CustomTypeSerializer();
default:
throw new IllegalArgumentException("Unknown Type encountered: " + type);
}
Expand All @@ -253,6 +257,18 @@ public ByteBuffer serialize(T value) {
public abstract void serializeInternal(T value);
}

/**
 * Serializer for {@link CustomTypeWrapper} values. The wrapped bytes are already in
 * their final form, so they are exposed through a buffer view instead of being copied.
 */
private class CustomTypeSerializer extends Serializer<CustomTypeWrapper> {
    public CustomTypeSerializer() {
        // The base-class argument is presumably the buffer size (TODO confirm);
        // none is needed because serializeInternal installs its own buffer.
        super(0);
    }

    /**
     * Replaces {@code buffer} with a view over the wrapper's byte array, positioned
     * at the end of the array.
     *
     * @param value wrapper whose bytes become the serialized form
     */
    @Override
    public void serializeInternal(CustomTypeWrapper value) {
        final byte[] payload = value.getData();
        final ByteBuffer wrapped = ByteBuffer.wrap(payload);
        // Mark the whole array as written; presumably the base class flips the buffer afterwards.
        wrapped.position(payload.length);
        buffer = wrapped;
    }
}

private class ByteSerializer extends Serializer<Byte> {
public ByteSerializer() {
super(1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.flink.python.api.types;

/**
 * Container for serialized python objects, generally assumed to be custom objects.
 *
 * <p>Pairs a one-byte type ID with the object's serialized bytes. The byte array is
 * stored and returned by reference (no copy is made), so callers must not modify it
 * after handing it over.
 */
public class CustomTypeWrapper {
    private final byte typeID;
    private final byte[] data;

    /**
     * Creates a wrapper around already-serialized bytes.
     *
     * @param typeID type ID identifying the kind of object the bytes represent
     * @param data serialized form of the object; must not be {@code null}
     * @throws NullPointerException if {@code data} is {@code null}
     */
    public CustomTypeWrapper(byte typeID, byte[] data) {
        // Fail here instead of later, when the payload is wrapped into a buffer.
        if (data == null) {
            throw new NullPointerException("data must not be null");
        }
        this.typeID = typeID;
        this.data = data;
    }

    /**
     * @return the type ID passed to the constructor
     */
    public byte getType() {
        return typeID;
    }

    /**
     * @return the same byte array instance that was passed to the constructor
     */
    public byte[] getData() {
        return data;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import sys

from flink.connection.Constants import Types
from flink.plan.Constants import _Dummy

PY2 = sys.version_info[0] == 2
PY3 = sys.version_info[0] == 3
Expand All @@ -30,27 +31,28 @@


class Collector(object):
    """Writes collected values to a connection.

    The serializer is picked from the first value passed to ``collect``; every
    later value is written with that same serializer.
    """

    def __init__(self, con, env):
        """
        :param con: connection object; its ``write`` and ``send_end_signal``
            methods are used
        :param env: object whose ``_types`` attribute is passed to
            ``_get_serializer`` as the custom type registrations
        """
        self._connection = con
        self._serializer = None
        self._env = env

    def _close(self):
        """Tells the connection that no more values will be sent."""
        self._connection.send_end_signal()

    def collect(self, value):
        """Chooses a serializer based on ``value`` and then sends ``value``."""
        self._serializer = _get_serializer(self._connection.write, value, self._env._types)
        # Rebind on the instance so later calls go straight to _collect and
        # skip serializer selection.
        self.collect = self._collect
        self.collect(value)

    def _collect(self, value):
        """Serializes ``value`` with the chosen serializer and writes it."""
        self._connection.write(self._serializer.serialize(value))


def _get_serializer(write, value):
def _get_serializer(write, value, custom_types):
if isinstance(value, (list, tuple)):
write(Types.TYPE_TUPLE)
write(pack(">I", len(value)))
return TupleSerializer(write, value)
return TupleSerializer(write, value, custom_types)
elif value is None:
write(Types.TYPE_NULL)
return NullSerializer()
Expand All @@ -70,12 +72,25 @@ def _get_serializer(write, value):
write(Types.TYPE_DOUBLE)
return FloatSerializer()
else:
for entry in custom_types:
if isinstance(value, entry[1]):
write(entry[0])
return CustomTypeSerializer(entry[2])
raise Exception("Unsupported Type encountered.")


class CustomTypeSerializer(object):
    """Wraps a serializer object and length-prefixes whatever it produces."""

    def __init__(self, serializer):
        # Object whose serialize(value) result is framed by serialize() below.
        self._serializer = serializer

    def serialize(self, value):
        """Returns the wrapped serializer's output preceded by its length.

        The length is packed as a big-endian signed 4-byte integer (">i").
        """
        payload = self._serializer.serialize(value)
        header = pack(">i", len(payload))
        return b"".join((header, payload))


class TupleSerializer(object):
def __init__(self, write, value):
self.serializer = [_get_serializer(write, field) for field in value]
def __init__(self, write, value, custom_types):
self.serializer = [_get_serializer(write, field, custom_types) for field in value]

def serialize(self, value):
bits = []
Expand Down Expand Up @@ -117,8 +132,9 @@ def serialize(self, value):


class TypedCollector(object):
def __init__(self, con):
def __init__(self, con, env):
self._connection = con
self._env = env

def collect(self, value):
if not isinstance(value, (list, tuple)):
Expand Down Expand Up @@ -153,5 +169,13 @@ def _send_field(self, value):
value = bytes(value)
size = pack(">I", len(value))
self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
elif isinstance(value, _Dummy):
self._connection.write(pack(">i", 127)[3:])
self._connection.write(pack(">i", 0))
else:
for entry in self._env._types:
if isinstance(value, entry[1]):
self._connection.write(entry[0])
self._connection.write(CustomTypeSerializer(entry[2]).serialize(value))
return
raise Exception("Unsupported Type encountered.")
Loading

0 comments on commit 30647a2

Please sign in to comment.