Update a bunch of data loading examples and mention[ish] spark-submit
holdenk committed May 21, 2014
1 parent 4bb1413 commit c3e06ef
Showing 7 changed files with 152 additions and 28 deletions.
11 changes: 10 additions & 1 deletion README.md
@@ -26,4 +26,13 @@ mvn exec:java -Dexec.mainClass="com.oreilly.learningsparkexamples.java.[EXAMPLE]
Python examples
===

From the Spark directory, run ./bin/pyspark ./src/python/[example]

Spark Submit
===

You can also create an assembly JAR with all of the dependencies for running either the Java or Scala
versions of the code, and run the job with the spark-submit script:

./sbt/sbt assembly
cd $SPARK_HOME; ./bin/spark-submit --class com.oreilly.learningsparkexamples.[lang].[example] ../learning-spark-examples/target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar
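
For example, to run the Scala BasicParseJson example with spark-submit (a hypothetical invocation: the local master and the input/output paths below are made up, so substitute your own):

cd $SPARK_HOME; ./bin/spark-submit --class com.oreilly.learningsparkexamples.scala.BasicParseJson ../learning-spark-examples/target/scala-2.10/learning-spark-examples-assembly-0.0.1.jar local ./files/pandainfo.json ./output/pandainfo-filtered
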
BasicLoadJson.java
@@ -15,12 +15,14 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;

public class BasicLoadJson {

-  public static class Person {
+  public static class Person implements java.io.Serializable {
public String name;
public Boolean lovesPandas;
}
@@ -37,16 +39,38 @@ public Iterable<Person> call(Iterator<String> lines) throws Exception {
}
}

public static class LikesPandas implements Function<Person, Boolean> {
public Boolean call(Person person) {
return person.lovesPandas;
}
}


public static class WriteJson implements FlatMapFunction<Iterator<Person>, String> {
public Iterable<String> call(Iterator<Person> people) throws Exception {
ArrayList<String> text = new ArrayList<String>();
ObjectMapper mapper = new ObjectMapper();
while (people.hasNext()) {
Person person = people.next();
text.add(mapper.writeValueAsString(person));
}
return text;
}
}

public static void main(String[] args) throws Exception {
-    if (args.length != 2) {
-      throw new Exception("Usage BasicLoadJson [sparkMaster] [jsoninput]");
+    if (args.length != 3) {
+      throw new Exception("Usage BasicLoadJson [sparkMaster] [jsoninput] [jsonoutput]");
}
String master = args[0];
String fileName = args[1];
String outfile = args[2];

JavaSparkContext sc = new JavaSparkContext(
master, "basicloadjson", System.getenv("SPARK_HOME"), System.getenv("JARS"));
JavaRDD<String> input = sc.textFile(fileName);
-    JavaRDD<Person> result = input.mapPartitions(new ParseJson());
-    List<Person> resultCollection = result.collect();
+    JavaRDD<Person> result = input.mapPartitions(new ParseJson()).filter(new LikesPandas());
+    JavaRDD<String> formatted = result.mapPartitions(new WriteJson());
+    formatted.saveAsTextFile(outfile);
}
}
BasicParseCsv.scala
@@ -4,28 +4,41 @@
package com.oreilly.learningsparkexamples.scala

import java.io.StringReader
import java.io.StringWriter

import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._
import scala.util.parsing.json.JSON
import scala.collection.JavaConversions._

import au.com.bytecode.opencsv.CSVReader
import au.com.bytecode.opencsv.CSVWriter

object BasicParseCsv {
case class Person(name: String, favouriteAnimal: String)

def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("Usage: [sparkmaster] [inputfile]")
+    if (args.length < 3) {
+      println("Usage: [sparkmaster] [inputfile] [outputfile]")
exit(1)
}
val master = args(0)
val inputFile = args(1)
val outputFile = args(2)
val sc = new SparkContext(master, "BasicParseCsv", System.getenv("SPARK_HOME"))
val input = sc.textFile(inputFile)
val result = input.map{ line =>
val reader = new CSVReader(new StringReader(line));
reader.readNext();
}
-    println(result.collect().mkString(","))
-    }
+    val people = result.map(x => Person(x(0), x(1)))
+    val pandaLovers = people.filter(person => person.favouriteAnimal == "panda")
+    pandaLovers.map(person => List(person.name, person.favouriteAnimal).toArray).mapPartitions{people =>
+      val stringWriter = new StringWriter();
+      val csvWriter = new CSVWriter(stringWriter);
+      csvWriter.writeAll(people.toList)
+      Iterator(stringWriter.toString)
+    }.saveAsTextFile(outputFile)
}
}
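
The CSV reading and writing above goes through opencsv. As a quick reference, here is a minimal standalone sketch of the same CSVReader/CSVWriter round trip on a single record (assuming the opencsv and Scala JavaConversions imports these examples already use; the record itself is made up for illustration):

import java.io.{StringReader, StringWriter}

import au.com.bytecode.opencsv.{CSVReader, CSVWriter}

import scala.collection.JavaConversions._

object OpenCsvRoundTrip {
  def main(args: Array[String]) {
    // Made-up CSV record, for illustration only.
    val line = "holden,panda"
    // readNext() parses one CSV record into an Array[String].
    val fields = new CSVReader(new StringReader(line)).readNext()
    // writeAll takes a java.util.List of String arrays; JavaConversions bridges the Scala List.
    val stringWriter = new StringWriter()
    new CSVWriter(stringWriter).writeAll(List(fields))
    // Prints the record re-encoded as CSV text (opencsv quotes fields by default).
    print(stringWriter.toString)
  }
}
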
BasicParseJson.scala
@@ -8,24 +8,23 @@ package com.oreilly.learningsparkexamples.scala
import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._
-import scala.util.parsing.json.JSON

object BasicParseJson {
case class Person(name: String, lovesPandas: Boolean)
implicit val personReads = Json.format[Person]

def main(args: Array[String]) {
-    if (args.length < 2) {
-      println("Usage: [sparkmaster] [inputfile]")
-      exit(1)
+    if (args.length < 3) {
+      println("Usage: [sparkmaster] [inputfile] [outputfile]")
+      exit(1)
     }
-    val master = args(0)
-    val inputFile = args(1)
-    val sc = new SparkContext(master, "BasicParseJson", System.getenv("SPARK_HOME"))
-    val input = sc.textFile(inputFile)
-    val parsed = input.map(JSON.parseFull(_))
-    val result = parsed.map(record => personReads.reads(_))
-
-    println(result.collect().mkString(","))
+    val master = args(0)
+    val inputFile = args(1)
+    val outputFile = args(2)
+    val sc = new SparkContext(master, "BasicParseJson", System.getenv("SPARK_HOME"))
+    val input = sc.textFile(inputFile)
+    val parsed = input.map(Json.parse(_))
+    val result = parsed.flatMap(record => personReads.reads(record).asOpt)
+    result.filter(_.lovesPandas).map(Json.toJson(_)).saveAsTextFile(outputFile)
}
}
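
Since this commit swaps scala.util.parsing.json for play-json, here is a minimal standalone sketch of the Json.parse / reads(...).asOpt / Json.toJson round trip used above (assuming the play-json dependency these examples already pull in; the input record is made up for illustration):

import play.api.libs.json._

object PlayJsonRoundTrip {
  case class Person(name: String, lovesPandas: Boolean)
  implicit val personFormat = Json.format[Person]

  def main(args: Array[String]) {
    // Made-up input record, for illustration only.
    val line = """{"name":"Sparky","lovesPandas":true}"""
    // Json.parse returns a JsValue; reads(...).asOpt is None when validation fails,
    // which is why the Spark example uses flatMap to drop bad records.
    val person: Option[Person] = personFormat.reads(Json.parse(line)).asOpt
    // Json.toJson serialises back to a JsValue whose toString is JSON text.
    person.filter(_.lovesPandas).foreach(p => println(Json.toJson(p)))
  }
}
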
BasicParseWholeFileCsv.scala
@@ -0,0 +1,31 @@
/**
* Illustrates a simple map partition to parse CSV data in Scala
*/
package com.oreilly.learningsparkexamples.scala

import java.io.StringReader

import org.apache.spark._
import play.api.libs.json._
import play.api.libs.functional.syntax._
import scala.util.parsing.json.JSON
import scala.collection.JavaConversions._
import au.com.bytecode.opencsv.CSVReader

object BasicParseWholeFileCsv {
def main(args: Array[String]) {
if (args.length < 2) {
println("Usage: [sparkmaster] [inputfile]")
exit(1)
}
val master = args(0)
val inputFile = args(1)
val sc = new SparkContext(master, "BasicParseWholeFileCsv", System.getenv("SPARK_HOME"))
val input = sc.wholeTextFiles(inputFile)
val result = input.flatMap{ case (_, txt) =>
val reader = new CSVReader(new StringReader(txt));
reader.readAll()
}
println(result.collect().map(_.toList).mkString(","))
}
}
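
The whole-file variant exists because sc.textFile splits input on newlines, which breaks CSV records that span lines. A minimal sketch of the difference (assuming a local master and a made-up directory of CSV files):

import org.apache.spark.SparkContext

object WholeFileVsLines {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "WholeFileVsLines")
    // textFile yields one String per line across all files in the directory.
    val byLine = sc.textFile("files/csv-dir")
    // wholeTextFiles yields one (path, fullContents) pair per file.
    val byFile = sc.wholeTextFiles("files/csv-dir")
    println("lines: " + byLine.count() + ", files: " + byFile.count())
    sc.stop()
  }
}
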
44 changes: 44 additions & 0 deletions src/python/LoadCsv.py
@@ -0,0 +1,44 @@
from pyspark import SparkContext
import csv
import sys
import StringIO

def loadRecord(line):
"""Parse a CSV line"""
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader.next()

def loadRecords(fileNameContents):
"""Load all the records in a given file"""
input = StringIO.StringIO(fileNameContents[1])
reader = csv.DictReader(input, fieldnames=["name", "favouriteAnimal"])
return reader

def writeRecords(records):
"""Write out CSV lines"""
output = StringIO.StringIO()
writer = csv.DictWriter(output, fieldnames=["name", "favouriteAnimal"])
for record in records:
writer.writerow(record)
return [output.getvalue()]

if __name__ == "__main__":
if len(sys.argv) != 4:
print "Error usage: LoadCsv [sparkmaster] [inputfile] [outputfile]"
sys.exit(-1)
master = sys.argv[1]
inputFile = sys.argv[2]
outputFile = sys.argv[3]
sc = SparkContext(master, "LoadCsv")
# Try the record-per-line-input
input = sc.textFile(inputFile)
data = input.map(loadRecord)
pandaLovers = data.filter(lambda x: x['favouriteAnimal'] == "panda")
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)
# Try the more whole file input
fullFileData = sc.wholeTextFiles(inputFile).flatMap(loadRecords)
fullFilePandaLovers = fullFileData.filter(lambda x: x['favouriteAnimal'] == "panda")
fullFilePandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile+"fullfile")
sc.stop()
print "Done!"
16 changes: 10 additions & 6 deletions src/python/LoadJson.py
@@ -3,11 +3,15 @@
import sys

if __name__ == "__main__":
-    if len(sys.argv) != 3:
-        print "Error usage: LoadJson [sparkmaster] [file]"
+    if len(sys.argv) != 4:
+        print "Error usage: LoadJson [sparkmaster] [inputfile] [outputfile]"
sys.exit(-1)
-    sc = SparkContext(sys.argv[1], "LoadJson")
-    input = sc.textFile(sys.argv[2])
+    master = sys.argv[1]
+    inputFile = sys.argv[2]
+    outputFile = sys.argv[3]
+    sc = SparkContext(master, "LoadJson")
+    input = sc.textFile(inputFile)
data = input.map(lambda x: json.loads(x))
-    print data.collect()
+    data.filter(lambda x: x['lovesPandas']).map(lambda x: json.dumps(x)).saveAsTextFile(outputFile)
+    sc.stop()
+    print "Done!"
