Add JSON parsing examples in Java and Python, and a CSV example in Scala
holdenk committed May 18, 2014
1 parent aea0e03 commit 4bb1413
Showing 5 changed files with 100 additions and 4 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -14,7 +14,7 @@ libraryDependencies ++= Seq(
"org.apache.commons" % "commons-lang3" % "3.0",
"org.eclipse.jetty" % "jetty-client" % "8.1.14.v20131031",
"com.typesafe.play" % "play-json_2.10" % "2.2.1",
"com.eclipsesource.minimal-json" % "minimal-json" % "0.9.1",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.3.3",
"org.elasticsearch" % "elasticsearch-hadoop-mr" % "2.0.0.RC1",
"net.sf.opencsv" % "opencsv" % "2.0"
)
6 changes: 3 additions & 3 deletions pom.xml
@@ -28,9 +28,9 @@
   <version>8.1.14.v20131031</version>
 </dependency>
 <dependency>
-  <groupId>com.eclipsesource.minimal-json</groupId>
-  <artifactId>minimal-json</artifactId>
-  <version>0.9.1</version>
+  <groupId>com.fasterxml.jackson.core</groupId>
+  <artifactId>jackson-databind</artifactId>
+  <version>2.3.3</version>
 </dependency>
 <dependency>
   <groupId>org.apache.commons</groupId>
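build.sbt and pom.xml make the same swap: minimal-json gives way to Jackson's jackson-databind 2.3.3, which can bind a JSON string straight onto a plain class, as the new Java example below does with its Person bean. As a rough sketch of what the dependency provides on its own (the sample record is hypothetical; the field names follow the Person class below):

import com.fasterxml.jackson.databind.ObjectMapper

val mapper = new ObjectMapper()
// Parse one JSON record into Jackson's generic tree and read a field back out.
val node = mapper.readTree("""{"name": "Holden", "lovesPandas": true}""")
println(node.get("lovesPandas").asBoolean())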
52 changes: 52 additions & 0 deletions src/main/java/com/oreilly/learningsparkexamples/java/BasicLoadJson.java
@@ -0,0 +1,52 @@
/**
 * Illustrates loading a JSON file and finding out whether people like pandas.
 */
package com.oreilly.learningsparkexamples.java;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import com.fasterxml.jackson.databind.ObjectMapper;

public class BasicLoadJson {

  public static class Person {
    public String name;
    public Boolean lovesPandas;
  }

  public static class ParseJson implements FlatMapFunction<Iterator<String>, Person> {
    public Iterable<Person> call(Iterator<String> lines) throws Exception {
      ArrayList<Person> people = new ArrayList<Person>();
      // Build one ObjectMapper and reuse it for every line in this partition.
      ObjectMapper mapper = new ObjectMapper();
      while (lines.hasNext()) {
        String line = lines.next();
        people.add(mapper.readValue(line, Person.class));
      }
      return people;
    }
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      throw new Exception("Usage: BasicLoadJson [sparkMaster] [jsonInput]");
    }
    String master = args[0];
    String fileName = args[1];
    JavaSparkContext sc = new JavaSparkContext(
      master, "basicloadjson", System.getenv("SPARK_HOME"), System.getenv("JARS"));
    JavaRDD<String> input = sc.textFile(fileName);
    // mapPartitions lets ParseJson construct one mapper per partition
    // instead of one per record.
    JavaRDD<Person> result = input.mapPartitions(new ParseJson());
    for (Person person : result.collect()) {
      System.out.println(person.name + " loves pandas: " + person.lovesPandas);
    }
  }
}
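The example expects line-delimited JSON: sc.textFile yields one element per line, and ParseJson hands each line to Jackson, so every line must be a complete JSON object whose fields match Person. A hypothetical input line:

{"name": "Sparky The Bear", "lovesPandas": true}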
31 changes: 31 additions & 0 deletions src/main/scala/com/oreilly/learningsparkexamples/scala/BasicParseCsv.scala
@@ -0,0 +1,31 @@
/**
 * Illustrates a simple map to parse CSV data in Scala.
 */
package com.oreilly.learningsparkexamples.scala

import java.io.StringReader

import org.apache.spark._

import au.com.bytecode.opencsv.CSVReader

object BasicParseCsv {
  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage: BasicParseCsv [sparkmaster] [inputfile]")
      sys.exit(1)
    }
    val master = args(0)
    val inputFile = args(1)
    val sc = new SparkContext(master, "BasicParseCsv", System.getenv("SPARK_HOME"))
    val input = sc.textFile(inputFile)
    // Each element is one line of CSV, so build a reader per line and take
    // the single record it contains.
    val result = input.map{ line =>
      val reader = new CSVReader(new StringReader(line))
      reader.readNext()
    }
    println(result.collect().mkString(","))
  }
}
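Constructing a CSVReader per line keeps the example short but pays the reader's setup cost on every record. A minimal mapPartitions variant, assuming opencsv's CSVParser class is available in the version on the classpath:

import au.com.bytecode.opencsv.CSVParser

val result = input.mapPartitions{ lines =>
  // Build one parser per partition and reuse it for every line.
  val parser = new CSVParser(',')
  lines.map(line => parser.parseLine(line))
}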
13 changes: 13 additions & 0 deletions src/python/LoadJson.py
@@ -0,0 +1,13 @@
from pyspark import SparkContext
import json
import sys

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print "Usage: LoadJson [sparkmaster] [file]"
        sys.exit(-1)
    sc = SparkContext(sys.argv[1], "LoadJson")
    input = sc.textFile(sys.argv[2])
    # json.loads turns each line of line-delimited JSON into a Python dict.
    data = input.map(lambda x: json.loads(x))
    print data.collect()
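With pyspark importable (e.g. via SPARK_HOME's python directory on PYTHONPATH), an invocation might look like the following, with a hypothetical input path:

python src/python/LoadJson.py local[2] files/pandainfo.json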
