Skip to content

Commit

Permalink
[contrib] Added log properties files to contrib & minor clean ups
Browse files Browse the repository at this point in the history
  • Loading branch information
mbalassi committed Jun 15, 2015
1 parent 12b13f9 commit 3fe6d19
Show file tree
Hide file tree
Showing 30 changed files with 378 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@
*/
public class FlinkClient {

//The jobmanager's host name
/** The jobmanager's host name */
private final String jobManagerHost;
//The jobmanager's rpc port
/** The jobmanager's rpc port */
private final int jobManagerPort;
//The user specified timeout in milliseconds
/** The user specified timeout in milliseconds */
private final String timeout;

// The following methods are derived from "backtype.storm.utils.NimbusClient"
Expand All @@ -77,8 +77,6 @@ public class FlinkClient {
* Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
* Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
*
* @param conf
* A configuration.
* @param host
* The jobmanager's host name.
* @param port
Expand All @@ -92,8 +90,6 @@ public FlinkClient(final String host, final int port) {
* Instantiates a new {@link FlinkClient} for the given configuration, host name, and port. If values for {@link
* Config#NIMBUS_HOST} and {@link Config#NIMBUS_THRIFT_PORT} of the given configuration are ignored.
*
* @param conf
* A configuration.
* @param host
* The jobmanager's host name.
* @param port
Expand Down Expand Up @@ -139,8 +135,6 @@ public FlinkClient getClient() {

public void close() {/* nothing to do */}

// The following methods are derived from "backtype.storm.generated.Nimubs.Client"

/**
* Parameter {@code uploadedJarLocation} is actually used to point to the local jar, because Flink does not support
* uploading a jar file before hand. Jar files are always uploaded directly when a program is submitted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
*/
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {

// the declared output schema
/** the declared output schema */
private Fields outputSchema;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,6 @@ public static void submitTopology(final String name, final Map<?, ?> stormConf,
* the topology-specific configuration. See {@link Config}.
* @param topology
* the processing to execute.
* @param opts
* to manipulate the starting of the topology
* @param progressListener
* to track the progress of the jar upload process
* @throws AlreadyAliveException
* if a topology with this name is already running
* @throws InvalidTopologyException
Expand Down Expand Up @@ -145,8 +141,6 @@ public static void submitTopology(final String name, final Map stormConf, final
* the topology-specific configuration. See {@link Config}.
* @param topology
* the processing to execute.
* @param opts
* to manipulate the starting of the topology
* @throws AlreadyAliveException
* if a topology with this name is already running
* @throws InvalidTopologyException
Expand Down Expand Up @@ -179,12 +173,8 @@ public static String submitJar(final Map conf, final String localJar) {
* returned value is parameter localJar, because this give the best integration of Storm behavior within a Flink
* environment.
*
* @param conf
* the topology-specific configuration. See {@link Config}.
* @param localJar
* file path of the jar file to submit
* @param listener
* progress listener to track the jar file upload
* @return the value of parameter localJar
*/
public static String submitJar(final String localJar) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
*/
class FlinkTopology extends StreamExecutionEnvironment {

// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
/** The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology} */
private final StormTopology stormTopology;
// The number of declared tasks for the whole program (ie, sum over all dops)
/** The number of declared tasks for the whole program (ie, sum over all dops) */
private int numberOfTasks = 0;

public FlinkTopology(final StormTopology stormTopology) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@
*/
public class FlinkTopologyBuilder {

// A Storm {@link TopologyBuilder} to build a real Storm topology
/** A Storm {@link TopologyBuilder} to build a real Storm topology */
private final TopologyBuilder stormBuilder = new TopologyBuilder();
// All user spouts by their ID
/** All user spouts by their ID */
private final HashMap<String, IRichSpout> spouts = new HashMap<String, IRichSpout>();
// All user bolts by their ID
/** All user bolts by their ID */
private final HashMap<String, IRichBolt> bolts = new HashMap<String, IRichBolt>();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 4993283609095408765L;

// Number of attributes of the bolt's output tuples.
/**
* Number of attributes of the bolt's output tuples.
*/
private final int numberOfAttributes;
/**
* The wrapped Storm {@link IRichSpout spout}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
*/
class StormBoltCollector<OUT> extends AbstractStormCollector<OUT> implements IOutputCollector {

// The Flink output object
/** The Flink output object */
private final Output<OUT> flinkOutput;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
public class StormBoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = -4788589118464155835L;

// The wrapped Storm {@link IRichBolt bolt}
/** The wrapped Storm {@link IRichBolt bolt} */
private final IRichBolt bolt;
// Number of attributes of the bolt's output tuples
/** Number of attributes of the bolt's output tuples */
private final int numberOfAttributes;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public class StormFiniteSpoutWrapper<OUT> extends AbstractStormSpoutWrapper<OUT> {
private static final long serialVersionUID = 3883246587044801286L;

// The number of {@link IRichSpout#nextTuple()} calls
/** The number of {@link IRichSpout#nextTuple()} calls */
private int numberOfInvocations;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*/
class StormOutputFieldsDeclarer implements OutputFieldsDeclarer {

// The output schema declared by the wrapped bolt.
/** The output schema declared by the wrapped bolt. */
private Fields outputSchema = null;

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public StormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException

/**
* Instantiates a new {@link StormSpoutWrapper} that wraps the given Storm {@link IRichSpout spout} such that it
* can
* be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* can be used within a Flink streaming program. The output type can be any type if parameter {@code rawOutput} is
* {@code true} and the spout's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the
* output type will be one of {@link Tuple1} to {@link Tuple25} depending on the spout's declared number of
* attributes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
*/
class StormTuple<IN> implements backtype.storm.tuple.Tuple {

// The storm representation of the original Flink tuple
/** The storm representation of the original Flink tuple */
private final Values stormTuple;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class StormWrapperSetupHelper {
* {@link StormBoltWrapper}. Returns zero for raw output type or a value within range [1;25] for
* output type {@link org.apache.flink.api.java.tuple.Tuple1 Tuple1} to
* {@link org.apache.flink.api.java.tuple.Tuple25 Tuple25} . In case of a data sink, {@code -1}
* is returned. .
* is returned.
*
* @param spoutOrBolt
* The Storm {@link IRichSpout spout} or {@link IRichBolt bolt} to be used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@

public class FlinkOutputFieldsDeclarerTest extends AbstractTest {



@Test
public void testDeclare() {
for (int i = 0; i < 4; ++i) {
Expand Down Expand Up @@ -58,7 +60,7 @@ public void testDeclareFullToManyAttributes() {
}

private void runDeclareTest(final int testCase, final int numberOfAttributes) {
final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();

final String[] attributes = new String[numberOfAttributes];
for (int i = 0; i < numberOfAttributes; ++i) {
Expand All @@ -67,19 +69,19 @@ private void runDeclareTest(final int testCase, final int numberOfAttributes) {

switch (testCase) {
case 0:
this.declareSimple(declarere, attributes);
this.declareSimple(declarer, attributes);
break;
case 1:
this.declareNonDirect(declarere, attributes);
this.declareNonDirect(declarer, attributes);
break;
case 2:
this.declareDefaultStream(declarere, attributes);
this.declareDefaultStream(declarer, attributes);
break;
default:
this.declareFull(declarere, attributes);
this.declareFull(declarer, attributes);
}

final TypeInformation<?> type = declarere.getOutputType();
final TypeInformation<?> type = declarer.getOutputType();

if (numberOfAttributes == 0) {
Assert.assertNull(type);
Expand All @@ -93,20 +95,20 @@ private void runDeclareTest(final int testCase, final int numberOfAttributes) {
}
}

private void declareSimple(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declare(new Fields(attributes));
private void declareSimple(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
declarer.declare(new Fields(attributes));
}

private void declareNonDirect(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declare(false, new Fields(attributes));
private void declareNonDirect(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
declarer.declare(false, new Fields(attributes));
}

private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
private void declareDefaultStream(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
declarer.declareStream(Utils.DEFAULT_STREAM_ID, new Fields(attributes));
}

private void declareFull(final FlinkOutputFieldsDeclarer declarere, final String[] attributes) {
declarere.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
private void declareFull(final FlinkOutputFieldsDeclarer declarer, final String[] attributes) {
declarer.declareStream(Utils.DEFAULT_STREAM_ID, false, new Fields(attributes));
}

@Test(expected = UnsupportedOperationException.class)
Expand Down Expand Up @@ -137,8 +139,8 @@ public void testGetGroupingFieldIndexes() {
attributes[i] = "a" + i;
}

final FlinkOutputFieldsDeclarer declarere = new FlinkOutputFieldsDeclarer();
declarere.declare(new Fields(attributes));
final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();
declarer.declare(new Fields(attributes));

final int numberOfKeys = 1 + this.r.nextInt(25);
final LinkedList<String> groupingFields = new LinkedList<String>();
Expand All @@ -161,7 +163,7 @@ public void testGetGroupingFieldIndexes() {
}
}

final int[] result = declarere.getGroupingFieldIndexes(groupingFields);
final int[] result = declarer.getGroupingFieldIndexes(groupingFields);

Assert.assertEquals(expectedResult.length, result.length);
for (int i = 0; i < expectedResult.length; ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void testReportError() {
@Test(expected = UnsupportedOperationException.class)
public void testEmitDirect() {
new StormBoltCollector<Object>(1, mock(Output.class)).emitDirect(0, null,
(Collection) null, null);
null, null);
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Root logger is disabled (level OFF); A1 is its only registered appender.
# Raise the level (e.g. to DEBUG) to re-enable log output.
log4j.rootLogger=OFF, A1

# A1 is a ConsoleAppender writing to stdout.
log4j.appender.A1=org.apache.log4j.ConsoleAppender

# A1 uses PatternLayout: elapsed-millis [thread] level category NDC - message.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file ensures that tests executed from the IDE show log output

log4j.rootLogger=OFF, console

# Log all infos in the given file
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target = System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
</encoder>
</appender>

<root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
<logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class ExclamationTopology {

public final static String spoutId = "source";
public final static String firstBoltId = "exlamation1";
public final static String firstBoltId = "exclamation1";
public final static String secondBoltId = "exclamation2";
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new SimpleOutputFormatter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

/**
* Base class for Storm Spout that reads data line by line from an arbitrary source. The declared output schema has a
* single attribute calle {@code line} and should be of type {@link String}.
* single attribute called {@code line} and should be of type {@link String}.
*/
public abstract class AbstractStormSpout implements IRichSpout {
private static final long serialVersionUID = 8876828403487806771L;
Expand Down
Loading

0 comments on commit 3fe6d19

Please sign in to comment.