[FLINK-2553] Example Jars not built correctly
  - reworked package structure of the utils and wordcount.stormoperators packages
  - reworked class hierarchy of *FileSpout and *InMemorySpout
  - fixed pom.xml to assemble jars correctly
  - simplified example jar file names
  - replaced maven-assembly-plugin with maven-shade-plugin (removed assembly.xml file)
  - extended README and documentation on building and using the correct jars

Additional minor changes:
 - fixed a comment typo in FlinkSubmitter
 - removed the version number from a hardcoded jar file name

This closes apache#1037
mjsax authored and StephanEwen committed Aug 27, 2015
1 parent 541a06c commit 824785e
Showing 17 changed files with 156 additions and 172 deletions.
7 changes: 7 additions & 0 deletions docs/apis/storm_compatibility.md
@@ -258,4 +258,11 @@ DataStream<String> rawInput = env.addSource(

You can find more examples in the Maven module `flink-storm-compatibility-examples`.
For the different versions of WordCount, see [README.md](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/README.md).
To run the examples, you need to assemble a correct jar file.
`flink-storm-compatibility-examples-0.10-SNAPSHOT.jar` is **not** a valid jar file for job execution (it is only a standard Maven artifact).

There are example jars for embedded Spout and Bolt, namely `WordCount-SpoutSource.jar` and `WordCount-BoltTokenizer.jar`, respectively.
Compare `pom.xml` to see how both jars are built.
Furthermore, there is one example for whole Storm topologies (`WordCount-StormTopology.jar`).

You can run each of those examples via `bin/flink run <jarname>.jar`. The correct entry point class is contained in each jar's manifest file.
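The entry point can also be inspected without running anything. Below is a minimal sketch using only the JDK; it reads the `program-class` attribute that this commit's pom writes via `<manifestEntries>`, falling back to the standard `Main-Class`. The fallback order here is an assumption for illustration, not Flink's documented lookup:

```java
import java.io.IOException;
import java.nio.file.Path;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

public class EntryPointReader {

    /** Reads the entry point class recorded in a jar's manifest:
     *  Flink's "program-class" attribute first, then the standard "Main-Class". */
    public static String entryPoint(Path jar) throws IOException {
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            Manifest manifest = jarFile.getManifest();
            if (manifest == null) {
                return null; // jar carries no manifest at all
            }
            String programClass = manifest.getMainAttributes().getValue("program-class");
            return programClass != null
                    ? programClass
                    : manifest.getMainAttributes().getValue("Main-Class");
        }
    }
}
```

For example, `entryPoint(Paths.get("WordCount-SpoutSource.jar"))` should report the class named in that jar's `<manifestEntries>` section.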
FlinkSubmitter.java
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.stormcompatibility.api;

import backtype.storm.Config;
@@ -60,7 +59,7 @@ public class FlinkSubmitter {
*/
public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
final SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException {
throws AlreadyAliveException, InvalidTopologyException {
submitTopology(name, stormConf, topology);
}

@@ -109,7 +108,7 @@ public static void submitTopology(final String name, final Map stormConf, final
try {
for (final File file : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment())
.getJars()) {
// TODO verify that there is onnly one jar
// TODO verify that there is only one jar
localJar = file.getAbsolutePath();
}
} catch (final ClassCastException e) {
@@ -147,7 +146,7 @@ public static void submitTopology(final String name, final Map stormConf, final
*/
public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
final FlinkTopology topology)
throws AlreadyAliveException, InvalidTopologyException {
throws AlreadyAliveException, InvalidTopologyException {
submitTopology(name, stormConf, topology);
}

@@ -180,7 +179,7 @@ public static String submitJar(final String localJar) {
if (localJar == null) {
throw new RuntimeException(
"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar " +
"to upload");
"to upload");
}

return localJar;
@@ -39,7 +39,7 @@ public void testDeclare() {
.intValue());

final String sid = "streamId";
numberOfAttributes = 0 + this.r.nextInt(26);
numberOfAttributes = this.r.nextInt(26);
declarer.declareStream(sid, createSchema(numberOfAttributes));
Assert.assertEquals(2, declarer.outputSchemas.size());
Assert.assertEquals(numberOfAttributes, declarer.outputSchemas.get(sid).intValue());
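The only change in this test hunk drops a redundant `0 +` in front of `this.r.nextInt(26)`: `Random.nextInt(bound)` already returns a value in `[0, bound)`, so adding zero was a no-op. A quick sanity check:

```java
import java.util.Random;

public class NextIntBounds {
    public static void main(String[] args) {
        Random r = new Random(1234L);
        for (int i = 0; i < 1000; i++) {
            int n = r.nextInt(26); // uniform over 0..25, never 26
            if (n < 0 || n >= 26) {
                throw new AssertionError("out of range: " + n);
            }
        }
        System.out.println("all 1000 draws fell in [0, 26)");
    }
}
```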
flink-storm-compatibility-examples/README.md
@@ -15,5 +15,6 @@ This module contains multiple versions of a simple word-count-example to illustrate
* `StormWordCountRemoteBySubmitter` submits the topology to a remote Flink cluster (similar to the usage of `StormSubmitter` in Storm)

Additionally, this module packages the three example word-count programs as jar files to be submitted to a Flink cluster via `bin/flink run example.jar`.
(Valid jars are `WordCount-SpoutSource.jar`, `WordCount-BoltTokenizer.jar`, and `WordCount-StormTopology.jar`)

The package `org.apache.flink.stormcompatiblitly.stormoperators` contain original Storm spouts and bolts that can be used unmodified within Storm or Flink.
The package `org.apache.flink.stormcompatibility.wordcount.stormoperators` contains original Storm spouts and bolts that can be used unmodified within Storm or Flink.
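As a plain-Java illustration of the semantics all three word-count variants share (lower-case the line, split it into words, count occurrences); the tokenization rule used here, `\W+`, is an assumption for illustration and may differ from the examples' actual tokenizer bolt:

```java
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class WordCountSketch {

    /** Lower-cases the line, splits on runs of non-word characters,
     *  and counts how often each word occurs. */
    public static Map<String, Long> count(String line) {
        return Arrays.stream(line.toLowerCase().split("\\W+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(word -> word, Collectors.counting()));
    }
}
```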
flink-storm-compatibility-examples/pom.xml
@@ -103,6 +103,10 @@ under the License.
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<!-- need to exclude to be able to run
* StormWordCountRemoteByClient and
* StormWordCountRemoteBySubmitter
within Eclipse -->
<excludes>defaults.yaml</excludes>
</artifactItem>
<artifactItem>
@@ -112,7 +116,7 @@ under the License.
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</artifactItem>snakeyaml
</artifactItem>
<artifactItem>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
@@ -143,7 +147,8 @@ under the License.
<goal>jar</goal>
</goals>
<configuration>
<classifier>WordCountSpoutSource</classifier>
<finalName>WordCount</finalName>
<classifier>SpoutSource</classifier>

<archive>
<manifestEntries>
@@ -168,9 +173,11 @@ under the License.
<!-- Word Count -->
<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/SpoutSourceWordCount$*.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/AbstractStormSpout.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormFileSpout.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormInMemorySpout.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountFileSpout.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/StormWordCountInMemorySpout.class</include>
<include>org/apache/flink/stormcompatibility/util/AbstractStormSpout.class</include>
<include>org/apache/flink/stormcompatibility/util/StormFileSpout.class</include>
<include>org/apache/flink/stormcompatibility/util/StormInMemorySpout.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
@@ -185,7 +192,8 @@ under the License.
<goal>jar</goal>
</goals>
<configuration>
<classifier>WordCountBoltTokenizer</classifier>
<finalName>WordCount</finalName>
<classifier>BoltTokenizer</classifier>

<archive>
<manifestEntries>
@@ -215,6 +223,25 @@ under the License.
</configuration>
</execution>

<!-- WordCount Storm topology -->
<!-- Example for whole topologies (i.e., if FlinkTopologyBuilder is used) -->
<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
Thus, 'defaults.yaml' is not available for maven-jar-plugin.
Nevertheless, we register an empty jar with corresponding name, such that the final jar can be installed to local maven repository.
We use maven-shade-plugin to build the actual jar (which will replace the empty jar). -->
<execution>
<id>WordCount-StormTopology</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<finalName>WordCount</finalName>
<classifier>StormTopology</classifier>
</configuration>
</execution>

<execution>
<goals>
<goal>test-jar</goal>
@@ -224,28 +251,77 @@ under the License.
</plugin>

<!-- WordCount Storm topology -->
<!-- example for whole topologies (i.e., if FlinkTopologyBuilder is used) -->
<!-- Cannot use maven-jar-plugin because 'defaults.yaml' must be included in the jar -->
<!-- Build the StormTopology jar to overwrite the empty jar created with maven-jar-plugin. -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/assembly/word-count-storm.xml</descriptor>
</descriptors>
<archive>
<manifestEntries>
<program-class>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</program-class>
</manifestEntries>
</archive>
</configuration>

<artifactId>maven-shade-plugin</artifactId>
<groupId>org.apache.maven.plugins</groupId>
<version>2.4.1</version>
<executions>
<execution>
<id>WordCountStorm</id>
<id>WordCount-StormTopology</id>
<phase>package</phase>
<goals>
<goal>single</goal>
<goal>shade</goal>
</goals>
<configuration>
<finalName>WordCount-StormTopology</finalName>

<artifactSet>
<includes>
<include>org.apache.storm:storm-core</include>
<!-- Storm's recursive dependencies -->
<include>org.yaml:snakeyaml</include>
<include>com.googlecode.json-simple:json-simple</include>
<include>org.apache.flink:flink-storm-compatibility-core</include>
<include>org.apache.flink:flink-storm-compatibility-examples</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.storm:storm-core</artifact>
<includes>
<include>defaults.yaml</include>
<include>backtype/storm/*.class</include>
<include>backtype/storm/topology/*.class</include>
<include>backtype/storm/spout/*.class</include>
<include>backtype/storm/task/*.class</include>
<include>backtype/storm/tuple/*.class</include>
<include>backtype/storm/generated/*.class</include>
<include>backtype/storm/metric/**/*.class</include>
<include>backtype/storm/utils/*.class</include>
<include>backtype/storm/serialization/*.class</include>
<include>org/apache/storm/curator/**/*.class</include>
<include>org/apache/thrift7/**/*.class</include>
<!-- Storm's recursive dependencies -->
<include>org/json/simple/**/*.class</include>
<include>org/yaml/snakeyaml/**/*.class</include>
</includes>
</filter>
<filter>
<artifact>org.apache.flink:flink-storm-compatibility-examples</artifact>
<includes>
<include>org/apache/flink/stormcompatibility/wordcount/StormWordCountRemoteBySubmitter.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/WordCountTopology.class</include>
<include>org/apache/flink/stormcompatibility/wordcount/stormoperators/*.class</include>
<include>org/apache/flink/stormcompatibility/util/*.class</include>
<include>org/apache/flink/examples/java/wordcount/util/WordCountData.class</include>
</includes>
</filter>
<filter>
<artifact>org.apache.flink:flink-storm-compatibility-core</artifact>
<includes>
<include>org/apache/flink/stormcompatibility/api/*.class</include>
<include>org/apache/flink/stormcompatibility/wrappers/*.class</include>
</includes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.stormcompatibility.wordcount.StormWordCountRemoteBySubmitter</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
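The `<includes>` patterns in the shade configuration above are glob-style: `*` matches within one path segment, while `**` crosses segment boundaries. Java's `PathMatcher` supports a close dialect, which is handy for sanity-checking a filter before running the build. Note this is only an approximation; maven-shade-plugin's matcher is similar but not identical, so verify the final contents with `jar tf` on the shaded jar:

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobCheck {

    /** Returns true if the glob pattern matches the given slash-separated path. */
    public static boolean matches(String pattern, String path) {
        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + pattern);
        return m.matches(Paths.get(path));
    }

    public static void main(String[] args) {
        // '*' stays within one segment: .class files directly under backtype/storm
        System.out.println(matches("backtype/storm/*.class",
                "backtype/storm/Config.class"));
        // '**' spans nested packages, as in the metric include above
        System.out.println(matches("backtype/storm/metric/**/*.class",
                "backtype/storm/metric/api/CountMetric.class"));
    }
}
```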

This file was deleted: `src/assembly/word-count-storm.xml`.

GenderSpout.java
@@ -23,6 +23,8 @@
import org.apache.flink.stormcompatibility.util.AbstractStormSpout;

public class GenderSpout extends AbstractStormSpout {
private static final long serialVersionUID = -5079110197950743927L;

private int counter = 9;
private Fields outFields;
