Skip to content

Commit

Permalink
Connected Components example Scala API rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
vasia authored and aljoscha committed Sep 22, 2014
1 parent b8131fa commit 84b04be
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.flink.example.java.graph.util;

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

Expand All @@ -33,34 +32,46 @@
*
*/
public class ConnectedComponentsData {

public static final Object[][] VERTICES = new Object[][] {
new Object[]{1L}, new Object[]{2L}, new Object[]{3L}, new Object[]{4L},
new Object[]{5L},new Object[]{6L}, new Object[]{7L}, new Object[]{8L},
new Object[]{9L}, new Object[]{10L}, new Object[]{11L}, new Object[]{12L},
new Object[]{13L}, new Object[]{14L}, new Object[]{15L}, new Object[]{16L}
};

public static DataSet<Long> getDefaultVertexDataSet(ExecutionEnvironment env) {

return env.fromElements(
1L, 2L, 3L, 4L, 5L,
6L, 7L, 8L, 9L, 10L,
11L, 12L, 13L, 14L, 15L, 16L);
List<Long> verticesList = new LinkedList<Long>();
for (Object[] vertex : VERTICES) {
verticesList.add((Long) vertex[0]);
}
return env.fromCollection(verticesList);
}

public static final Object[][] EDGES = new Object[][] {
new Object[]{1L, 2L},
new Object[]{2L, 3L},
new Object[]{2L, 4L},
new Object[]{3L, 5L},
new Object[]{6L, 7L},
new Object[]{8L, 9L},
new Object[]{8L, 10L},
new Object[]{5L, 11L},
new Object[]{11L, 12L},
new Object[]{10L, 13L},
new Object[]{9L, 14L},
new Object[]{13L, 14L},
new Object[]{1L, 15L},
new Object[]{16L, 1L}
};

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
data.add(new Tuple2<Long, Long>(1L, 2L));
data.add(new Tuple2<Long, Long>(2L, 3L));
data.add(new Tuple2<Long, Long>(2L, 4L));
data.add(new Tuple2<Long, Long>(3L, 5L));
data.add(new Tuple2<Long, Long>(6L, 7L));
data.add(new Tuple2<Long, Long>(8L, 9L));
data.add(new Tuple2<Long, Long>(8L, 10L));
data.add(new Tuple2<Long, Long>(5L, 11L));
data.add(new Tuple2<Long, Long>(11L, 12L));
data.add(new Tuple2<Long, Long>(10L, 13L));
data.add(new Tuple2<Long, Long>(9L, 14L));
data.add(new Tuple2<Long, Long>(13L, 14L));
data.add(new Tuple2<Long, Long>(1L, 15L));
data.add(new Tuple2<Long, Long>(16L, 1L));

return env.fromCollection(data);
List<Tuple2<Long, Long>> edgeList = new LinkedList<Tuple2<Long, Long>>();
for (Object[] edge : EDGES) {
edgeList.add(new Tuple2<Long, Long>((Long) edge[0], (Long) edge[1]));
}
return env.fromCollection(edgeList);
}

}
Original file line number Diff line number Diff line change
@@ -1,90 +1,185 @@
///**
// * Licensed to the Apache Software Foundation (ASF) under one
// * or more contributor license agreements. See the NOTICE file
// * distributed with this work for additional information
// * regarding copyright ownership. The ASF licenses this file
// * to you under the Apache License, Version 2.0 (the
// * "License"); you may not use this file except in compliance
// * with the License. You may obtain a copy of the License at
// *
// * http://www.apache.org/licenses/LICENSE-2.0
// *
// * Unless required by applicable law or agreed to in writing, software
// * distributed under the License is distributed on an "AS IS" BASIS,
// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// * See the License for the specific language governing permissions and
// * limitations under the License.
// */
//
//
//package org.apache.flink.examples.scala.graph;
//
//import org.apache.flink.client.LocalExecutor
//import org.apache.flink.api.common.Program
//
//import org.apache.flink.api.scala._
//import org.apache.flink.api.scala.operators._
//
//object RunConnectedComponents {
// def main(pArgs: Array[String]) {
//
// if (pArgs.size < 5) {
// println("USAGE: <vertices input file> <edges input file> <output file> <max iterations> <degree of parallelism>")
// return
// }
// val plan = new ConnectedComponents().getPlan(pArgs(0), pArgs(1), pArgs(2), pArgs(3), pArgs(4))
// LocalExecutor.execute(plan)
// }
//}
//
//class ConnectedComponents extends Program with Serializable {
//
// override def getPlan(args: String*) = {
// val plan = getScalaPlan(args(0), args(1), args(2), args(3).toInt)
// plan.setDefaultParallelism(args(4).toInt)
// plan
// }
//
// def getScalaPlan(verticesInput: String, edgesInput: String, componentsOutput: String, maxIterations: Int) = {
//
// val vertices = DataSource(verticesInput, DelimitedInputFormat(parseVertex))
// val directedEdges = DataSource(edgesInput, DelimitedInputFormat(parseEdge))
//
// val undirectedEdges = directedEdges flatMap { case (from, to) => Seq(from -> to, to -> from) }
//
// def propagateComponent(s: DataSetOLD[(Int, Int)], ws: DataSetOLD[(Int, Int)]) = {
//
// val allNeighbors = ws join undirectedEdges where { case (v, _) => v } isEqualTo { case (from, _) => from } map { (w, e) => e._2 -> w._2 }
// val minNeighbors = allNeighbors groupBy { case (to, _) => to } reduceGroup { cs => cs minBy { _._2 } }
//
// // updated solution elements == new workset
// val s1 = s join minNeighbors where { _._1 } isEqualTo { _._1 } flatMap { (n, s) =>
// (n, s) match {
// case ((v, cOld), (_, cNew)) if cNew < cOld => Some((v, cNew))
// case _ => None
// }
// }
//// s1.left preserves({ case (v, _) => v }, { case (v, _) => v })
// s1.right preserves({ v=>v }, { v=>v })
//
// (s1, s1)
// }
//
// val components = vertices.iterateWithDelta(vertices, { _._1 }, propagateComponent, maxIterations)
// val output = components.write(componentsOutput, DelimitedOutputFormat(formatOutput.tupled))
//
// val plan = new ScalaPlan(Seq(output), "Connected Components")
// plan
// }
//
// def parseVertex = (line: String) => { val v = line.toInt; v -> v }
//
// val EdgeInputPattern = """(\d+) (\d+)""".r
//
// def parseEdge = (line: String) => line match {
// case EdgeInputPattern(from, to) => from.toInt -> to.toInt
// }
//
// def formatOutput = (vertex: Int, component: Int) => "%d %d".format(vertex, component)
//}
//
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph;

