
Commit

[storm-compat] Storm compatibility code cleanup
szape authored and mbalassi committed Jun 14, 2015
1 parent 9ff3cf0 commit e497a83
Showing 51 changed files with 1,722 additions and 1,984 deletions.


FlinkLocalCluster.java
@@ -15,11 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.api;

import backtype.storm.LocalCluster;
import backtype.storm.generated.ClusterSummary;
@@ -28,106 +25,96 @@
import backtype.storm.generated.StormTopology;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.generated.TopologyInfo;
import org.apache.flink.streaming.util.ClusterUtil;

import java.util.Map;

/**
* {@link FlinkLocalCluster} mimics a Storm {@link LocalCluster}.
*/
public class FlinkLocalCluster {

public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
}

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {
ClusterUtil
.startOnMiniCluster(topology.getStreamGraph().getJobGraph(topologyName), topology.getNumberOfTasks());
}

public void killTopology(final String topologyName) {
this.killTopologyWithOpts(topologyName, null);
}

public void killTopologyWithOpts(final String name, final KillOptions options) {
// TODO Auto-generated method stub
}

public void activate(final String topologyName) {
// TODO Auto-generated method stub
}

public void deactivate(final String topologyName) {
// TODO Auto-generated method stub
}

public void rebalance(final String name, final RebalanceOptions options) {
// TODO Auto-generated method stub
}

public void shutdown() {
ClusterUtil.stopOnMiniCluster();
}

@SuppressWarnings("unused")
public String getTopologyConf(final String id) {
// TODO Auto-generated method stub
return null;
}

@SuppressWarnings("unused")
public StormTopology getTopology(final String id) {
// TODO Auto-generated method stub
return null;
}

@SuppressWarnings("unused")
public ClusterSummary getClusterInfo() {
// TODO Auto-generated method stub
return null;
}

@SuppressWarnings("unused")
public TopologyInfo getTopologyInfo(final String id) {
// TODO Auto-generated method stub
return null;
}

@SuppressWarnings("unused")
public Map<?, ?> getState() {
// TODO Auto-generated method stub
return null;
}

// A different {@link FlinkLocalCluster} to be used for execution of ITCases
private static FlinkLocalCluster currentCluster = null;

/**
* Returns a {@link FlinkLocalCluster} that should be used for execution. If no cluster was set by {@link
* #initialize(FlinkLocalCluster)} in advance, a new {@link FlinkLocalCluster} is returned.
*
* @return a {@link FlinkLocalCluster} to be used for execution
*/
public static FlinkLocalCluster getLocalCluster() {
if (currentCluster == null) {
currentCluster = new FlinkLocalCluster();
}

return currentCluster;
}

/**
* Sets a different {@link FlinkLocalCluster} to be used for execution.
*
* @param cluster
* the {@link FlinkLocalCluster} to be used for execution
*/
public static void initialize(final FlinkLocalCluster cluster) {
currentCluster = cluster;
}

}
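For orientation, a minimal usage sketch of the class above (not part of this commit): an ITCase can install its own cluster via initialize() before submission, and getLocalCluster() otherwise creates a default instance lazily. The helper class name, the empty config map, and the try/finally shape are illustrative assumptions; the FlinkTopology is assumed to be assembled elsewhere in the storm-compatibility API.

package org.apache.flink.stormcompatibility.api;

import java.util.HashMap;
import java.util.Map;

public final class LocalClusterUsageSketch {

	private LocalClusterUsageSketch() {
	}

	// Runs the given topology on the local mini cluster and tears everything down afterwards.
	public static void runLocally(final String topologyName, final FlinkTopology topology) throws Exception {
		// Optional: ITCases may install a custom cluster instance first; without this call,
		// getLocalCluster() simply creates a default FlinkLocalCluster on first use.
		FlinkLocalCluster.initialize(new FlinkLocalCluster());

		final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
		final Map<String, Object> conf = new HashMap<String, Object>();

		cluster.submitTopology(topologyName, conf, topology);
		try {
			// ... inspect the job's output here ...
		} finally {
			cluster.killTopology(topologyName);
			cluster.shutdown();
		}
	}
}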
FlinkOutputFieldsDeclarer.java
@@ -14,158 +14,153 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.api;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TypeExtractor;

import java.util.List;

/**
* {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or {@link
* IRichBolt bolt}.<br /> <br /> <strong>CAUTION: Currently, Flink only supports the default output stream.
* Furthermore, direct emit is not supported.</strong>
*/
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {
private Fields outputSchema;

@Override
public void declare(final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, false, fields);
}

/**
* {@inheritDoc}
*
* <p/>
* Direct emit is not supported by Flink. Parameter {@code direct} must be {@code false}.
*
* @throws UnsupportedOperationException
* if {@code direct} is {@code true}
*/
@Override
public void declare(final boolean direct, final Fields fields) {
this.declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
}

/**
* {@inheritDoc}
*
* <p/>
* Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
* {@link Utils#DEFAULT_STREAM_ID}.
*
* @throws UnsupportedOperationException
* if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID}
*/
@Override
public void declareStream(final String streamId, final Fields fields) {
this.declareStream(streamId, false, fields);
}

/**
* {@inheritDoc}
*
* <p/>
* Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
* {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
* must be {@code false}.
*
* @throws UnsupportedOperationException
* if {@code streamId} is not equal to {@link Utils#DEFAULT_STREAM_ID} or {@code direct} is {@code true}
*/
@Override
public void declareStream(final String streamId, final boolean direct, final Fields fields) {
if (!Utils.DEFAULT_STREAM_ID.equals(streamId)) {
throw new UnsupportedOperationException("Currently, only the default output stream is supported by Flink");
}
if (direct) {
throw new UnsupportedOperationException("Direct emit is not supported by Flink");
}

this.outputSchema = fields;
}

/**
* Returns {@link TypeInformation} for the declared output schema. If no or an empty output schema was declared,
* {@code null} is returned.
*
* @return output type information for the declared output schema; or {@code null} if no output schema was declared
*
* @throws IllegalArgumentException
* if more than 25 attributes are declared
*/
public TypeInformation<?> getOutputType() throws IllegalArgumentException {
if ((this.outputSchema == null) || (this.outputSchema.size() == 0)) {
return null;
}

Tuple t;
final int numberOfAttributes = this.outputSchema.size();

if (numberOfAttributes == 1) {
return TypeExtractor.getForClass(Object.class);
} else if (numberOfAttributes <= 25) {
try {
t = Tuple.getTupleClass(numberOfAttributes).newInstance();
} catch (final InstantiationException e) {
throw new RuntimeException(e);
} catch (final IllegalAccessException e) {
throw new RuntimeException(e);
}
} else {
throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes.");
throw new IllegalArgumentException("Flink supports only a maximum number of 25 attributes");
}

// TODO: declare only key fields as DefaultComparable
for (int i = 0; i < numberOfAttributes; ++i) {
t.setField(new DefaultComparable(), i);
}

return TypeExtractor.getForObject(t);
}

/**
* {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
* TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
* Flink cannot use them and will throw an exception.
*
* @author mjsax
*/
private static class DefaultComparable implements Comparable<DefaultComparable> {

public DefaultComparable() {
}

@SuppressWarnings("NullableProblems")
@Override
public int compareTo(final DefaultComparable o) {
return 0;
}
}

/**
* Computes the indexes within the declared output schema for a list of given field-grouping attributes.
*
* @return array of {@code int}s that contains the index within the output schema for each attribute in the given
* list
*/
public int[] getGroupingFieldIndexes(final List<String> groupingFields) {
final int[] fieldIndexes = new int[groupingFields.size()];

for (int i = 0; i < fieldIndexes.length; ++i) {
fieldIndexes[i] = this.outputSchema.fieldIndex(groupingFields.get(i));
}

return fieldIndexes;
}

}
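Likewise, a brief sketch (not part of this commit) of how the declarer above is driven: a schema is declared for the default, non-direct stream, and Flink-side type information plus field-grouping indexes are then derived from it. The class name and the field names "word"/"count" are illustrative assumptions; because FlinkOutputFieldsDeclarer is package-private, the sketch assumes the same package.

package org.apache.flink.stormcompatibility.api;

import java.util.Arrays;

import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.typeinfo.TypeInformation;

public final class OutputFieldsDeclarerSketch {

	public static void main(final String[] args) {
		final FlinkOutputFieldsDeclarer declarer = new FlinkOutputFieldsDeclarer();

		// A spout or bolt declares its output schema for the (only supported) default, non-direct stream.
		declarer.declare(new Fields("word", "count"));

		// Flink derives Tuple type information from the schema; every attribute is typed via DefaultComparable.
		final TypeInformation<?> outputType = declarer.getOutputType();
		System.out.println(outputType);

		// Field-grouping attributes are mapped to their positional indexes within the declared schema: prints [1]
		final int[] groupingKeys = declarer.getGroupingFieldIndexes(Arrays.asList("count"));
		System.out.println(Arrays.toString(groupingKeys));
	}
}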

