Skip to content

Commit

Permalink
Minor cleanup of CoGbkResultCoder.equals()
Browse files Browse the repository at this point in the history
----Release Notes----

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=102723893
  • Loading branch information
kennknowles authored and davorbonaci committed Sep 10, 2015
1 parent b72b7b2 commit 046af6b
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.CoderException;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.StandardCoder;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
Expand Down Expand Up @@ -217,7 +216,6 @@ public static class CoGbkResultCoder extends StandardCoder<CoGbkResult> {

private final CoGbkResultSchema schema;
private final UnionCoder unionCoder;
private MapCoder<Integer, List<RawUnionValue>> mapCoder;

/**
* Returns a CoGbkResultCoder for the given schema and unionCoder.
Expand Down Expand Up @@ -296,11 +294,15 @@ private IterableCoder tagListCoder(int unionTag) {
}

@Override
public boolean equals(Object other) {
if (!super.equals(other)) {
public boolean equals(Object object) {
if (this == object) {
return true;
}
if (!(object instanceof CoGbkResultCoder)) {
return false;
}
return schema.equals(((CoGbkResultCoder) other).schema);
CoGbkResultCoder other = (CoGbkResultCoder) object;
return schema.equals(other.schema) && unionCoder.equals(other.unionCoder);
}

@Override
Expand All @@ -311,7 +313,7 @@ public int hashCode() {
@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
"CoGbkResult requires the mapCoder to be deterministic", mapCoder);
"CoGbkResult requires the union coder to be deterministic", unionCoder);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,40 +16,70 @@

package com.google.cloud.dataflow.sdk.transforms.join;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import com.google.cloud.dataflow.sdk.coders.BigEndianIntegerCoder;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.DoubleCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VarIntCoder;
import com.google.cloud.dataflow.sdk.testing.CoderProperties;
import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.Serializer;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.dataflow.sdk.values.TupleTagList;
import com.google.common.collect.ImmutableList;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.util.Arrays;

/**
* Tests the CoGbkResult.CoGbkResultCoder.
*/
@RunWith(JUnit4.class)
public class CoGbkResultCoderTest {

@Test
public void testSerializationDeserialization() {
CoGbkResultSchema schema =
private static final CoGbkResultSchema TEST_SCHEMA =
new CoGbkResultSchema(TupleTagList.of(new TupleTag<String>()).and(
new TupleTag<Integer>()));

private static final UnionCoder TEST_UNION_CODER =
UnionCoder.of(ImmutableList.<Coder<?>>of(
StringUtf8Coder.of(),
VarIntCoder.of()));

private static final UnionCoder COMPATIBLE_UNION_CODER =
UnionCoder.of(ImmutableList.<Coder<?>>of(
StringUtf8Coder.of(),
BigEndianIntegerCoder.of()));

private static final CoGbkResultSchema INCOMPATIBLE_SCHEMA =
new CoGbkResultSchema(TupleTagList.of(new TupleTag<String>()).and(
new TupleTag<Double>()));
UnionCoder unionCoder =
UnionCoder.of(Arrays.<Coder<?>>asList(StringUtf8Coder.of(),
DoubleCoder.of()));
CoGbkResultCoder newCoder = CoGbkResultCoder.of(schema, unionCoder);
CloudObject encoding = newCoder.asCloudObject();
Coder<?> decodedCoder = Serializer.deserialize(encoding, Coder.class);
assertEquals(newCoder, decodedCoder);

private static final UnionCoder INCOMPATIBLE_UNION_CODER =
UnionCoder.of(ImmutableList.<Coder<?>>of(
StringUtf8Coder.of(),
DoubleCoder.of()));

private static final CoGbkResultCoder TEST_CODER =
CoGbkResultCoder.of(TEST_SCHEMA, TEST_UNION_CODER);

private static final CoGbkResultCoder COMPATIBLE_TEST_CODER =
CoGbkResultCoder.of(TEST_SCHEMA, COMPATIBLE_UNION_CODER);

private static final CoGbkResultCoder INCOMPATIBLE_TEST_CODER =
CoGbkResultCoder.of(INCOMPATIBLE_SCHEMA, INCOMPATIBLE_UNION_CODER);

@Test
public void testEquals() {
assertFalse(TEST_CODER.equals(new Object()));
assertFalse(TEST_CODER.equals(COMPATIBLE_TEST_CODER));
assertFalse(TEST_CODER.equals(INCOMPATIBLE_TEST_CODER));
}

@Test
public void testSerializationDeserialization() {
CoderProperties.coderSerializable(TEST_CODER);
}
}

0 comments on commit 046af6b

Please sign in to comment.