Skip to content

Commit

Permalink
[FLINK-1040] Updated JavaDocs and removed deprecated types() calls fr…
Browse files Browse the repository at this point in the history
…om examples and tests
  • Loading branch information
fhueske committed Dec 10, 2014
1 parent a9ecaba commit 799ff8a
Show file tree
Hide file tree
Showing 24 changed files with 293 additions and 185 deletions.
2 changes: 1 addition & 1 deletion docs/dataset_transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ DataSet<Tuple3<Integer, Integer, String>>
// hint that the second DataSet is very large
input1.crossWithHuge(input2)
// apply a projection (or any Cross function)
.projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class)
.projectFirst(0,1).projectSecond(1);
~~~
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void testSolutionSetDeltaDependsOnBroadcastVariable() {
DataSet<Tuple2<Long, Long>> result =
invariantInput
.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1).types(Long.class, Long.class);
.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);

iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();

Expand Down Expand Up @@ -330,12 +330,12 @@ public static DataSet<Tuple2<Long, Long>> doDeltaIteration(DataSet<Tuple2<Long,
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);

DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
.projectSecond(1).types(Long.class);
.projectSecond(1);

DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());

DataSet<Tuple2<Long, Long>> candidatesDependencies =
grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);

DataSet<Tuple2<Long, Long>> verticesWithNewComponents =
candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testDeltaIterationInClosure() {
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);

DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
.projectFirst(0).projectSecond(0).types(Long.class, Long.class);
.projectFirst(0).projectSecond(0);

DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.junit.Test;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.functions.Partitioner;
Expand All @@ -40,8 +41,6 @@
import org.apache.flink.compiler.plan.SinkPlanNode;
import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.junit.Test;


@SuppressWarnings({"serial", "unchecked"})
public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
Expand All @@ -65,7 +64,7 @@ public void testJoinReduceCombination() {
joined.groupBy(1).withPartitioner(partitioner)
.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ public void testExceptionWhenNewWorksetNotDependentOnWorkset() {

DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
.projectFirst(1).projectSecond(1);

DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
.where(0).equalTo(0)
.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
.projectFirst(1).projectSecond(1);


DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testOperationOnSolutionSet() {
DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());

DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped)
.where(0).equalTo(0).projectFirst(1).projectSecond(0).types(Long.class, Long.class);
.where(0).equalTo(0).projectFirst(1).projectSecond(0);

iteration.closeWith(joined, joined)
.print();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public boolean filter(Tuple3<Integer, Integer, String> o) {
}
})
// project fields out that are no longer required
.project(0,1).types(Integer.class, Integer.class);
.project(0,1);

