Skip to content

Commit

Permalink
Merge pull request GoogleCloudPlatform#438 from tgroh/backport_960
Browse files Browse the repository at this point in the history
Add Flatten with Duplicate Inputs Test
  • Loading branch information
lukecwik authored Sep 21, 2016
2 parents 2710c19 + 8b87f47 commit 3297496
Showing 1 changed file with 21 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.common.collect.ImmutableSet;

import com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
Expand All @@ -49,7 +49,6 @@
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -115,6 +114,26 @@ public void testFlattenPCollectionListEmpty() {
p.run();
}

@Test
@Category(RunnableOnService.class)
public void testFlattenInputMultipleCopies() {
Pipeline p = TestPipeline.create();

PCollection<String> lines = p.apply("mkLines", Create.of(LINES));
PCollection<String> lines2 = p.apply("mkOtherLines", Create.of(LINES2));

PCollection<String> flattened = PCollectionList.of(lines)
.and(lines2)
.and(lines)
.and(lines)
.apply(Flatten.<String>pCollections());

DataflowAssert.that(flattened)
.containsInAnyOrder(Iterables.concat(LINES, LINES, LINES, LINES2));

p.run();
}

@Test
@Category(RunnableOnService.class)
public void testEmptyFlattenAsSideInput() {
Expand Down

0 comments on commit 3297496

Please sign in to comment.