Commit
[FLINK-2901] Remove Record API dependencies from flink-tests apache#1
supermegaciaccount authored and fhueske committed Nov 24, 2015
1 parent 348d501 commit 8abae2c
Showing 35 changed files with 1,477 additions and 2,861 deletions.
@@ -173,7 +173,7 @@ else if (result instanceof CancellationFailure) {
         }
     }
 
-    private void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
+    public static void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout)
             throws Exception {
 
         checkNotNull(jobManager);
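
The only change in this hunk widens awaitRunning from private to public static, so other integration tests can block until a job reaches RUNNING before acting on it. A minimal sketch of a call site under stated assumptions: the declaring class is not named in this excerpt, so CancelingTestBase below is a placeholder, and the gateway/job fixtures are illustrative.

```java
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.instance.ActorGateway;

import scala.concurrent.duration.FiniteDuration;

public class AwaitRunningSketch {

    // Blocks until the job is RUNNING, then proceeds. "CancelingTestBase"
    // is a stand-in for the (unnamed) class that declares awaitRunning.
    static void waitUntilRunning(ActorGateway jobManager, JobID jobId) throws Exception {
        FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
        CancelingTestBase.awaitRunning(jobManager, jobId, timeout);
        // ... the job is RUNNING here; safe to cancel or inspect it ...
    }
}
```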

This file was deleted.

BroadcastBranchingITCase.java
@@ -19,46 +19,24 @@
 package org.apache.flink.test.broadcastvars;
 
 import java.util.Collection;
+import java.util.List;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.FlatMapOperator;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.util.ContractITCaseIOFormats.ContractITCaseOutputFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
-@SuppressWarnings("deprecation")
-public class BroadcastBranchingITCase extends RecordAPITestBase {
-
-    private static final String SC1_ID_ABC = "1 61 6 29\n2 7 13 10\n3 8 13 27\n";
-
-    private static final String SC2_ID_X = "1 5\n2 3\n3 6";
-
-    private static final String SC3_ID_Y = "1 2\n2 3\n3 7";
-
-    private static final String RESULT = "2 112\n";
-
-    private String sc1Path;
-    private String sc2Path;
-    private String sc3Path;
-    private String resultPath;
-
-    @Override
-    protected void preSubmit() throws Exception {
-        sc1Path = createTempFile("broadcastBranchingInput/map_id_abc.txt", SC1_ID_ABC);
-        sc2Path = createTempFile("broadcastBranchingInput/map_id_x.txt", SC2_ID_X);
-        sc3Path = createTempFile("broadcastBranchingInput/map_id_y.txt", SC3_ID_Y);
-        resultPath = getTempDirPath("result");
-    }
+public class BroadcastBranchingITCase extends JavaProgramTestBase {
+    private static final String RESULT = "(2,112)\n";
 
     // Sc1(id,a,b,c) --
     //                 \
@@ -68,108 +46,100 @@ protected void preSubmit() throws Exception {
     //                 /
     // Sc3(id,y) --------
     @Override
-    protected Plan getTestJob() {
+    protected void testProgram() throws Exception {
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
 
         // Sc1 generates M parameters a,b,c for second degree polynomials P(x) = ax^2 + bx + c identified by id
-        FileDataSource sc1 = new FileDataSource(new CsvInputFormat(), sc1Path);
-        CsvInputFormat.configureRecordFormat(sc1).fieldDelimiter(' ').field(StringValue.class, 0).field(IntValue.class, 1)
-                .field(IntValue.class, 2).field(IntValue.class, 3);
+        DataSet<Tuple4<String, Integer, Integer, Integer>> sc1 = env
+                .fromElements(new Tuple4<>("1", 61, 6, 29), new Tuple4<>("2", 7, 13, 10), new Tuple4<>("3", 8, 13, 27));
 
         // Sc2 generates N x values to be evaluated with the polynomial identified by id
-        FileDataSource sc2 = new FileDataSource(new CsvInputFormat(), sc2Path);
-        CsvInputFormat.configureRecordFormat(sc2).fieldDelimiter(' ').field(StringValue.class, 0).field(IntValue.class, 1);
+        DataSet<Tuple2<String, Integer>> sc2 = env
                .fromElements(new Tuple2<>("1", 5), new Tuple2<>("2", 3), new Tuple2<>("3", 6));
 
         // Sc3 generates N y values to be evaluated with the polynomial identified by id
-        FileDataSource sc3 = new FileDataSource(new CsvInputFormat(), sc3Path);
-        CsvInputFormat.configureRecordFormat(sc3).fieldDelimiter(' ').field(StringValue.class, 0).field(IntValue.class, 1);
+        DataSet<Tuple2<String, Integer>> sc3 = env
+                .fromElements(new Tuple2<>("1", 2), new Tuple2<>("2", 3), new Tuple2<>("3", 7));
 
         // Jn1 matches x and y values on id and emits (id, x, y) triples
-        JoinOperator jn1 = JoinOperator.builder(Jn1.class, StringValue.class, 0, 0).input1(sc2).input2(sc3).build();
+        JoinOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> jn1
+                = sc2.join(sc3).where(0).equalTo(0).with(new Jn1());
 
         // Jn2 matches polynomial and arguments by id, computes p = min(P(x),P(y)) and emits (id, p) tuples
-        JoinOperator jn2 = JoinOperator.builder(Jn2.class, StringValue.class, 0, 0).input1(jn1).input2(sc1).build();
+        JoinOperator<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> jn2
                = jn1.join(sc1).where(0).equalTo(0).with(new Jn2());
 
         // Mp1 selects (id, x, y) triples where x = y and broadcasts z (=x=y) to Mp2
-        MapOperator mp1 = MapOperator.builder(Mp1.class).input(jn1).build();
+        FlatMapOperator<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> mp1
+                = jn1.flatMap(new Mp1());
 
         // Mp2 filters out all p values which can be divided by z
-        MapOperator mp2 = MapOperator.builder(Mp2.class).setBroadcastVariable("z", mp1).input(jn2).build();
-
-        FileDataSink output = new FileDataSink(new ContractITCaseOutputFormat(), resultPath);
-        output.setParallelism(1);
-        output.setInput(mp2);
+        List<Tuple2<String, Integer>> result = jn2.flatMap(new Mp2()).withBroadcastSet(mp1, "z").collect();
 
-        return new Plan(output);
-    }
-
-    @Override
-    protected void postSubmit() throws Exception {
-        compareResultsByLinesInMemory(RESULT, resultPath);
+        JavaProgramTestBase.compareResultAsText(result, RESULT);
     }
 
-    public static class Jn1 extends JoinFunction {
+    public static class Jn1 implements JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
 
        @Override
-        public void join(Record sc2, Record sc3, Collector<Record> out) throws Exception {
-            Record r = new Record(3);
-            r.setField(0, sc2.getField(0, StringValue.class));
-            r.setField(1, sc2.getField(1, IntValue.class));
-            r.setField(2, sc3.getField(1, IntValue.class));
-            out.collect(r);
+        public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) throws Exception {
+            return new Tuple3<>(first.f0, first.f1, second.f1);
        }
    }
 
-    public static class Jn2 extends JoinFunction {
+    public static class Jn2 implements JoinFunction<Tuple3<String, Integer, Integer>, Tuple4<String, Integer, Integer, Integer>, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
 
        private static int p(int x, int a, int b, int c) {
            return a * x * x + b * x + c;
        }
 
        @Override
-        public void join(Record jn1, Record sc1, Collector<Record> out) throws Exception {
-            int x = jn1.getField(1, IntValue.class).getValue();
-            int y = jn1.getField(2, IntValue.class).getValue();
-            int a = sc1.getField(1, IntValue.class).getValue();
-            int b = sc1.getField(2, IntValue.class).getValue();
-            int c = sc1.getField(3, IntValue.class).getValue();
+        public Tuple2<String, Integer> join(Tuple3<String, Integer, Integer> first, Tuple4<String, Integer, Integer, Integer> second) throws Exception {
+            int x = first.f1;
+            int y = first.f2;
+            int a = second.f1;
+            int b = second.f2;
+            int c = second.f3;
 
            int p_x = p(x, a, b, c);
            int p_y = p(y, a, b, c);
            int min = Math.min(p_x, p_y);
-            out.collect(new Record(jn1.getField(0, StringValue.class), new IntValue(min)));
+            return new Tuple2<>(first.f0, min);
        }
    }
 
-    public static class Mp1 extends MapFunction {
+    public static class Mp1 implements FlatMapFunction<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
 
        @Override
-        public void map(Record jn1, Collector<Record> out) throws Exception {
-            if (jn1.getField(1, IntValue.class).getValue() == jn1.getField(2, IntValue.class).getValue()) {
-                out.collect(new Record(jn1.getField(0, StringValue.class), jn1.getField(1, IntValue.class)));
+        public void flatMap(Tuple3<String, Integer, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            if (value.f1.compareTo(value.f2) == 0) {
+                out.collect(new Tuple2<>(value.f0, value.f1));
            }
        }
    }
 
-    public static class Mp2 extends MapFunction {
+    public static class Mp2 extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;
 
-        private Collection<Record> zs;
+        private Collection<Tuple2<String, Integer>> zs;
 
        @Override
        public void open(Configuration parameters) throws Exception {
            this.zs = getRuntimeContext().getBroadcastVariable("z");
        }
 
        @Override
-        public void map(Record jn2, Collector<Record> out) throws Exception {
-            int p = jn2.getField(1, IntValue.class).getValue();
+        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
+            int p = value.f1;
 
-            for (Record z : zs) {
-                if (z.getField(0, StringValue.class).getValue().equals(jn2.getField(0, StringValue.class).getValue())) {
-                    if (p % z.getField(1, IntValue.class).getValue() != 0) {
-                        out.collect(jn2);
+            for (Tuple2<String, Integer> z : zs) {
+                if (z.f0.equals(value.f0)) {
+                    if (p % z.f1 != 0) {
+                        out.collect(value);
                    }
                }
            }
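
The expected output moves from the Record API's space-delimited "2 112" to Tuple2's string form "(2,112)". The value itself follows from the inlined fixtures: id 2 is the only key with x = y (3 = 3), so Mp1 broadcasts z = (2, 3); for id 2, P(3) = 7*9 + 13*3 + 10 = 112, and 112 % 3 != 0, so Mp2 emits (2,112). For the broadcast mechanics this migration relies on (withBroadcastSet on the operator, getBroadcastVariable in the rich function's open()), here is a minimal standalone sketch; the class and variable names are illustrative, not part of the commit:

```java
import java.util.List;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

public class BroadcastSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The data set to broadcast to every parallel instance of the mapper.
        DataSet<Integer> toBroadcast = env.fromElements(1, 2, 3);

        List<String> result = env.fromElements("a", "b")
                .map(new RichMapFunction<String, String>() {
                    private List<Integer> nums;

                    @Override
                    public void open(Configuration parameters) {
                        // Fetched by the name registered via withBroadcastSet below.
                        this.nums = getRuntimeContext().getBroadcastVariable("nums");
                    }

                    @Override
                    public String map(String value) {
                        return value + nums;
                    }
                })
                .withBroadcastSet(toBroadcast, "nums") // names must match
                .collect();

        System.out.println(result); // [a[1, 2, 3], b[1, 2, 3]] (order may vary)
    }
}
```

collect(), which the migrated test uses in place of a FileDataSink plus a postSubmit comparison, triggers execution and ships the results back to the client, so the test no longer needs temp input and output files.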