// lineitems filtered by flag: (orderkey, revenue)
DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag =
Expand Down Expand Up @@ -154,15 +154,13 @@ public Tuple2<Integer, Double> map(Tuple4<Integer, Double, Double, String> l) {
DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
.joinWithTiny(nations)
.where(3).equalTo(0)
.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
.types(Integer.class, String.class, String.class, String.class, Double.class);
.projectFirst(0,1,2).projectSecond(1).projectFirst(4);

// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
DataSet<Tuple6<Integer, String, String, String, Double, Double>> result =
customerWithNation.join(revenueByCustomer)
.where(0).equalTo(0)
.projectFirst(0,1,2,3,4).projectSecond(1)
.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
.projectFirst(0,1,2,3,4).projectSecond(1);

// emit result
result.writeAsCsv(outputPath, "\n", "|");
Expand Down
18 changes: 10 additions & 8 deletions flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -230,18 +230,20 @@ public FilterOperator<T> filter(FilterFunction<T> filter) {
// --------------------------------------------------------------------------------------------

/**
* Initiates a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected.</b></br>
* Applies a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
* <b>Note: Only Tuple DataSets can be projected using field indexes.</b></br>
* The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
* This method returns a {@link ProjectOperator} to complete the transformation.
*
* @param fieldIndexes The field indexes of the input tuples that are retained.
* Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
*
* <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param fieldIndexes The field indexes of the input tuple that are retained.
* The order of fields in the output tuple corresponds to the order of field indexes.
* @return A ProjectOperator to complete the Project transformation.
*
* @return A ProjectOperator that represents the projected DataSet.
*
* @see Tuple
* @see DataSet
* @see org.apache.flink.api.java.operators.ProjectOperator
* @see ProjectOperator
*/
public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
return new Projection<T>(this, fieldIndexes).projectTupleX();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,14 +150,15 @@ public <R> CrossOperator<I1, I2, R> with(CrossFunction<I1, I2, R> function) {
* {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
*
* <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
* The order of fields in the output tuple is defined by the order of field indexes.
* @return A CrossProjection to complete the Cross transformation.
* @return A ProjectCross which represents the projected cross result.
*
* @see Tuple
* @see DataSet
* @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
* @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
*/
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
Expand All @@ -173,14 +174,15 @@ public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectFirst(int... firstFi
* {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
* {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
*
* <b>Note: With the current implementation, the Project transformation loses type information.</b>
*
* @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
* The order of fields in the output tuple is defined by the order of field indexes.
* @return A CrossProjection complete the Cross transformation by calling.
* @return A ProjectCross which represents the projected cross result.
*
* @see Tuple
* @see DataSet
* @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
* @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
*/
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
Expand Down Expand Up @@ -218,25 +220,83 @@ protected ProjectCross(DataSet<I1> input1, DataSet<I2> input2, int[] fields, boo

this.crossProjection = crossProjection;
}


/**
 * Continues a ProjectCross transformation and adds fields of the first cross input to the projection.<br/>
 * If the first cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
 * If the first cross input is not a Tuple DataSet, no parameters should be passed.<br/>
 *
 * Additional fields of the first and second input can be added by chaining the method calls of
 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
 *
 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 *
 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 * 					   For a non-Tuple DataSet, do not provide parameters.
 * 					   The order of fields in the output tuple is defined by the order of field indexes.
 * @return A ProjectCross which represents the projected cross result.
 *
 * @see Tuple
 * @see DataSet
 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 */
@SuppressWarnings("hiding")
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
	crossProjection = crossProjection.projectFirst(firstFieldIndexes);

	return crossProjection.projectTupleX();
}

/**
 * Continues a ProjectCross transformation and adds fields of the second cross input to the projection.<br/>
 * If the second cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
 * If the second cross input is not a Tuple DataSet, no parameters should be passed.<br/>
 *
 * Additional fields of the first and second input can be added by chaining the method calls of
 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
 *
 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
 *
 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 * 					   For a non-Tuple DataSet, do not provide parameters.
 * 					   The order of fields in the output tuple is defined by the order of field indexes.
 * @return A ProjectCross which represents the projected cross result.
 *
 * @see Tuple
 * @see DataSet
 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 */
@SuppressWarnings("hiding")
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
	crossProjection = crossProjection.projectSecond(secondFieldIndexes);

	return crossProjection.projectTupleX();
}


/**
* Deprecated method only kept for compatibility.
*
* @param types
*
* @return
*/
@SuppressWarnings({ "hiding", "unchecked" })
@Deprecated
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> types(Class<?>... types) {
return (ProjectCross<I1, I2, OUT>) this;
public <OUT extends Tuple> CrossOperator<I1, I2, OUT> types(Class<?>... types) {
TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();

if(types.length != typeInfo.getArity()) {
throw new InvalidProgramException("Provided types do not match projection.");
}
for (int i=0; i<types.length; i++) {
Class<?> typeClass = types[i];
if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
}
}
return (CrossOperator<I1, I2, OUT>) this;
}

@Override
Expand Down Expand Up @@ -410,16 +470,14 @@ public CrossProjection(DataSet<I1> ds1, DataSet<I2> ds2, int[] firstFieldIndexes
* @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
* The order of fields in the output tuple is defined by the order of field indexes.
* @return A CrossProjection that needs to be converted into a {@link ProjectOperator} to complete the
* Cross transformation by calling
* {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#types(Class)}.
* @return An extended CrossProjection.
*
* @see Tuple
* @see DataSet
* @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
* @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
*/
public CrossProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
protected CrossProjection<I1, I2> projectFirst(int... firstFieldIndexes) {

boolean isFirstTuple;

Expand Down Expand Up @@ -478,16 +536,14 @@ public CrossProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
* @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
* For a non-Tuple DataSet, do not provide parameters.
* The order of fields in the output tuple is defined by the order of field indexes.
* @return A CrossProjection that needs to be converted into a {@link ProjectOperator} to complete the
* Cross transformation by calling
* {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#types(Class)}.
* @return An extended CrossProjection.
*
* @see Tuple
* @see DataSet
* @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
* @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
*/
public CrossProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
protected CrossProjection<I1, I2> projectSecond(int... secondFieldIndexes) {

boolean isSecondTuple;

Expand Down Expand Up @@ -545,7 +601,7 @@ public CrossProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
*
* @return The projected DataSet.
*
* @see Projection
* @see ProjectCross
*/
@SuppressWarnings("unchecked")
public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectTupleX() {
Expand Down
Loading

0 comments on commit 799ff8a

Please sign in to comment.