Skip to content

Commit

Permalink
[hotfix] Minor cleanup of warnings, comments, and code style in the J…
Browse files Browse the repository at this point in the history
…ava API Utils
  • Loading branch information
StephanEwen committed Jan 15, 2016
1 parent e9a5358 commit 9365441
Showing 1 changed file with 41 additions and 27 deletions.
68 changes: 41 additions & 27 deletions flink-java/src/main/java/org/apache/flink/api/java/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@
package org.apache.flink.api.java;

import org.apache.commons.lang3.StringUtils;

import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.List;
import java.util.Random;

import org.apache.flink.configuration.Configuration;
import static org.apache.flink.api.java.functions.FunctionAnnotation.SkipCodeAnalysis;

/**
Expand Down Expand Up @@ -63,19 +64,28 @@ public static String getCallLocationName(int depth) {
*
* @param typeInfo {@link CompositeType}
*/
public static void getContainedGenericTypes(CompositeType typeInfo, List<GenericTypeInfo<?>> target) {
for(int i = 0; i < typeInfo.getArity(); i++) {
public static void getContainedGenericTypes(CompositeType<?> typeInfo, List<GenericTypeInfo<?>> target) {
for (int i = 0; i < typeInfo.getArity(); i++) {
TypeInformation<?> type = typeInfo.getTypeAt(i);
if(type instanceof CompositeType) {
getContainedGenericTypes((CompositeType) type, target);
} else if(type instanceof GenericTypeInfo) {
if(!target.contains(type)) {
if (type instanceof CompositeType) {
getContainedGenericTypes((CompositeType<?>) type, target);
} else if (type instanceof GenericTypeInfo) {
if (!target.contains(type)) {
target.add((GenericTypeInfo<?>) type);
}
}
}
}

// --------------------------------------------------------------------------------------------

/**
* Utility sink function that counts elements and writes the count into an accumulator,
* from which it can be retrieved by the client. This sink is used by the
* {@link DataSet#count()} function.
*
* @param <T> Type of elements to count.
*/
@SkipCodeAnalysis
public static class CountHelper<T> extends RichOutputFormat<T> {

Expand All @@ -90,24 +100,29 @@ public CountHelper(String id) {
}

@Override
public void configure(Configuration parameters) {
}
public void configure(Configuration parameters) {}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
}
public void open(int taskNumber, int numTasks) {}

@Override
public void writeRecord(T record) throws IOException {
public void writeRecord(T record) {
counter++;
}

@Override
public void close() throws IOException {
public void close() {
getRuntimeContext().getLongCounter(id).add(counter);
}
}

/**
* Utility sink function that collects elements into an accumulator,
* from which it they can be retrieved by the client. This sink is used by the
* {@link DataSet#collect()} function.
*
* @param <T> Type of elements to count.
*/
@SkipCodeAnalysis
public static class CollectHelper<T> extends RichOutputFormat<T> {

Expand All @@ -124,11 +139,10 @@ public CollectHelper(String id, TypeSerializer<T> serializer) {
}

@Override
public void configure(Configuration parameters) {
}
public void configure(Configuration parameters) {}

@Override
public void open(int taskNumber, int numTasks) throws IOException {
public void open(int taskNumber, int numTasks) {
this.accumulator = new SerializedListAccumulator<>();
}

Expand All @@ -138,13 +152,12 @@ public void writeRecord(T record) throws IOException {
}

@Override
public void close() throws IOException {
public void close() {
// Important: should only be added in close method to minimize traffic of accumulators
getRuntimeContext().addAccumulator(id, accumulator);
}
}


// --------------------------------------------------------------------------------------------

/**
Expand All @@ -157,16 +170,16 @@ public static <T> String getSerializerTree(TypeInformation<T> ti) {

private static <T> String getSerializerTree(TypeInformation<T> ti, int indent) {
String ret = "";
if(ti instanceof CompositeType) {
if (ti instanceof CompositeType) {
ret += StringUtils.repeat(' ', indent) + ti.getClass().getSimpleName()+"\n";
CompositeType<T> cti = (CompositeType<T>) ti;
String[] fieldNames = cti.getFieldNames();
for(int i = 0; i < cti.getArity(); i++) {
TypeInformation fieldType = cti.getTypeAt(i);
for (int i = 0; i < cti.getArity(); i++) {
TypeInformation<?> fieldType = cti.getTypeAt(i);
ret += StringUtils.repeat(' ', indent + 2) + fieldNames[i]+":"+getSerializerTree(fieldType, indent);
}
} else {
if(ti instanceof GenericTypeInfo) {
if (ti instanceof GenericTypeInfo) {
ret += StringUtils.repeat(' ', indent) + "GenericTypeInfo ("+ti.getTypeClass().getSimpleName()+")\n";
ret += getGenericTypeTree(ti.getTypeClass(), indent + 4);
} else {
Expand All @@ -176,14 +189,15 @@ private static <T> String getSerializerTree(TypeInformation<T> ti, int indent) {
return ret;
}

private static String getGenericTypeTree(Class type, int indent) {
private static String getGenericTypeTree(Class<?> type, int indent) {
String ret = "";
for(Field field : type.getDeclaredFields()) {
if(Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
for (Field field : type.getDeclaredFields()) {
if (Modifier.isStatic(field.getModifiers()) || Modifier.isTransient(field.getModifiers())) {
continue;
}
ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() + (field.getType().isEnum() ? " (is enum)" : "") + "\n";
if(!field.getType().isPrimitive()) {
ret += StringUtils.repeat(' ', indent) + field.getName() + ":" + field.getType().getName() +
(field.getType().isEnum() ? " (is enum)" : "") + "\n";
if (!field.getType().isPrimitive()) {
ret += getGenericTypeTree(field.getType(), indent + 4);
}
}
Expand Down

0 comments on commit 9365441

Please sign in to comment.