Skip to content

Commit

Permalink
[FLINK-6009] [java api] Deprecate DataSetUtils#checksumHashCode
Browse files Browse the repository at this point in the history
This method is likely used only by Gelly, which has a more full-featured
implementation that supports multiple outputs and setting the job name.
Deprecating it now will allow it to be removed in Flink 2.0.

This closes apache#3516
  • Loading branch information
greghogan authored and StephanEwen committed Mar 16, 2017
1 parent 980d072 commit 40a156e
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,9 @@ public TupleSummaryAggregator<R> reduce(TupleSummaryAggregator<R> agg1, TupleSum
* as well as the checksum (sum over element hashes).
*
* @return A ChecksumHashCode that represents the count and checksum of elements in the data set.
* @deprecated replaced with {@code org.apache.flink.graph.asm.dataset.ChecksumHashCode} in Gelly
*/
@Deprecated
public static <T> Utils.ChecksumHashCode checksumHashCode(DataSet<T> input) throws Exception {
final String id = new AbstractID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,22 @@
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.types.IntValue;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import static org.hamcrest.Matchers.is;

/*
* These programs demonstrate the effects of user defined functions which modify input objects or return locally created
* objects that are retained and reused on future calls. The programs do not retain and later modify input objects.
Expand All @@ -47,15 +50,17 @@ public class OverwriteObjects {
public final static Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);

// DataSets are created with this number of elements
private static final int NUMBER_OF_ELEMENTS = 3 * 1000 * 1000;
private static final int NUMBER_OF_ELEMENTS = 3_000_000;

// DataSet values are randomly generated over this range
private static final int KEY_RANGE = 1 * 1000 * 1000;
private static final int KEY_RANGE = 1_000_000;

private static final int MAX_PARALLELISM = 4;

private static final long RANDOM_SEED = new Random().nextLong();

private static final Tuple2Comparator<IntValue, IntValue> comparator = new Tuple2Comparator<>();

public static void main(String[] args) throws Exception {
new OverwriteObjects().run();
}
Expand Down Expand Up @@ -116,17 +121,23 @@ public void testGroupedReduce(ExecutionEnvironment env) throws Exception {

env.getConfig().enableObjectReuse();

ChecksumHashCode enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
List<Tuple2<IntValue, IntValue>> enabledResult = getDataSet(env)
.groupBy(0)
.reduce(new OverwriteObjectsReduce(true)));
.reduce(new OverwriteObjectsReduce(true))
.collect();

Collections.sort(enabledResult, comparator);

env.getConfig().disableObjectReuse();

ChecksumHashCode disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
List<Tuple2<IntValue, IntValue>> disabledResult = getDataSet(env)
.groupBy(0)
.reduce(new OverwriteObjectsReduce(true)));
.reduce(new OverwriteObjectsReduce(true))
.collect();

Assert.assertEquals(disabledChecksum, enabledChecksum);
Collections.sort(disabledResult, comparator);

Assert.assertThat(disabledResult, is(enabledResult));
}

private class OverwriteObjectsReduce implements ReduceFunction<Tuple2<IntValue, IntValue>> {
Expand Down Expand Up @@ -154,31 +165,37 @@ public void testJoin(ExecutionEnvironment env) throws Exception {
continue;
}

ChecksumHashCode enabledChecksum;
List<Tuple2<IntValue, IntValue>> enabledResult;

ChecksumHashCode disabledChecksum;
List<Tuple2<IntValue, IntValue>> disabledResult;

// Inner join

LOG.info("Testing inner join with JoinHint = {}", joinHint);

env.getConfig().enableObjectReuse();

enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
enabledResult = getDataSet(env)
.join(getDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);

env.getConfig().disableObjectReuse();

disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
disabledResult = getDataSet(env)
.join(getDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum);
Collections.sort(disabledResult, comparator);

Assert.assertEquals("JoinHint=" + joinHint, disabledResult, enabledResult);

// Left outer join

Expand All @@ -187,21 +204,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {

env.getConfig().enableObjectReuse();

enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
enabledResult = getDataSet(env)
.leftOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);

env.getConfig().disableObjectReuse();

disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
disabledResult = getDataSet(env)
.leftOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);

Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum);
Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
}

// Right outer join
Expand All @@ -211,21 +234,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {

env.getConfig().enableObjectReuse();

enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
enabledResult = getDataSet(env)
.rightOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);

env.getConfig().disableObjectReuse();

disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
disabledResult = getDataSet(env)
.rightOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);

Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum);
Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
}

// Full outer join
Expand All @@ -235,21 +264,27 @@ public void testJoin(ExecutionEnvironment env) throws Exception {

env.getConfig().enableObjectReuse();

enabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
enabledResult = getDataSet(env)
.fullOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(enabledResult, comparator);

env.getConfig().disableObjectReuse();

disabledChecksum = DataSetUtils.checksumHashCode(getDataSet(env)
disabledResult = getDataSet(env)
.fullOuterJoin(getFilteredDataSet(env), joinHint)
.where(0)
.equalTo(0)
.with(new OverwriteObjectsJoin()));
.with(new OverwriteObjectsJoin())
.collect();

Collections.sort(disabledResult, comparator);

Assert.assertEquals("JoinHint=" + joinHint, disabledChecksum, enabledChecksum);
Assert.assertThat("JoinHint=" + joinHint, disabledResult, is(enabledResult));
}
}
}
Expand Down Expand Up @@ -279,32 +314,37 @@ public void testCross(ExecutionEnvironment env) throws Exception {

env.getConfig().enableObjectReuse();

ChecksumHashCode enabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
List<Tuple2<IntValue, IntValue>> enabledResultWithHuge = small
.crossWithHuge(large)
.with(new OverwriteObjectsCross()));
.with(new OverwriteObjectsCross())
.collect();

ChecksumHashCode enabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
List<Tuple2<IntValue, IntValue>> enabledResultWithTiny = small
.crossWithTiny(large)
.with(new OverwriteObjectsCross()));
.with(new OverwriteObjectsCross())
.collect();

Assert.assertEquals(enabledChecksumWithHuge, enabledChecksumWithTiny);
Assert.assertThat(enabledResultWithHuge, is(enabledResultWithTiny));

// test NESTEDLOOP_BLOCKED_OUTER_FIRST and NESTEDLOOP_BLOCKED_OUTER_SECOND with object reuse disabled

env.getConfig().disableObjectReuse();

ChecksumHashCode disabledChecksumWithHuge = DataSetUtils.checksumHashCode(small
List<Tuple2<IntValue, IntValue>> disabledResultWithHuge = small
.crossWithHuge(large)
.with(new OverwriteObjectsCross()));
.with(new OverwriteObjectsCross())
.collect();

ChecksumHashCode disabledChecksumWithTiny = DataSetUtils.checksumHashCode(small
List<Tuple2<IntValue, IntValue>> disabledResultWithTiny = small
.crossWithTiny(large)
.with(new OverwriteObjectsCross()));
.with(new OverwriteObjectsCross())
.collect();

Assert.assertEquals(disabledChecksumWithHuge, disabledChecksumWithTiny);
Assert.assertThat(disabledResultWithHuge, is(disabledResultWithTiny));

// verify that checksums match between object reuse enabled and disabled
Assert.assertEquals(enabledChecksumWithHuge, disabledChecksumWithHuge);
// verify match between object reuse enabled and disabled
Assert.assertThat(disabledResultWithHuge, is(enabledResultWithHuge));
Assert.assertThat(disabledResultWithTiny, is(enabledResultWithTiny));
}

private class OverwriteObjectsCross implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
Expand Down Expand Up @@ -338,8 +378,7 @@ public boolean filter(Tuple2<IntValue, IntValue> value) throws Exception {
});
}

private static final class TupleIntValueIntValueIterator implements Iterator<Tuple2<IntValue, IntValue>>, Serializable {

private static class TupleIntValueIntValueIterator implements Iterator<Tuple2<IntValue, IntValue>>, Serializable {
private int numElements;
private final int keyRange;
private Tuple2<IntValue, IntValue> ret = new Tuple2<>(new IntValue(), new IntValue());
Expand Down Expand Up @@ -370,9 +409,23 @@ public void remove() {
}
}

/**
 * Orders {@code Tuple2} values by field {@code f0}, falling back to field {@code f1}
 * when the first fields compare as equal.
 *
 * @param <T0> type of the first tuple field
 * @param <T1> type of the second tuple field
 */
private static class Tuple2Comparator<T0 extends Comparable<T0>, T1 extends Comparable<T1>>
		implements Comparator<Tuple2<T0, T1>> {
	@Override
	public int compare(Tuple2<T0, T1> o1, Tuple2<T0, T1> o2) {
		final int byFirstField = o1.f0.compareTo(o2.f0);

		// the second field is consulted only to break a tie on the first
		return byFirstField == 0 ? o1.f1.compareTo(o2.f1) : byFirstField;
	}
}

// --------------------------------------------------------------------------------------------

private static final class Scrambler implements Serializable {
private static class Scrambler implements Serializable {
private Tuple2<IntValue, IntValue> d = new Tuple2<>(new IntValue(), new IntValue());

private final boolean keyed;
Expand Down

0 comments on commit 40a156e

Please sign in to comment.