SPARK 1084.1 (resubmitted)
(Ported from https://github.com/apache/incubator-spark/pull/637 )

Author: Sean Owen <[email protected]>

Closes apache#31 from srowen/SPARK-1084.1 and squashes the following commits:

6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it
f35b833 [Sean Owen] Fix two misc javadoc problems
254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit
5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates
007762b [Sean Owen] Remove dead scaladoc links
b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>
srowen authored and pwendell committed Feb 27, 2014
1 parent aace2c0 commit 12bbca2
Showing 15 changed files with 154 additions and 88 deletions.
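For context on the first squashed commit above ("Suppress warnings about legitimate unchecked array creations, or change code to avoid it"): several JavaAPISuite tests below build lists of Tuple2 pairs through Arrays.asList, and passing a parameterized type through varargs forces an unchecked generic array creation that javac reports once lint warnings are enabled. A minimal sketch of that pattern and of the suppression the diff applies (hypothetical UncheckedVarargsSketch class, not part of this commit; assumes scala-library on the classpath):

import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

public class UncheckedVarargsSketch {
  // Varargs of a parameterized type allocate a Tuple2[] behind the scenes, which the
  // compiler cannot verify, hence the "unchecked generic array creation" warning.
  // The annotation records that this usage is safe instead of rewriting the test.
  @SuppressWarnings("unchecked")
  static List<Tuple2<String, String>> categories() {
    return Arrays.asList(
        new Tuple2<String, String>("Apples", "Fruit"),
        new Tuple2<String, String>("Oranges", "Fruit"));
  }
}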
14 changes: 7 additions & 7 deletions bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -27,7 +27,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program.
* @param sc [[org.apache.spark.SparkContext]] to use for the program.
* @param sc org.apache.spark.SparkContext to use for the program.
* @param vertices vertices of the graph represented as an RDD of (Key, Vertex) pairs. Often the
* Key will be the vertex id.
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
@@ -38,10 +38,10 @@ object Bagel extends Logging {
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
* @param partitioner [[org.apache.spark.Partitioner]] partitions values by key
* @param partitioner org.apache.spark.Partitioner partitions values by key
* @param numPartitions number of partitions across which to split the graph.
* Default is the default parallelism of the SparkContext
* @param storageLevel [[org.apache.spark.storage.StorageLevel]] to use for caching of
* @param storageLevel org.apache.spark.storage.StorageLevel to use for caching of
* intermediate RDDs in each superstep. Defaults to caching in memory.
* @param compute function that takes a Vertex, optional set of (possibly combined) messages to
* the Vertex, optional Aggregator and the current superstep,
@@ -131,7 +131,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]], default
* [[org.apache.spark.HashPartitioner]] and default storage level
* org.apache.spark.HashPartitioner and default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -146,7 +146,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]] and the
* default [[org.apache.spark.HashPartitioner]]
* default org.apache.spark.HashPartitioner
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest, C: Manifest](
sc: SparkContext,
@@ -166,7 +166,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* default [[org.apache.spark.HashPartitioner]],
* default org.apache.spark.HashPartitioner,
* [[org.apache.spark.bagel.DefaultCombiner]] and the default storage level
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
@@ -180,7 +180,7 @@ object Bagel extends Logging {

/**
* Runs a Bagel program with no [[org.apache.spark.bagel.Aggregator]],
* the default [[org.apache.spark.HashPartitioner]]
* the default org.apache.spark.HashPartitioner
* and [[org.apache.spark.bagel.DefaultCombiner]]
*/
def run[K: Manifest, V <: Vertex : Manifest, M <: Message[K] : Manifest](
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -225,7 +225,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -238,7 +238,7 @@
</not>
</condition>
</fail>
</tasks>
</target>
</configuration>
</execution>
</executions>
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -351,7 +351,7 @@ class SparkContext(
* using the older MapReduce API (`org.apache.hadoop.mapred`).
*
* @param conf JobConf for setting up the dataset
* @param inputFormatClass Class of the [[InputFormat]]
* @param inputFormatClass Class of the InputFormat
* @param keyClass Class of the keys
* @param valueClass Class of the values
* @param minSplits Minimum number of Hadoop Splits to generate.
core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -81,7 +81,7 @@ class JobLogger(val user: String, val logDirName: String)
/**
* Create a log file for one job
* @param jobID ID of the job
* @exception FileNotFoundException Fail to create log file
* @throws FileNotFoundException Fail to create log file
*/
protected def createLogWriter(jobID: Int) {
try {
core/src/main/scala/org/apache/spark/util/IndestructibleActorSystem.scala
@@ -23,9 +23,9 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.Config

/**
* An [[akka.actor.ActorSystem]] which refuses to shut down in the event of a fatal exception.
* An akka.actor.ActorSystem which refuses to shut down in the event of a fatal exception
* This is necessary as Spark Executors are allowed to recover from fatal exceptions
* (see [[org.apache.spark.executor.Executor]]).
* (see org.apache.spark.executor.Executor)
*/
object IndestructibleActorSystem {
def apply(name: String, config: Config): ActorSystem =
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/StatCounter.scala
@@ -19,9 +19,9 @@ package org.apache.spark.util

/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
* numerically robust way. Includes support for merging two StatCounters. Based on
* [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
* Welford and Chan's algorithms for running variance]].
* numerically robust way. Includes support for merging two StatCounters. Based on Welford
* and Chan's [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance algorithms]]
* for running variance.
*
* @constructor Initialize the StatCounter with the given values.
*/
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Vector.scala
@@ -136,7 +136,7 @@ object Vector {

/**
* Creates this [[org.apache.spark.util.Vector]] of given length containing random numbers
* between 0.0 and 1.0. Optional [[scala.util.Random]] number generator can be provided.
* between 0.0 and 1.0. Optional scala.util.Random number generator can be provided.
*/
def random(length: Int, random: Random = new XORShiftRandom()) =
Vector(length, _ => random.nextDouble())
35 changes: 25 additions & 10 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -75,8 +75,9 @@ public int compare(Integer a, Integer b) {
else if (a < b) return 1;
else return 0;
}
};
}

@SuppressWarnings("unchecked")
@Test
public void sparkContextUnion() {
// Union of non-specialized JavaRDDs
@@ -148,6 +149,7 @@ public void call(String s) {
Assert.assertEquals(2, foreachCalls);
}

@SuppressWarnings("unchecked")
@Test
public void lookup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -179,6 +181,7 @@ public Boolean call(Integer x) {
Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
}

@SuppressWarnings("unchecked")
@Test
public void cogroup() {
JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
@@ -197,6 +200,7 @@ public void cogroup() {
cogrouped.collect();
}

@SuppressWarnings("unchecked")
@Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
@@ -243,6 +247,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(33, sum);
}

@SuppressWarnings("unchecked")
@Test
public void foldByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -265,6 +270,7 @@ public Integer call(Integer a, Integer b) {
Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
}

@SuppressWarnings("unchecked")
@Test
public void reduceByKey() {
List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
@@ -320,8 +326,8 @@ public void approximateResults() {
public void take() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Assert.assertEquals(1, rdd.first().intValue());
List<Integer> firstTwo = rdd.take(2);
List<Integer> sample = rdd.takeSample(false, 2, 42);
rdd.take(2);
rdd.takeSample(false, 2, 42);
}

@Test
@@ -359,8 +365,8 @@ public Boolean call(Double x) {
Assert.assertEquals(2.49444, rdd.stdev(), 0.01);
Assert.assertEquals(2.73252, rdd.sampleStdev(), 0.01);

Double first = rdd.first();
List<Double> take = rdd.take(5);
rdd.first();
rdd.take(5);
}

@Test
@@ -438,11 +444,11 @@ public Iterable<Double> call(String s) {
return lengths;
}
});
Double x = doubles.first();
Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
Assert.assertEquals(5.0, doubles.first(), 0.01);
Assert.assertEquals(11, pairs.count());
}

@SuppressWarnings("unchecked")
@Test
public void mapsFromPairsToPairs() {
List<Tuple2<Integer, String>> pairs = Arrays.asList(
@@ -509,6 +515,7 @@ public void repartition() {
}
}

@SuppressWarnings("unchecked")
@Test
public void persist() {
JavaDoubleRDD doubleRDD = sc.parallelizeDoubles(Arrays.asList(1.0, 1.0, 2.0, 3.0, 5.0, 8.0));
@@ -573,6 +580,7 @@ public void textFilesCompressed() throws IOException {
Assert.assertEquals(expected, readRDD.collect());
}

@SuppressWarnings("unchecked")
@Test
public void sequenceFile() {
File tempDir = Files.createTempDir();
@@ -602,6 +610,7 @@ public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
Assert.assertEquals(pairs, readRDD.collect());
}

@SuppressWarnings("unchecked")
@Test
public void writeWithNewAPIHadoopFile() {
File tempDir = Files.createTempDir();
@@ -632,6 +641,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}

@SuppressWarnings("unchecked")
@Test
public void readWithNewAPIHadoopFile() throws IOException {
File tempDir = Files.createTempDir();
@@ -674,6 +684,7 @@ public void objectFilesOfInts() {
Assert.assertEquals(expected, readRDD.collect());
}

@SuppressWarnings("unchecked")
@Test
public void objectFilesOfComplexTypes() {
File tempDir = Files.createTempDir();
@@ -690,6 +701,7 @@ public void objectFilesOfComplexTypes() {
Assert.assertEquals(pairs, readRDD.collect());
}

@SuppressWarnings("unchecked")
@Test
public void hadoopFile() {
File tempDir = Files.createTempDir();
@@ -719,6 +731,7 @@ public String call(Tuple2<IntWritable, Text> x) {
}).collect().toString());
}

@SuppressWarnings("unchecked")
@Test
public void hadoopFileCompressed() {
File tempDir = Files.createTempDir();
@@ -824,7 +837,7 @@ public Float zero(Float initialValue) {
}
};

final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
rdd.foreach(new VoidFunction<Integer>() {
public void call(Integer x) {
floatAccum.add((float) x);
@@ -876,6 +889,7 @@ public void checkpointAndRestore() {
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}

@SuppressWarnings("unchecked")
@Test
public void mapOnPairRDD() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
@@ -900,6 +914,7 @@ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Excepti

}

@SuppressWarnings("unchecked")
@Test
public void collectPartitions() {
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
@@ -968,14 +983,14 @@ public void countApproxDistinctByKey() {
@Test
public void collectAsMapWithIntArrayValues() {
// Regression test for SPARK-1040
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[] { 1 }));
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
JavaPairRDD<Integer, int[]> pairRDD = rdd.map(new PairFunction<Integer, Integer, int[]>() {
@Override
public Tuple2<Integer, int[]> call(Integer x) throws Exception {
return new Tuple2<Integer, int[]>(x, new int[] { x });
}
});
pairRDD.collect(); // Works fine
Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
pairRDD.collectAsMap(); // Used to crash with ClassCastException
}
}
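The other half of that squashed commit ("...or change code to avoid it") shows up in the hunks above that drop unused locals, the redundant (Float) cast, and the explicit Integer[] wrapper rather than adding an annotation. A minimal before-and-after sketch of the Arrays.asList change (hypothetical AsListSketch class, not part of this commit):

import java.util.Arrays;
import java.util.List;

public class AsListSketch {
  public static void main(String[] args) {
    // Old form: wraps the single value in an explicit array before handing it to asList.
    List<Integer> viaArray = Arrays.asList(new Integer[] { 1 });
    // New form: passes the element straight through varargs; the resulting list is identical.
    List<Integer> direct = Arrays.asList(1);
    System.out.println(viaArray.equals(direct)); // prints "true"
  }
}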
5 changes: 3 additions & 2 deletions pom.xml
@@ -592,12 +592,13 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.5.1</version>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<maxmem>1024m</maxmem>
<fork>true</fork>
</configuration>
</plugin>
<plugin>
@@ -612,7 +613,7 @@
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<version>1.0-M2</version>
<version>1.0-RC2</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
4 changes: 2 additions & 2 deletions repl/pom.xml
@@ -98,7 +98,7 @@
</goals>
<configuration>
<exportAntProperties>true</exportAntProperties>
<tasks>
<target>
<property name="spark.classpath" refid="maven.test.classpath" />
<property environment="env" />
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
@@ -111,7 +111,7 @@
</not>
</condition>
</fail>
</tasks>
</target>
</configuration>
</execution>
</executions>