import org.apache.flink.api.scala._
import org.apache.flink.example.java.graph.util.ConnectedComponentsData
import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.util.Collector

/**
* An implementation of the connected components algorithm, using a delta iteration.
*
* Initially, the algorithm assigns each vertex an unique ID. In each step, a vertex picks the minimum of its own ID and its
* neighbors' IDs, as its new ID and tells its neighbors about its new ID. After the algorithm has completed, all vertices in the
* same component will have the same ID.
*
* A vertex whose component ID did not change needs not propagate its information in the next step. Because of that,
* the algorithm is easily expressible via a delta iteration. We here model the solution set as the vertices with
* their current component ids, and the workset as the changed vertices. Because we see all vertices initially as
* changed, the initial workset and the initial solution set are identical. Also, the delta to the solution set
* is consequently also the next workset.
*
* Input files are plain text files and must be formatted as follows:
*
* - Vertices represented as IDs and separated by new-line characters.
*
* For example
* {{{
* "1\n2\n12\n42\n63\n"
* }}}
* gives five vertices (1), (2), (12), (42), and (63).
*
*
* - Edges are represented as pairs for vertex IDs which are separated by space
* characters. Edges are separated by new-line characters.
*
* For example
* {{{
* "1 2\n2 12\n1 12\n42 63\n"
* }}}
* gives four (undirected) edges (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
*
*
*
* Usage:
* {{{
* ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>
* }}}
*
* If no parameters are provided, the program is run with default data from
* [[org.apache.flink.example.java.graph.util.ConnectedComponentsData]]
* and 10 iterations.
*
*
* This example shows how to use:
*
* - Delta Iterations
* - Generic-typed Functions
*
*/
object ConnectedComponents {
def main(args: Array[String]) {
if (!parseParameters(args)) {
return
}

// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map(id => (id, id))

// undirected edges by emitting for each input edge the input edges itself and an inverted version
val edges = getEdgesDataSet(env)
.flatMap { (edge, out: Collector[(Long, Long)]) =>
out.collect(edge)
out.collect((edge._2, edge._1))
}

// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) => {

// apply the step logic: join with the edges
val allNeighbors = ws.join(edges)
.where(0).equalTo(0)
.map { in => (in._2._2, in._1._2) }

// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).aggregate(Aggregations.MIN, 1)

// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0)
.flatMap { newAndOldComponent =>
newAndOldComponent match {
case ((vId, cNew), (_, cOld)) if cNew < cOld => Some((vId, cNew))
case _ => None
}
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
}
if (fileOutput) {
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
} else {
verticesWithComponents.print()
}

env.execute("Scala Connected Components Example")
}

private def parseParameters(args: Array[String]): Boolean = {
if (args.length > 0) {
fileOutput = true
if (args.length == 4) {
verticesPath = args(0)
edgesPath = args(1)
outputPath = args(2)
maxIterations = args(3).toInt
} else {
System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
false
}
} else {
System.out.println("Executing Connected Components example with built-in default data.")
System.out.println(" Provide parameters to read input data from a file.")
System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>")
}
true
}

private def getVerticesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
if (fileOutput) {
env.readCsvFile[Tuple1[Long]](
verticesPath,
includedFields = Array(0))
.map { x => x._1 }
}
else {
val vertexData = ConnectedComponentsData.VERTICES map {
case Array(x) => x.asInstanceOf[Long]
}
env.fromCollection(vertexData);
}
}

private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = {
if (fileOutput) {
env.readCsvFile[(Long, Long)](
edgesPath,
fieldDelimiter = ' ',
includedFields = Array(0, 1))
.map { x => (x._1, x._2)}
}
else {
val edgeData = ConnectedComponentsData.EDGES map {
case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
}
env.fromCollection(edgeData);
}
}

private var fileOutput: Boolean = false
private var verticesPath: String = null
private var edgesPath: String = null
private var maxIterations: Int = 10
private var outputPath: String = null
}

0 comments on commit 84b04be

Please sign in to comment.