[FLINK-3303] [core] Move all type utilities to flink-core
StephanEwen committed Feb 2, 2016
1 parent 7081836 commit 21a7158
Showing 168 changed files with 503 additions and 612 deletions.
47 changes: 41 additions & 6 deletions flink-core/pom.xml
@@ -40,12 +40,6 @@ under the License.
<artifactId>flink-annotations</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>commons-collections</groupId>
@@ -56,20 +50,61 @@ under the License.
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
<!-- managed version -->
</dependency>

<!-- Avro is needed for the interoperability with Avro types for serialization -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<!-- managed version -->
<exclusions>
<exclusion>
<groupId>org.xerial.snappy</groupId>
<artifactId>snappy-java</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- Hadoop is only needed here for serialization interoperability with the Writable type -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>${shading-artifact.name}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.5</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.joda</groupId>
<artifactId>joda-convert</artifactId>
<version>1.7</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
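
The two dependency comments above state the rationale: Avro and the shaded Hadoop artifact move into flink-core solely so that Avro types and Hadoop Writable types stay serializable through the core type system. As a minimal sketch of what that enables, assuming the Flink 1.x WritableTypeInfo class (which keeps its org.apache.flink.api.java.typeutils package after the move) and a Hadoop LongWritable on the classpath:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.api.java.typeutils.WritableTypeInfo;
    import org.apache.hadoop.io.LongWritable;

    public class WritableInteropSketch {
        public static void main(String[] args) {
            // Describe a Hadoop Writable to Flink's type system ...
            TypeInformation<LongWritable> typeInfo = new WritableTypeInfo<>(LongWritable.class);
            // ... and create a serializer for it without touching flink-java.
            TypeSerializer<LongWritable> serializer = typeInfo.createSerializer(new ExecutionConfig());
            System.out.println(serializer.getClass().getName());
        }
    }
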
@@ -16,30 +16,20 @@
* limitations under the License.
*/

package org.apache.flink.api.java.operators;
package org.apache.flink.api.common.operators;

import java.util.ArrayList;
import java.util.List;

import com.google.common.base.Joiner;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.CompositeType.FlatFieldDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.operators.translation.TwoKeyExtractingMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import com.google.common.base.Preconditions;
@@ -177,87 +167,6 @@ public <E> void validateCustomPartitioner(Partitioner<E> partitioner, TypeInform
}
}

@SuppressWarnings("unchecked")
public static <T, K> Operator<Tuple2<K, T>> appendKeyExtractor(
Operator<T> input,
SelectorFunctionKeys<T, K> key)
{

TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);
KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper(key.getKeyExtractor());

MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>> mapper =
new MapOperatorBase<T, Tuple2<K, T>, MapFunction<T, Tuple2<K, T>>>(
extractor,
new UnaryOperatorInformation(inputType, typeInfoWithKey),
"Key Extractor"
);

mapper.setInput(input);
mapper.setParallelism(input.getParallelism());

return mapper;
}

@SuppressWarnings("unchecked")
public static <T, K1, K2> Operator<Tuple3<K1, K2, T>> appendKeyExtractor(
Operator<T> input,
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2)
{

TypeInformation<T> inputType = key1.getInputType();
TypeInformation<Tuple3<K1, K2, T>> typeInfoWithKey = createTypeWithKey(key1, key2);
TwoKeyExtractingMapper<T, K1, K2> extractor =
new TwoKeyExtractingMapper<>(key1.getKeyExtractor(), key2.getKeyExtractor());

MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>> mapper =
new MapOperatorBase<T, Tuple3<K1, K2, T>, MapFunction<T, Tuple3<K1, K2, T>>>(
extractor,
new UnaryOperatorInformation<>(inputType, typeInfoWithKey),
"Key Extractor"
);

mapper.setInput(input);
mapper.setParallelism(input.getParallelism());

return mapper;
}

public static <T, K> org.apache.flink.api.common.operators.SingleInputOperator<?, T, ?> appendKeyRemover(
Operator<Tuple2<K, T>> inputWithKey,
SelectorFunctionKeys<T, K> key)
{

TypeInformation<T> inputType = key.getInputType();
TypeInformation<Tuple2<K, T>> typeInfoWithKey = createTypeWithKey(key);

MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>> mapper =
new MapOperatorBase<Tuple2<K, T>, T, MapFunction<Tuple2<K, T>, T>>(
new KeyRemovingMapper<T, K>(),
new UnaryOperatorInformation<>(typeInfoWithKey, inputType),
"Key Remover"
);
mapper.setInput(inputWithKey);
mapper.setParallelism(inputWithKey.getParallelism());

return mapper;
}

public static <T, K> TypeInformation<Tuple2<K, T>> createTypeWithKey(
SelectorFunctionKeys<T, K> key)
{
return new TupleTypeInfo<>(key.getKeyType(), key.getInputType());
}

public static <T, K1, K2> TypeInformation<Tuple3<K1, K2, T>> createTypeWithKey(
SelectorFunctionKeys<T, K1> key1,
SelectorFunctionKeys<T, K2> key2)
{
return new TupleTypeInfo<>(key1.getKeyType(), key2.getKeyType(), key1.getInputType());
}

@Override
public String toString() {
return "Key function (Type: " + keyType + ")";
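
The block removed above (relocated to flink-core by this commit, not deleted) implements the selector-function key pattern: appendKeyExtractor wraps every record T into a Tuple2<K, T> by invoking the user's KeySelector, downstream operators then key on field f0, and appendKeyRemover strips the key again, while createTypeWithKey builds the matching TupleTypeInfo. A sketch of the wrapping step, using a hypothetical MyPojo record type:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class KeyExtractionSketch {

        // Hypothetical record type, for illustration only.
        public static class MyPojo {
            public String name;
            public MyPojo(String name) { this.name = name; }
        }

        public static void main(String[] args) throws Exception {
            // The user-supplied selector, as carried by SelectorFunctionKeys.
            KeySelector<MyPojo, String> selector = new KeySelector<MyPojo, String>() {
                @Override
                public String getKey(MyPojo value) {
                    return value.name;
                }
            };

            // What the generated "Key Extractor" mapper emits for each record:
            MyPojo record = new MyPojo("flink");
            Tuple2<String, MyPojo> withKey = new Tuple2<>(selector.getKey(record), record);

            // The "Key Remover" later forwards only withKey.f1.
            System.out.println(withKey.f0 + " -> " + withKey.f1.name);
        }
    }
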
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.types.Either;

/**
* A {@link TypeInformation} for the {@link Either} type of the Java API.
@@ -28,15 +28,16 @@
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.AvroSerializer;
import org.apache.flink.api.java.typeutils.runtime.PojoComparator;
import org.apache.flink.api.java.typeutils.runtime.PojoSerializer;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;

import com.google.common.base.Joiner;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
@@ -24,7 +24,7 @@
* This interface can be implemented by functions and input formats to tell the framework
* about their produced data type. This method acts as an alternative to the reflection analysis
* that is otherwise performed and is useful in situations where the produced data type may vary
* depending on parameterization.
* depending on parametrization.
*/
public interface ResultTypeQueryable<T> {

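The javadoc above positions ResultTypeQueryable as the explicit alternative to reflection-based type analysis. A minimal sketch of an implementing function (MyParser is a hypothetical class, not part of this commit):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

    // Declares its produced type explicitly instead of relying on reflection analysis.
    public class MyParser implements MapFunction<String, Integer>, ResultTypeQueryable<Integer> {

        @Override
        public Integer map(String value) {
            return Integer.parseInt(value.trim());
        }

        @Override
        public TypeInformation<Integer> getProducedType() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }
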
@@ -24,9 +24,10 @@
import java.util.regex.Pattern;

import com.google.common.base.Preconditions;

import org.apache.flink.api.common.operators.Keys.ExpressionKeys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;

public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {

@@ -35,7 +36,7 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
private final static String REGEX_FIELD = "(f?)([0-9]+)";
private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
+"|\\"+ExpressionKeys.SELECT_ALL_CHAR
+"|\\"+ ExpressionKeys.SELECT_ALL_CHAR
+"|\\"+ExpressionKeys.SELECT_ALL_CHAR_SCALA;

private static final Pattern PATTERN_FIELD = Pattern.compile(REGEX_FIELD);
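
The three patterns above drive field-expression parsing for tuple types: REGEX_FIELD accepts "f0"-style references, REGEX_NESTED_FIELDS adds dotted nesting, and the wildcard variant also accepts the select-all characters. A standalone sketch of the matching behavior, assuming SELECT_ALL_CHAR is "*" and SELECT_ALL_CHAR_SCALA is "_":

    import java.util.regex.Pattern;

    public class FieldExpressionSketch {
        public static void main(String[] args) {
            String regexField = "(f?)([0-9]+)";
            String regexNestedFields = "(" + regexField + ")(\\.(.+))?";
            // Assumption: select-all is "*" in the Java API and "_" in the Scala API.
            String regexNestedFieldsWildcard = regexNestedFields + "|\\*|_";

            Pattern pattern = Pattern.compile(regexNestedFieldsWildcard);
            for (String expr : new String[] {"f0", "f2.f1", "0", "*", "_", "name"}) {
                System.out.println(expr + " -> " + pattern.matcher(expr).matches());
            }
        }
    }
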
@@ -30,7 +30,9 @@
import java.util.List;

import org.apache.avro.specific.SpecificRecordBase;

import org.apache.commons.lang3.ClassUtils;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -54,9 +56,12 @@
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;

import org.apache.hadoop.io.Writable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,13 +18,13 @@

package org.apache.flink.api.java.typeutils.runtime;

import static org.apache.flink.api.java.typeutils.Either.Left;
import static org.apache.flink.api.java.typeutils.Either.Right;
import static org.apache.flink.types.Either.Left;
import static org.apache.flink.types.Either.Right;

import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.Either;
import org.apache.flink.types.Either;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

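The import changes above reflect that Either itself moved from org.apache.flink.api.java.typeutils to org.apache.flink.types in this commit. A minimal usage sketch against the new location (the tryParse helper is hypothetical):

    import org.apache.flink.types.Either;

    public class EitherSketch {

        // Hypothetical helper: parse an int, or return the error message.
        static Either<String, Integer> tryParse(String s) {
            try {
                return Either.Right(Integer.parseInt(s));
            } catch (NumberFormatException e) {
                return Either.Left("not a number: " + s);
            }
        }

        public static void main(String[] args) {
            Either<String, Integer> ok = tryParse("42");
            Either<String, Integer> err = tryParse("x");
            System.out.println(ok.isRight() + " " + ok.right());   // true 42
            System.out.println(err.isLeft() + " " + err.left());   // true not a number: x
        }
    }
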
@@ -23,13 +23,14 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CollectionSerializer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificRecordBase;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -64,7 +65,7 @@ public static void recursivelyRegisterType(TypeInformation<?> typeInfo, Executio
}
else if (typeInfo instanceof CompositeType) {
List<GenericTypeInfo<?>> genericTypesInComposite = new ArrayList<>();
Utils.getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
getContainedGenericTypes((CompositeType<?>)typeInfo, genericTypesInComposite);
for (GenericTypeInfo<?> gt : genericTypesInComposite) {
Serializers.recursivelyRegisterType(gt.getTypeClass(), config, alreadySeen);
}
@@ -127,41 +128,38 @@ else if (fieldType instanceof Class) {
}
}

private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
if (GenericData.Record.class.isAssignableFrom(type)) {
registerGenericAvro(reg);
}
if (SpecificRecordBase.class.isAssignableFrom(type)) {
@SuppressWarnings("unchecked")
Class<? extends SpecificRecordBase> specRecordClass = (Class<? extends SpecificRecordBase>) type;
registerSpecificAvro(reg, specRecordClass);
}
}

/**
* Register these serializers for using Avro's {@link GenericData.Record} and classes
* implementing {@link org.apache.avro.specific.SpecificRecordBase}
* Returns all GenericTypeInfos contained in a composite type.
*
* @param typeInfo {@link CompositeType}
*/
private static void registerGenericAvro(ExecutionConfig reg) {
// Avro POJOs contain java.util.List fields, which have GenericData.Array as their runtime type;
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);

// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
private static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
for (int i = 0; i < typeInfo.getArity(); i++) {
TypeInformation<?> type = typeInfo.getTypeAt(i);
if (type instanceof CompositeType) {
getContainedGenericTypes((CompositeType<?>) type, target);
} else if (type instanceof GenericTypeInfo) {
if (!target.contains(type)) {
target.add((GenericTypeInfo<?>) type);
}
}
}
}

// ------------------------------------------------------------------------

private static void checkAndAddSerializerForTypeAvro(ExecutionConfig reg, Class<?> type) {
if (GenericData.Record.class.isAssignableFrom(type) || SpecificRecordBase.class.isAssignableFrom(type)) {
// Avro POJOs contain java.util.List fields, which have GenericData.Array as their runtime type;
// because Kryo is not able to serialize them properly, we use this serializer for them
reg.registerTypeWithKryoSerializer(GenericData.Array.class, SpecificInstanceCollectionSerializerForArrayList.class);

private static void registerSpecificAvro(ExecutionConfig reg, Class<? extends SpecificRecordBase> avroType) {
registerGenericAvro(reg);
// This rule only applies if users explicitly use the GenericTypeInformation for the avro types
// usually, we are able to handle Avro POJOs with the POJO serializer.
// (However only if the GenericData.Array type is registered!)

// ClassTag<SpecificRecordBase> tag = scala.reflect.ClassTag$.MODULE$.apply(avroType);
// reg.registerTypeWithKryoSerializer(avroType, com.twitter.chill.avro.AvroSerializer.SpecificRecordSerializer(tag));
// We register this serializer for users who want to use untyped Avro records (GenericData.Record).
// Kryo is able to serialize everything in there, except for the Schema.
// This serializer is very slow, but using the GenericData.Records of Kryo is in general a bad idea.
// we add the serializer as a default serializer because Avro is using a private sub-type at runtime.
reg.addDefaultKryoSerializer(Schema.class, AvroSchemaSerializer.class);
}
}

// --------------------------------------------------------------------------------------------
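
The new private getContainedGenericTypes above replaces the previous call into org.apache.flink.api.java.Utils, cutting a flink-java dependency: it walks a CompositeType depth-first and collects each distinct GenericTypeInfo. For illustration, a standalone re-implementation of that traversal (the class and method names here are demonstration-only, not Flink API):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.CompositeType;
    import org.apache.flink.api.java.typeutils.GenericTypeInfo;
    import org.apache.flink.api.java.typeutils.TupleTypeInfo;

    public class GenericTypesSketch {

        // Same depth-first walk as the private helper above, for demonstration only.
        static void collectGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
            for (int i = 0; i < typeInfo.getArity(); i++) {
                TypeInformation<?> type = typeInfo.getTypeAt(i);
                if (type instanceof CompositeType) {
                    collectGenericTypes((CompositeType<?>) type, target);
                } else if (type instanceof GenericTypeInfo && !target.contains(type)) {
                    target.add((GenericTypeInfo<?>) type);
                }
            }
        }

        public static void main(String[] args) {
            // Build a composite type with one generic field (constructed directly for the demo).
            CompositeType<?> tupleType = new TupleTypeInfo<>(
                    new GenericTypeInfo<>(java.util.Locale.class),
                    BasicTypeInfo.INT_TYPE_INFO);

            List<GenericTypeInfo<?>> found = new ArrayList<>();
            collectGenericTypes(tupleType, found);
            System.out.println(found);   // expect only the Locale type
        }
    }
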