[FLINK-3290] [py] Generalize OperationInfo transfer
- identifier is saved in the Java OpInfo
- changed default values to prevent null exceptions
- all operations use the same routine to transfer parameters (see the sketch below)
- PyPlRcv can handle Tuple0
- labeled Python OpInfo fields as transferred/internal
- fixed broadcast OpInfo not having the correct identifier
- removed unused projection code
supermegaciaccount committed Jan 27, 2016
1 parent f681d9b commit 499b60f
Showing 8 changed files with 196 additions and 406 deletions.
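
The core of the change is the third bullet above: instead of a per-operation switch on the Java side, every operation now transfers one fixed, complete parameter sequence, with unused slots carrying harmless defaults (hence the changed default values). Below is a minimal, self-contained sketch of that pattern, assuming plain java.io streams in place of the actual Python plan channel and an invented four-field subset of the real sequence; it is an illustration of the technique, not the commit's code.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative only: java.io streams stand in for the Python<->Java plan
    // channel, and the four-field sequence is a made-up subset of the real one.
    public class FixedOrderTransferSketch {

        // Sender side: always writes the full sequence, in one fixed order.
        // Operations without a second input still write otherID, as a default.
        static void writeOperation(DataOutputStream out, String identifier,
                int parentID, int otherID, String name) throws IOException {
            out.writeUTF(identifier); // which operation ("map", "join", ...)
            out.writeInt(parentID);   // primary input DataSet
            out.writeInt(otherID);    // secondary input; -1 default when unused
            out.writeUTF(name);       // operator name; "" default when unused
        }

        // Receiver side: one routine for every operation, no per-operation switch.
        static void readOperation(DataInputStream in) throws IOException {
            String identifier = in.readUTF();
            int parentID = in.readInt();
            int otherID = in.readInt();
            String name = in.readUTF();
            System.out.printf("%s: parent=%d, other=%d, name=%s%n",
                    identifier, parentID, otherID, name);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            writeOperation(new DataOutputStream(buffer), "map", 1, -1, "");
            readOperation(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
        }
    }

The payoff shows in the diff that follows: the new constructor is a straight line of getRecord calls, replacing the roughly 150-line per-operation switch (196 additions, 406 deletions overall).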
@@ -20,10 +20,10 @@
import org.apache.flink.api.java.tuple.Tuple;
import static org.apache.flink.api.java.typeutils.TypeExtractor.getForObject;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.python.api.PythonPlanBinder.Operation;
import org.apache.flink.python.api.streaming.plan.PythonPlanStreamer;

public class PythonOperationInfo {
public String identifier;
public int parentID; //DataSet that an operation is applied on
public int otherID; //secondary DataSet
public int setID; //ID for new DataSet
@@ -35,7 +35,6 @@ public class PythonOperationInfo {
public Object[] values;
public int count;
public String field;
public int[] fields;
public Order order;
public String path;
public String fieldDelimiter;
@@ -47,154 +46,59 @@ public class PythonOperationInfo {
public String name;
public boolean usesUDF;

public PythonOperationInfo(PythonPlanStreamer streamer, Operation identifier) throws IOException {
Object tmpType;
switch (identifier) {
case SOURCE_CSV:
setID = (Integer) streamer.getRecord(true);
path = (String) streamer.getRecord();
fieldDelimiter = (String) streamer.getRecord();
lineDelimiter = (String) streamer.getRecord();
tmpType = (Tuple) streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
return;
case SOURCE_TEXT:
setID = (Integer) streamer.getRecord(true);
path = (String) streamer.getRecord();
return;
case SOURCE_VALUE:
setID = (Integer) streamer.getRecord(true);
int valueCount = (Integer) streamer.getRecord(true);
values = new Object[valueCount];
for (int x = 0; x < valueCount; x++) {
values[x] = streamer.getRecord();
}
return;
case SOURCE_SEQ:
setID = (Integer) streamer.getRecord(true);
from = (Long) streamer.getRecord();
to = (Long) streamer.getRecord();
return;
case SINK_CSV:
parentID = (Integer) streamer.getRecord(true);
path = (String) streamer.getRecord();
fieldDelimiter = (String) streamer.getRecord();
lineDelimiter = (String) streamer.getRecord();
writeMode = ((Integer) streamer.getRecord(true)) == 1
? WriteMode.OVERWRITE
: WriteMode.NO_OVERWRITE;
return;
case SINK_TEXT:
parentID = (Integer) streamer.getRecord(true);
path = (String) streamer.getRecord();
writeMode = ((Integer) streamer.getRecord(true)) == 1
? WriteMode.OVERWRITE
: WriteMode.NO_OVERWRITE;
return;
case SINK_PRINT:
parentID = (Integer) streamer.getRecord(true);
toError = (Boolean) streamer.getRecord();
return;
case BROADCAST:
parentID = (Integer) streamer.getRecord(true);
otherID = (Integer) streamer.getRecord(true);
name = (String) streamer.getRecord();
return;
}
setID = (Integer) streamer.getRecord(true);
public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
identifier = (String) streamer.getRecord();
parentID = (Integer) streamer.getRecord(true);
switch (identifier) {
case AGGREGATE:
count = (Integer) streamer.getRecord(true);
aggregates = new AggregationEntry[count];
for (int x = 0; x < count; x++) {
int encodedAgg = (Integer) streamer.getRecord(true);
int field = (Integer) streamer.getRecord(true);
aggregates[x] = new AggregationEntry(encodedAgg, field);
}
return;
case FIRST:
count = (Integer) streamer.getRecord(true);
return;
case DISTINCT:
case GROUPBY:
case PARTITION_HASH:
keys = normalizeKeys(streamer.getRecord(true));
return;
case PROJECTION:
fields = toIntArray(streamer.getRecord(true));
return;
case REBALANCE:
return;
case SORT:
field = "f0.f" + (Integer) streamer.getRecord(true);
int encodedOrder = (Integer) streamer.getRecord(true);
switch (encodedOrder) {
case 0:
order = Order.NONE;
break;
case 1:
order = Order.ASCENDING;
break;
case 2:
order = Order.DESCENDING;
break;
case 3:
order = Order.ANY;
break;
default:
order = Order.NONE;
break;
}
return;
case UNION:
otherID = (Integer) streamer.getRecord(true);
return;
case COGROUP:
otherID = (Integer) streamer.getRecord(true);
keys1 = normalizeKeys(streamer.getRecord(true));
keys2 = normalizeKeys(streamer.getRecord(true));
tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) streamer.getRecord();
return;
case CROSS:
case CROSS_H:
case CROSS_T:
otherID = (Integer) streamer.getRecord(true);
usesUDF = (Boolean) streamer.getRecord();
tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) streamer.getRecord();
return;
case REDUCE:
case GROUPREDUCE:
tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) streamer.getRecord();
return;
case JOIN:
case JOIN_H:
case JOIN_T:
keys1 = normalizeKeys(streamer.getRecord(true));
keys2 = normalizeKeys(streamer.getRecord(true));
otherID = (Integer) streamer.getRecord(true);
usesUDF = (Boolean) streamer.getRecord();
tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) streamer.getRecord();
return;
case MAPPARTITION:
case FLATMAP:
case MAP:
case FILTER:
tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
name = (String) streamer.getRecord();
return;
otherID = (Integer) streamer.getRecord(true);
field = "f0.f" + (Integer) streamer.getRecord(true);
int encodedOrder = (Integer) streamer.getRecord(true);
switch (encodedOrder) {
case 0:
order = Order.NONE;
break;
case 1:
order = Order.ASCENDING;
break;
case 2:
order = Order.DESCENDING;
break;
case 3:
order = Order.ANY;
break;
default:
throw new UnsupportedOperationException("This operation is not implemented in the Python API: " + identifier);
order = Order.NONE;
break;
}
keys = normalizeKeys(streamer.getRecord(true));
keys1 = normalizeKeys(streamer.getRecord(true));
keys2 = normalizeKeys(streamer.getRecord(true));
Object tmpType = streamer.getRecord();
types = tmpType == null ? null : getForObject(tmpType);
usesUDF = (Boolean) streamer.getRecord();
name = (String) streamer.getRecord();
lineDelimiter = (String) streamer.getRecord();
fieldDelimiter = (String) streamer.getRecord();
writeMode = ((Integer) streamer.getRecord(true)) == 1
? WriteMode.OVERWRITE
: WriteMode.NO_OVERWRITE;
path = (String) streamer.getRecord();
setID = (Integer) streamer.getRecord(true);
toError = (Boolean) streamer.getRecord();
count = (Integer) streamer.getRecord(true);
int valueCount = (Integer) streamer.getRecord(true);
values = new Object[valueCount];
for (int x = 0; x < valueCount; x++) {
values[x] = streamer.getRecord();
}

/*
aggregates = new AggregationEntry[count];
for (int x = 0; x < count; x++) {
int encodedAgg = (Integer) streamer.getRecord(true);
int field = (Integer) streamer.getRecord(true);
aggregates[x] = new AggregationEntry(encodedAgg, field);
}
*/
}

@Override
@@ -283,21 +187,6 @@ private static String[] normalizeKeys(Object keys) {
throw new RuntimeException("Key argument is neither an int[] nor a Tuple: " + keys.toString());
}

private static int[] toIntArray(Object key) {
if (key instanceof Tuple) {
Tuple tuple = (Tuple) key;
int[] keys = new int[tuple.getArity()];
for (int y = 0; y < tuple.getArity(); y++) {
keys[y] = (Integer) tuple.getField(y);
}
return keys;
}
if (key instanceof int[]) {
return (int[]) key;
}
throw new RuntimeException("Key argument is neither an int[] nor a Tuple.");
}

private static String[] tupleToStringArray(Tuple tuple) {
String[] keys = new String[tuple.getArity()];
for (int y = 0; y < tuple.getArity(); y++) {
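The helper methods in this last hunk are only partially visible. As a reading aid, here is a hedged reconstruction of normalizeKeys, inferred from its error message and from the "f0.f" + index pattern the constructor uses for the sort field; the exact field-expression prefix and the Integer handling are assumptions, not verbatim code from the commit.

    // Hedged reconstruction, not verbatim: keys arrive from Python either as
    // an int[] or as a Tuple of Integers, and are normalized to Flink
    // field-expression strings. The "f0.f" prefix mirrors the sort-field
    // construction visible in the constructor and is an assumption here.
    // (org.apache.flink.api.java.tuple.Tuple is already imported in this file.)
    private static String[] normalizeKeys(Object keys) {
        if (keys instanceof Tuple) {
            Tuple tupleKeys = (Tuple) keys;
            String[] stringKeys = new String[tupleKeys.getArity()];
            for (int y = 0; y < stringKeys.length; y++) {
                stringKeys[y] = "f0.f" + (Integer) tupleKeys.getField(y);
            }
            return stringKeys;
        }
        if (keys instanceof int[]) {
            int[] intKeys = (int[]) keys;
            String[] stringKeys = new String[intKeys.length];
            for (int y = 0; y < intKeys.length; y++) {
                stringKeys[y] = "f0.f" + intKeys[y];
            }
            return stringKeys;
        }
        throw new RuntimeException("Key argument is neither an int[] nor a Tuple: " + keys.toString());
    }

The removed toIntArray helper was only needed by the PROJECTION case deleted above, which is why it goes away together with the unused fields member, matching the "removed unused projection code" bullet in the commit message.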
