Skip to content

Commit

Permalink
Demonstrate new lambda-friendly transforms in MinimalWordCount
Browse files Browse the repository at this point in the history
Java 8 lambdas enable a more concise expression of word count via the
Filter, MapElements, and FlatMapElements transforms. This change
adds a Java 8 profile to our examples module and includes in it a
concise version of the MinimalWordCount example.

The Java 7 MinimalWordCount example is also updated to reflect the
use of these transforms without a lambda.

----Release Notes----

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=103782162
  • Loading branch information
kennknowles authored and davorbonaci committed Oct 1, 2015
1 parent 3315791 commit 7d18d58
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 4 deletions.
123 changes: 123 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,129 @@
<testParallelValue>both</testParallelValue>
</properties>
</profile>

<profile>
  <!--
    Java 8 profile. Activates on JDK 1.8 or newer and adds the Java 8 only
    source roots (src/main/java8 and src/test/java8) to the build. Java 7
    sources keep compiling at source/target 1.7, while files matching the
    Java8 naming patterns below are compiled at source/target 1.8.
  -->
  <id>java8</id>
  <activation>
    <jdk>[1.8,)</jdk>
  </activation>

  <build>
    <plugins>
      <!-- Tells Maven about the Java 8 main and test source roots. -->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <executions>
          <!-- Registers src/main/java8 as an additional main source root. -->
          <execution>
            <id>add-java8-main-source</id>
            <phase>initialize</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>${project.basedir}/src/main/java8</source>
              </sources>
            </configuration>
          </execution>

          <!-- Registers src/test/java8 as an additional test source root. -->
          <execution>
            <id>add-java8-test-source</id>
            <phase>initialize</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>${project.basedir}/src/test/java8</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <!--
        Each source set is compiled twice: once at 1.7 with the Java 8 files
        excluded, and once at 1.8 with only the Java 8 files included.
        The file name patterns are brittle; we would prefer to filter on the
        directory but that seems to be unavailable to us.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <executions>

          <!-- Set `-source 1.7 -target 1.7` for Java 7 tests -->
          <execution>
            <id>default-testCompile</id>
            <phase>test-compile</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <testSource>1.7</testSource>
              <testTarget>1.7</testTarget>
              <testExcludes>
                <exclude>**/*Java8Test.java</exclude>
              </testExcludes>
            </configuration>
          </execution>

          <!-- Set `-source 1.8 -target 1.8` for Java 8 tests -->
          <execution>
            <id>java8-test-compile</id>
            <phase>test-compile</phase>
            <goals>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <testSource>1.8</testSource>
              <testTarget>1.8</testTarget>
              <!--
                The testCompile goal filters with testIncludes/testExcludes;
                plain includes/excludes belong to the compile goal and would
                be ignored here, causing every test to be rebuilt at 1.8.
              -->
              <testIncludes>
                <include>**/*Java8Test.java</include>
              </testIncludes>
            </configuration>
          </execution>

          <!-- Set `-source 1.7 -target 1.7` for Java 7 examples -->
          <execution>
            <id>default-compile</id>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <source>1.7</source>
              <target>1.7</target>
              <excludes>
                <exclude>**/*Java8*.java</exclude>
              </excludes>
            </configuration>
          </execution>

          <!-- Set `-source 1.8 -target 1.8` for Java 8 examples -->
          <execution>
            <id>java8-compile</id>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
              <includes>
                <include>**/*Java8*.java</include>
              </includes>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</profile>
</profiles>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.KV;


Expand Down Expand Up @@ -95,12 +97,12 @@ public void processElement(ProcessContext c) {
// transform returns a new PCollection of key/value pairs, where each key represents a unique
// word in the text. The associated value is the occurrence count for that word.
.apply(Count.<String>perElement())
// Apply another ParDo transform that formats our PCollection of word counts into a printable
// Apply a MapElements transform that formats our PCollection of word counts into a printable
// string, suitable for writing to an output file.
.apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() {
.apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.element().getKey() + ": " + c.element().getValue());
public String apply(KV<String, Long> input) {
return input.getKey() + ": " + input.getValue();
}
}))
// Concept #4: Apply a write transform, TextIO.Write, at the end of the pipeline.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.examples;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Filter;
import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import java.util.Arrays;

