Commit
[FLINK-4298] [storm compatibility] Clean up unnecessary dependencies in 'flink-storm'
StephanEwen committed Aug 1, 2016
1 parent 4456453 commit 0ea2596
Showing 7 changed files with 60 additions and 76 deletions.
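The Java changes in this commit follow a single pattern: Guava collection factories are replaced with plain JDK equivalents, removing flink-storm's need for a direct Guava dependency. The following is a stand-alone sketch of the substitutions, with made-up names, not code taken from the diff:

import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch of the Guava -> JDK swaps in this commit; all names are hypothetical.
public class GuavaToJdk {
    public static void main(String[] args) throws Exception {
        URL jar = new URL("file:///tmp/topology.jar"); // hypothetical jar location

        // was: Lists.newArrayList(jar) -- a mutable list; the list is only read
        // downstream, so an immutable single-element list suffices.
        List<URL> jars = Collections.singletonList(jar);

        // was: Sets.newHashSet(rawOutputs) -- note the container type changes
        // from Set to List; both satisfy a Collection<String> parameter.
        String[] rawOutputs = { "stream-1", "stream-2" };
        Collection<String> outputs = Arrays.asList(rawOutputs);

        // was: Sets.newHashSet(new String[] { "default" }) -- a mutable set
        // built from a JDK singleton instead.
        Set<String> streams = new HashSet<String>(Collections.singleton("default"));

        System.out.println(jars + " " + outputs + " " + streams);
    }
}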
83 changes: 29 additions & 54 deletions flink-contrib/flink-storm/pom.xml
@@ -61,18 +61,40 @@ under the License.
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<artifactId>logback-classic</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>ring</groupId>
<artifactId>ring-core</artifactId>
</exclusion>
<exclusion>
<groupId>ring</groupId>
<artifactId>ring-devel</artifactId>
</exclusion>
<exclusion>
<groupId>ring</groupId>
<artifactId>ring-servlet</artifactId>
</exclusion>
<exclusion>
<groupId>ring</groupId>
<artifactId>ring-jetty-adapter</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty</artifactId>
</exclusion>
<exclusion>
<groupId>org.mortbay.jetty</groupId>
<artifactId>jetty-util</artifactId>
</exclusion>
<exclusion>
<groupId>org.jgrapht</groupId>
<artifactId>jgrapht-core</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>

<!-- test dependencies -->

<dependency>
@@ -85,51 +107,4 @@ under the License.

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<pluginManagement>
<plugins>
<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<versionRange>[2.9,)</versionRange>
<goals>
<goal>unpack</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>

</build>

</project>
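This hunk appears to drop the module's direct Guava dependency and the build section, alongside the new storm-core exclusions. The resulting dependency tree can be verified with Maven's dependency plugin, for example: mvn dependency:tree -pl flink-contrib/flink-storm (module path taken from the file header above; the invocation is a suggestion, not part of the commit).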
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -22,6 +22,7 @@
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
@@ -32,7 +33,6 @@
import backtype.storm.utils.Utils;

import com.esotericsoftware.kryo.Serializer;
import com.google.common.collect.Lists;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
@@ -52,6 +52,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -215,7 +216,7 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLo

try {
ClassLoader classLoader = JobWithJars.buildUserCodeClassLoader(
Lists.newArrayList(uploadedJarUrl),
Collections.<URL>singletonList(uploadedJarUrl),
Collections.<URL>emptyList(),
this.getClass().getClassLoader());
client.runDetached(jobGraph, classLoader);
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.storm.wrappers;

import backtype.storm.generated.GlobalStreamId;
@@ -26,8 +27,6 @@
import backtype.storm.tuple.MessageId;
import backtype.storm.utils.Utils;

import com.google.common.collect.Sets;

import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
@@ -44,6 +43,8 @@
import java.util.HashMap;
import java.util.Map;

import static java.util.Arrays.asList;

/**
* A {@link BoltWrapper} wraps an {@link IRichBolt} in order to execute the Storm bolt within a Flink Streaming program.
* It takes the Flink input tuples of type {@code IN} and transforms them into {@link StormTuple}s that the bolt can
@@ -135,9 +136,9 @@ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema)
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
*/
public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
this(bolt, null, Sets.newHashSet(rawOutputs));
this(bolt, null, asList(rawOutputs));
}

/**
@@ -157,8 +158,7 @@ public BoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [1;25].
*/
public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
throws IllegalArgumentException {
public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, null, rawOutputs);
}

@@ -181,9 +181,12 @@ public BoltWrapper(final IRichBolt bolt, final Collection<String> rawOutputs)
* {@code rawOutput} is {@code false} and the number of declared output attributes is not within range
* [0;25].
*/
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema, final String[] rawOutputs)
public BoltWrapper(
final IRichBolt bolt,
final Fields inputSchema,
final String[] rawOutputs)
throws IllegalArgumentException {
this(bolt, inputSchema, Sets.newHashSet(rawOutputs));
this(bolt, inputSchema, asList(rawOutputs));
}
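One behavioral nuance in the BoltWrapper change above: Sets.newHashSet(rawOutputs) built a Set, while asList(rawOutputs) yields a List, so duplicate entries in rawOutputs are no longer collapsed before reaching the Collection<String> constructor; both types satisfy that parameter, so either compiles. A stand-alone sketch of the difference (illustrative, not commit code):

import java.util.Arrays;
import java.util.HashSet;

public class SetVsList {
    public static void main(String[] args) {
        String[] rawOutputs = { "s1", "s1", "s2" }; // hypothetical stream names
        // old behavior: the HashSet deduplicates -> size 2
        System.out.println(new HashSet<String>(Arrays.asList(rawOutputs)).size());
        // new behavior: the List keeps duplicates -> size 3
        System.out.println(Arrays.asList(rawOutputs).size());
    }
}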

flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/FlinkTopologyContext.java
@@ -120,9 +120,8 @@ public ReducedMetric registerMetric(final String name, final IReducer combiner,
* @throws UnsupportedOperationException
* at every invocation
*/
@SuppressWarnings("unchecked")
@Override
public IMetric registerMetric(final String name, final IMetric metric, final int timeBucketSizeInSecs) {
public <T extends IMetric> T registerMetric(final String name, final T metric, final int timeBucketSizeInSecs) {
throw new UnsupportedOperationException("Metrics are not supported by Flink");
}

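The change above replaces the raw registerMetric override with the generic form matching Storm's TopologyContext, which is why the @SuppressWarnings("unchecked") annotation can go away. A self-contained sketch of what the generic signature buys callers; IMetric and CountMetric here are simplified stand-ins, not the real Storm classes:

interface IMetric { Object getValueAndReset(); }

class CountMetric implements IMetric {
    private long count;
    public void incr() { count++; }
    public Object getValueAndReset() { long c = count; count = 0; return c; }
}

public class MetricRegistry {
    // old shape: the caller gets back the interface and must cast
    public IMetric registerMetricOld(String name, IMetric metric, int timeBucketSizeInSecs) {
        return metric;
    }

    // new shape: the concrete type flows through, no cast, no @SuppressWarnings
    public <T extends IMetric> T registerMetric(String name, T metric, int timeBucketSizeInSecs) {
        return metric;
    }

    public static void main(String[] args) {
        MetricRegistry ctx = new MetricRegistry();
        CountMetric c1 = (CountMetric) ctx.registerMetricOld("acked", new CountMetric(), 60); // cast needed
        CountMetric c2 = ctx.registerMetric("acked", new CountMetric(), 60);                  // no cast
        c1.incr();
        c2.incr();
    }
}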
flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/MergedInputsBoltWrapper.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.storm.wrappers;

import backtype.storm.topology.IRichBolt;
import com.google.common.collect.Sets;

import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
@@ -26,6 +26,8 @@

import java.util.Collection;

import static java.util.Arrays.asList;

/**
* A {@link MergedInputsBoltWrapper} is a {@link BoltWrapper} that expects input tuples of type {@link StormTuple}. It
* can be used to wrap a multi-input bolt and assumes that all input streams got merged into a {@link StormTuple} stream
@@ -67,7 +69,7 @@ public MergedInputsBoltWrapper(final IRichBolt bolt) throws IllegalArgumentExcep
*/
public MergedInputsBoltWrapper(final IRichBolt bolt, final String[] rawOutputs)
throws IllegalArgumentException {
super(bolt, Sets.newHashSet(rawOutputs));
super(bolt, asList(rawOutputs));
}

flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/SpoutWrapper.java
@@ -22,8 +22,6 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;

import com.google.common.collect.Sets;

import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.common.functions.StoppableFunction;
import org.apache.flink.api.java.tuple.Tuple0;
@@ -37,6 +35,8 @@
import java.util.Collection;
import java.util.HashMap;

import static java.util.Arrays.asList;

/**
* A {@link SpoutWrapper} wraps an {@link IRichSpout} in order to execute it within a Flink Streaming program. It
* takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
@@ -121,7 +121,7 @@ public SpoutWrapper(final IRichSpout spout, final Integer numberOfInvocations)
*/
public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
throws IllegalArgumentException {
this(spout, Sets.newHashSet(rawOutputs), null);
this(spout, asList(rawOutputs), null);
}

/**
@@ -147,7 +147,7 @@ public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs)
*/
public SpoutWrapper(final IRichSpout spout, final String[] rawOutputs,
final Integer numberOfInvocations) throws IllegalArgumentException {
this(spout, Sets.newHashSet(rawOutputs), numberOfInvocations);
this(spout, asList(rawOutputs), numberOfInvocations);
}

flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/WrapperSetupHelperTest.java
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.storm.wrappers;

import backtype.storm.Config;
@@ -27,16 +28,18 @@
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import com.google.common.collect.Sets;

import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.TestDummyBolt;
import org.apache.flink.storm.util.TestDummySpout;
import org.apache.flink.storm.util.TestSink;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -48,6 +51,8 @@
import java.util.List;
import java.util.Set;

import static java.util.Collections.singleton;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@@ -85,7 +90,7 @@ public void testRawType() throws Exception {
PowerMockito.whenNew(SetupOutputFieldsDeclarer.class).withNoArguments().thenReturn(declarer);

WrapperSetupHelper.getNumberOfAttributes(boltOrSpout,
Sets.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }));
new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)));
}

@Test(expected = IllegalArgumentException.class)
@@ -143,8 +148,7 @@ private void testTupleTypes(final int numberOfAttributes) throws Exception {

Assert.assertEquals(attributes, WrapperSetupHelper.getNumberOfAttributes(
boltOrSpout,
numberOfAttributes == -1 ? Sets
.newHashSet(new String[] { Utils.DEFAULT_STREAM_ID }) : null));
numberOfAttributes == -1 ? new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) : null));
}

@Test
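The test now builds its stream-ID set as new HashSet<String>(singleton(Utils.DEFAULT_STREAM_ID)) rather than Sets.newHashSet(...). Copying into a HashSet instead of passing the singleton directly presumably keeps the set mutable, since Collections.singleton returns an immutable set. A stand-alone illustration (not the test code):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MutableSingleton {
    public static void main(String[] args) {
        Set<String> immutable = Collections.singleton("default");
        // immutable.add("other");  // would throw UnsupportedOperationException
        Set<String> mutable = new HashSet<String>(immutable); // mutable copy, same single element
        mutable.add("other");                                 // fine
        System.out.println(mutable);
    }
}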