/**
 * Word count over the Shakespeare sample texts, expressed with Java 8 lambdas.
 *
 * <p>The pipeline reads text from {@code gs://dataflow-samples/shakespeare/*}, splits each
 * element on runs of characters other than ASCII letters and apostrophes, discards empty
 * strings, counts the occurrences of each remaining string, and writes one
 * {@code "<key>: <count>"} string per distinct key.
 *
 * <p>See {@link MinimalWordCount} for a comprehensive, step-by-step explanation.
 */
public class MinimalWordCountJava8 {

  /**
   * Builds the word-count pipeline and runs it.
   *
   * <p>The three placeholder strings marked "CHANGE n of 3" must be edited before the
   * pipeline is run.
   *
   * @param args command-line arguments; not read by this example
   */
  public static void main(String[] args) {
    DataflowPipelineOptions pipelineOptions =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    pipelineOptions.setRunner(BlockingDataflowPipelineRunner.class);

    // CHANGE 1 of 3: Your project ID is required in order to run your pipeline on the Google Cloud.
    pipelineOptions.setProject("SET_YOUR_PROJECT_ID_HERE");

    // CHANGE 2 of 3: Your Google Cloud Storage path is required for staging local files.
    pipelineOptions.setStagingLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_STAGING_DIRECTORY");

    Pipeline pipeline = Pipeline.create(pipelineOptions);

    pipeline
        .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
        // Break every input element into candidate words.
        .apply(FlatMapElements
            .via((String line) -> Arrays.asList(line.split("[^a-zA-Z']+")))
            .withOutputType(new TypeDescriptor<String>() {}))
        // split() can produce empty strings; drop them before counting.
        .apply(Filter.byPredicate((String token) -> !token.isEmpty()))
        .apply(Count.<String>perElement())
        // Render each (word, count) pair as a printable string.
        .apply(MapElements
            .via((KV<String, Long> entry) -> entry.getKey() + ": " + entry.getValue())
            .withOutputType(new TypeDescriptor<String>() {}))
        // CHANGE 3 of 3: The Google Cloud Storage path is required for outputting the results to.
        .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    pipeline.run();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.examples;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.Filter;
import com.google.cloud.dataflow.sdk.transforms.FlatMapElements;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

import java.io.Serializable;
import java.util.Arrays;

/**
 * Smoke test for {@link MinimalWordCountJava8}.
 *
 * <p>To keep {@link MinimalWordCountJava8} simple, it is not factored or testable, so this
 * file carries a copy of its pipeline code. Keep the copy in sync with the example.
 */
// NOTE(review): presumably Serializable so the anonymous TypeDescriptor subclasses created in
// the instance method below can be serialized with their enclosing instance; confirm.
@RunWith(JUnit4.class)
public class MinimalWordCountJava8Test implements Serializable {

  /**
   * Constructs the same pipeline as the example to ensure there is no crash at pipeline
   * construction time. The pipeline is only built here, never run.
   */
  @Test
  public void testMinimalWordCountJava8() {
    Pipeline pipeline = TestPipeline.create();

    pipeline
        .apply(TextIO.Read.from("gs://dataflow-samples/shakespeare/*"))
        .apply(FlatMapElements
            .via((String line) -> Arrays.asList(line.split("[^a-zA-Z']+")))
            .withOutputType(new TypeDescriptor<String>() {}))
        .apply(Filter.byPredicate((String token) -> !token.isEmpty()))
        .apply(Count.<String>perElement())
        .apply(MapElements
            .via((KV<String, Long> entry) -> entry.getKey() + ": " + entry.getValue())
            .withOutputType(new TypeDescriptor<String>() {}))
        .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
  }
}

0 comments on commit 7d18d58

Please sign in to comment.