
Commit b9c8353

[SPARK-12618][CORE][STREAMING][SQL] Clean up build warnings: 2.0.0 edition

Fix most build warnings: mostly deprecated API usages. I'll annotate some of the changes below. CC rxin who is leading the charge to remove the deprecated APIs.

Author: Sean Owen <[email protected]>

Closes apache#10570 from srowen/SPARK-12618.

1 parent 794ea55 commit b9c8353

24 files changed: +123 -137 lines changed


core/src/test/scala/org/apache/spark/Smuggle.scala

+1
@@ -21,6 +21,7 @@ import java.util.UUID
 import java.util.concurrent.locks.ReentrantReadWriteLock

 import scala.collection.mutable
+import scala.language.implicitConversions

 /**
  * Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
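
Annotation: this file defines an implicit conversion, and without importing scala.language.implicitConversions the Scala compiler emits a feature warning for it; importing the feature flag is the standard way to acknowledge the usage and silence the warning.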

examples/src/main/java/org/apache/spark/examples/mllib/JavaBinaryClassificationMetricsExample.java

+3 -2

@@ -56,6 +56,7 @@ public static void main(String[] args) {
     // Compute raw scores on the test set.
     JavaRDD<Tuple2<Object, Object>> predictionAndLabels = test.map(
       new Function<LabeledPoint, Tuple2<Object, Object>>() {
+        @Override
         public Tuple2<Object, Object> call(LabeledPoint p) {
           Double prediction = model.predict(p.features());
           return new Tuple2<Object, Object>(prediction, p.label());
@@ -88,6 +89,7 @@ public Tuple2<Object, Object> call(LabeledPoint p) {
     // Thresholds
     JavaRDD<Double> thresholds = precision.map(
       new Function<Tuple2<Object, Object>, Double>() {
+        @Override
         public Double call(Tuple2<Object, Object> t) {
           return new Double(t._1().toString());
         }
@@ -106,8 +108,7 @@ public Double call(Tuple2<Object, Object> t) {

     // Save and load model
     model.save(sc, "target/tmp/LogisticRegressionModel");
-    LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc,
-      "target/tmp/LogisticRegressionModel");
+    LogisticRegressionModel.load(sc, "target/tmp/LogisticRegressionModel");
     // $example off$
   }
 }
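
Annotation: the recurring fix across the example files is adding @Override to the call methods of anonymous Function implementations; this file also drops the unused sameModel local, since the example only needs to show that the model loads. A minimal sketch of why @Override matters on anonymous classes, using a hypothetical Transformer interface as a stand-in for Spark's Function:

```java
// Hypothetical stand-in for a SAM interface like Spark's Function.
interface Transformer<T, R> {
  R call(T input);
}

public class OverrideDemo {
  public static void main(String[] args) {
    Transformer<Integer, String> t = new Transformer<Integer, String>() {
      // With @Override, renaming or re-typing Transformer#call becomes a
      // compile error here instead of leaving a silently-unused method.
      @Override
      public String call(Integer input) {
        return "value=" + input;
      }
    };
    System.out.println(t.call(42)); // prints "value=42"
  }
}
```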

examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java

+15 -6

@@ -41,6 +41,7 @@ public static void main(String[] args) {
     JavaRDD<String> data = sc.textFile(path);
     JavaRDD<Rating> ratings = data.map(
       new Function<String, Rating>() {
+        @Override
         public Rating call(String line) {
           String[] parts = line.split("::");
           return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double
@@ -57,13 +58,14 @@ public Rating call(String line) {
     JavaRDD<Tuple2<Object, Rating[]>> userRecs = model.recommendProductsForUsers(10).toJavaRDD();
     JavaRDD<Tuple2<Object, Rating[]>> userRecsScaled = userRecs.map(
       new Function<Tuple2<Object, Rating[]>, Tuple2<Object, Rating[]>>() {
+        @Override
         public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
           Rating[] scaledRatings = new Rating[t._2().length];
           for (int i = 0; i < scaledRatings.length; i++) {
             double newRating = Math.max(Math.min(t._2()[i].rating(), 1.0), 0.0);
             scaledRatings[i] = new Rating(t._2()[i].user(), t._2()[i].product(), newRating);
           }
-          return new Tuple2<Object, Rating[]>(t._1(), scaledRatings);
+          return new Tuple2<>(t._1(), scaledRatings);
         }
       }
     );
@@ -72,6 +74,7 @@ public Tuple2<Object, Rating[]> call(Tuple2<Object, Rating[]> t) {
     // Map ratings to 1 or 0, 1 indicating a movie that should be recommended
     JavaRDD<Rating> binarizedRatings = ratings.map(
       new Function<Rating, Rating>() {
+        @Override
         public Rating call(Rating r) {
           double binaryRating;
           if (r.rating() > 0.0) {
@@ -87,6 +90,7 @@ public Rating call(Rating r) {
     // Group ratings by common user
     JavaPairRDD<Object, Iterable<Rating>> userMovies = binarizedRatings.groupBy(
       new Function<Rating, Object>() {
+        @Override
         public Object call(Rating r) {
           return r.user();
         }
@@ -96,8 +100,9 @@ public Object call(Rating r) {
     // Get true relevant documents from all user ratings
     JavaPairRDD<Object, List<Integer>> userMoviesList = userMovies.mapValues(
       new Function<Iterable<Rating>, List<Integer>>() {
+        @Override
         public List<Integer> call(Iterable<Rating> docs) {
-          List<Integer> products = new ArrayList<Integer>();
+          List<Integer> products = new ArrayList<>();
           for (Rating r : docs) {
             if (r.rating() > 0.0) {
               products.add(r.product());
@@ -111,8 +116,9 @@ public List<Integer> call(Iterable<Rating> docs) {
     // Extract the product id from each recommendation
     JavaPairRDD<Object, List<Integer>> userRecommendedList = userRecommended.mapValues(
       new Function<Rating[], List<Integer>>() {
+        @Override
         public List<Integer> call(Rating[] docs) {
-          List<Integer> products = new ArrayList<Integer>();
+          List<Integer> products = new ArrayList<>();
           for (Rating r : docs) {
             products.add(r.product());
           }
@@ -124,7 +130,7 @@ public List<Integer> call(Rating[] docs) {
       userRecommendedList).values();

     // Instantiate the metrics object
-    RankingMetrics metrics = RankingMetrics.of(relevantDocs);
+    RankingMetrics<Integer> metrics = RankingMetrics.of(relevantDocs);

     // Precision and NDCG at k
     Integer[] kVector = {1, 3, 5};
@@ -139,6 +145,7 @@ public List<Integer> call(Rating[] docs) {
     // Evaluate the model using numerical ratings and regression metrics
     JavaRDD<Tuple2<Object, Object>> userProducts = ratings.map(
       new Function<Rating, Tuple2<Object, Object>>() {
+        @Override
         public Tuple2<Object, Object> call(Rating r) {
           return new Tuple2<Object, Object>(r.user(), r.product());
         }
@@ -147,18 +154,20 @@ public Tuple2<Object, Object> call(Rating r) {
     JavaPairRDD<Tuple2<Integer, Integer>, Object> predictions = JavaPairRDD.fromJavaRDD(
       model.predict(JavaRDD.toRDD(userProducts)).toJavaRDD().map(
         new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+          @Override
           public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
             return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+              new Tuple2<>(r.user(), r.product()), r.rating());
           }
         }
       ));
     JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
       JavaPairRDD.fromJavaRDD(ratings.map(
         new Function<Rating, Tuple2<Tuple2<Integer, Integer>, Object>>() {
+          @Override
           public Tuple2<Tuple2<Integer, Integer>, Object> call(Rating r) {
             return new Tuple2<Tuple2<Integer, Integer>, Object>(
-              new Tuple2<Integer, Integer>(r.user(), r.product()), r.rating());
+              new Tuple2<>(r.user(), r.product()), r.rating());
           }
         }
       )).join(predictions).values();
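
Annotation: two generics cleanups recur in this file: the Java 7 diamond operator (new Tuple2<>, new ArrayList<>) drops repeated type arguments, and the raw RankingMetrics becomes RankingMetrics<Integer>, silencing an unchecked warning. A minimal sketch of both patterns, with plain java.util types:

```java
import java.util.ArrayList;
import java.util.List;

public class GenericsCleanupDemo {
  // A raw type compiles, but the compiler cannot check element types and
  // warns on use -- the situation the raw RankingMetrics was in.
  @SuppressWarnings({"rawtypes", "unchecked"})
  static List rawList() {
    List raw = new ArrayList();
    raw.add("anything"); // unchecked: no element-type safety
    return raw;
  }

  public static void main(String[] args) {
    // Diamond operator: <String> is inferred from the declared type,
    // so it need not be repeated on the right-hand side.
    List<String> items = new ArrayList<>();
    items.add("checked at compile time");
    System.out.println(items.get(0));
  }
}
```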

examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java

+4 -4

@@ -36,6 +36,7 @@
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.Time;
@@ -154,17 +155,17 @@ public Integer call(Integer i1, Integer i2) {
       }
     });

-    wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
+    wordCounts.foreachRDD(new VoidFunction2<JavaPairRDD<String, Integer>, Time>() {
       @Override
-      public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
+      public void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
         // Get or register the blacklist Broadcast
         final Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
         // Get or register the droppedWordsCounter Accumulator
         final Accumulator<Integer> droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
         // Use blacklist to drop words and use droppedWordsCounter to count them
         String counts = rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
           @Override
-          public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
+          public Boolean call(Tuple2<String, Integer> wordCount) {
             if (blacklist.value().contains(wordCount._1())) {
               droppedWordsCounter.add(wordCount._2());
               return false;
@@ -178,7 +179,6 @@ public Boolean call(Tuple2<String, Integer> wordCount) throws Exception {
         System.out.println("Dropped " + droppedWordsCounter.value() + " word(s) totally");
         System.out.println("Appending to " + outputFile.getAbsolutePath());
         Files.append(output + "\n", outputFile, Charset.defaultCharset());
-        return null;
       }
     });
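
Annotation: the foreachRDD overload taking Function2<..., Time, Void> was deprecated in favor of VoidFunction2, whose call returns void, so the dummy return null disappears. A self-contained sketch of the two shapes, using simplified stand-in interfaces rather than the real Spark ones:

```java
// Simplified stand-ins for the Spark Java functional interfaces.
interface Function2<T1, T2, R> {
  R call(T1 v1, T2 v2) throws Exception;
}

interface VoidFunction2<T1, T2> {
  void call(T1 v1, T2 v2) throws Exception;
}

public class VoidFunctionDemo {
  public static void main(String[] args) throws Exception {
    // Deprecated shape: a Void result type forces "return null;".
    Function2<String, Long, Void> old = new Function2<String, Long, Void>() {
      @Override
      public Void call(String rdd, Long time) {
        System.out.println(time + ": " + rdd);
        return null; // pure boilerplate
      }
    };

    // Replacement shape: call() genuinely returns void.
    VoidFunction2<String, Long> current = new VoidFunction2<String, Long>() {
      @Override
      public void call(String rdd, Long time) {
        System.out.println(time + ": " + rdd);
      }
    };

    old.call("words", 1000L);
    current.call("words", 1000L);
  }
}
```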

examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java

+4 -4

@@ -26,7 +26,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.api.java.StorageLevels;
@@ -78,13 +78,14 @@ public Iterable<String> call(String x) {
     });

     // Convert RDDs of the words DStream to DataFrame and run SQL query
-    words.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
+    words.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
       @Override
-      public Void call(JavaRDD<String> rdd, Time time) {
+      public void call(JavaRDD<String> rdd, Time time) {
         SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());

         // Convert JavaRDD[String] to JavaRDD[bean class] to DataFrame
         JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
+          @Override
           public JavaRecord call(String word) {
             JavaRecord record = new JavaRecord();
             record.setWord(word);
@@ -101,7 +102,6 @@ public JavaRecord call(String word) {
         sqlContext.sql("select word, count(*) as total from words group by word");
         System.out.println("========= " + time + "=========");
         wordCountsDataFrame.show();
-        return null;
       }
     });
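
Annotation: the same Function2<..., Time, Void> to VoidFunction2 substitution as in JavaRecoverableNetworkWordCount above (see the sketch there), plus one more missing @Override on the inner Function.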

examples/src/main/java/org/apache/spark/examples/streaming/JavaTwitterHashTagJoinSentiments.java

+15 -21

@@ -17,13 +17,13 @@

 package org.apache.spark.examples.streaming;

-import org.apache.commons.io.IOUtils;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -33,8 +33,6 @@
 import scala.Tuple2;
 import twitter4j.Status;

-import java.io.IOException;
-import java.net.URI;
 import java.util.Arrays;
 import java.util.List;

@@ -44,7 +42,7 @@
  */
 public class JavaTwitterHashTagJoinSentiments {

-  public static void main(String[] args) throws IOException {
+  public static void main(String[] args) {
     if (args.length < 4) {
       System.err.println("Usage: JavaTwitterHashTagJoinSentiments <consumer key> <consumer secret>" +
         " <access token> <access token secret> [<filters>]");
@@ -79,7 +77,7 @@ public Iterable<String> call(Status s) {

     JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
       @Override
-      public Boolean call(String word) throws Exception {
+      public Boolean call(String word) {
         return word.startsWith("#");
       }
     });
@@ -91,8 +89,7 @@ public Boolean call(String word) {
       @Override
       public Tuple2<String, Double> call(String line) {
         String[] columns = line.split("\t");
-        return new Tuple2<String, Double>(columns[0],
-          Double.parseDouble(columns[1]));
+        return new Tuple2<>(columns[0], Double.parseDouble(columns[1]));
       }
     });

@@ -101,7 +98,7 @@ public Tuple2<String, Double> call(String line) {
       @Override
       public Tuple2<String, Integer> call(String s) {
         // leave out the # character
-        return new Tuple2<String, Integer>(s.substring(1), 1);
+        return new Tuple2<>(s.substring(1), 1);
       }
     });

@@ -120,9 +117,8 @@ public Integer call(Integer a, Integer b) {
     hashTagTotals.transformToPair(new Function<JavaPairRDD<String, Integer>,
       JavaPairRDD<String, Tuple2<Double, Integer>>>() {
       @Override
-      public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
-        Integer> topicCount)
-        throws Exception {
+      public JavaPairRDD<String, Tuple2<Double, Integer>> call(
+          JavaPairRDD<String, Integer> topicCount) {
         return wordSentiments.join(topicCount);
       }
     });
@@ -131,37 +127,36 @@ public JavaPairRDD<String, Tuple2<Double, Integer>> call(JavaPairRDD<String,
       new PairFunction<Tuple2<String, Tuple2<Double, Integer>>, String, Double>() {
         @Override
         public Tuple2<String, Double> call(Tuple2<String,
-          Tuple2<Double, Integer>> topicAndTuplePair) throws Exception {
+          Tuple2<Double, Integer>> topicAndTuplePair) {
           Tuple2<Double, Integer> happinessAndCount = topicAndTuplePair._2();
-          return new Tuple2<String, Double>(topicAndTuplePair._1(),
+          return new Tuple2<>(topicAndTuplePair._1(),
             happinessAndCount._1() * happinessAndCount._2());
         }
       });

     JavaPairDStream<Double, String> happinessTopicPairs = topicHappiness.mapToPair(
       new PairFunction<Tuple2<String, Double>, Double, String>() {
         @Override
-        public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness)
-          throws Exception {
-          return new Tuple2<Double, String>(topicHappiness._2(),
+        public Tuple2<Double, String> call(Tuple2<String, Double> topicHappiness) {
+          return new Tuple2<>(topicHappiness._2(),
             topicHappiness._1());
         }
       });

     JavaPairDStream<Double, String> happiest10 = happinessTopicPairs.transformToPair(
       new Function<JavaPairRDD<Double, String>, JavaPairRDD<Double, String>>() {
         @Override
-        public JavaPairRDD<Double, String> call(JavaPairRDD<Double,
-          String> happinessAndTopics) throws Exception {
+        public JavaPairRDD<Double, String> call(
+            JavaPairRDD<Double, String> happinessAndTopics) {
           return happinessAndTopics.sortByKey(false);
         }
       }
     );

     // Print hash tags with the most positive sentiment values
-    happiest10.foreachRDD(new Function<JavaPairRDD<Double, String>, Void>() {
+    happiest10.foreachRDD(new VoidFunction<JavaPairRDD<Double, String>>() {
       @Override
-      public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception {
+      public void call(JavaPairRDD<Double, String> happinessTopicPairs) {
         List<Tuple2<Double, String>> topList = happinessTopicPairs.take(10);
         System.out.println(
           String.format("\nHappiest topics in last 10 seconds (%s total):",
@@ -170,7 +165,6 @@ public Void call(JavaPairRDD<Double, String> happinessTopicPairs) throws Exception {
         System.out.println(
           String.format("%s (%s happiness)", pair._2(), pair._1()));
       }
-      return null;
     }
   });
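
Annotation: besides the diamond-operator and VoidFunction changes seen elsewhere, this file removes unused imports (IOUtils, IOException, URI) and a series of needless throws Exception clauses: an overriding method may declare fewer checked exceptions than the method it overrides. A minimal sketch of that narrowing, again with a stand-in interface rather than Spark's real Function:

```java
// Stand-in for a SAM interface whose call() declares "throws Exception".
interface Function<T, R> {
  R call(T input) throws Exception;
}

public class NarrowedThrowsDemo {
  public static void main(String[] args) throws Exception {
    Function<String, Boolean> isHashTag = new Function<String, Boolean>() {
      // Legal narrowing: the override omits "throws Exception" because the
      // body cannot throw a checked exception, documenting that fact.
      @Override
      public Boolean call(String word) {
        return word.startsWith("#");
      }
    };
    // Calls through the interface type still see the broad throws clause,
    // hence the "throws Exception" on main.
    System.out.println(isHashTag.call("#spark")); // prints "true"
  }
}
```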

examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala

+1 -1

@@ -74,7 +74,7 @@ object SparkHdfsLR {
     val conf = new Configuration()
     val sc = new SparkContext(sparkConf)
     val lines = sc.textFile(inputPath)
-    val points = lines.map(parsePoint _).cache()
+    val points = lines.map(parsePoint).cache()
     val ITERATIONS = args(1).toInt

     // Initialize w to a random value
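
Annotation: parsePoint _ is an explicit eta-expansion; where a function type is expected, the method name alone suffices, so lines.map(parsePoint) is equivalent and the trailing underscore is just noise. The same cleanup appears in SparkTachyonHdfsLR below.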

examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala

+1 -1

@@ -71,7 +71,7 @@ object SparkTachyonHdfsLR {
     val conf = new Configuration()
     val sc = new SparkContext(sparkConf)
     val lines = sc.textFile(inputPath)
-    val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP)
+    val points = lines.map(parsePoint).persist(StorageLevel.OFF_HEAP)
     val ITERATIONS = args(1).toInt

     // Initialize w to a random value

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

+3 -4

@@ -35,6 +35,7 @@
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
@@ -130,17 +131,15 @@ public String call(MessageAndMetadata<String, String> msgAndMd) {
     JavaDStream<String> unifiedStream = stream1.union(stream2);

     final Set<String> result = Collections.synchronizedSet(new HashSet<String>());
-    unifiedStream.foreachRDD(
-      new Function<JavaRDD<String>, Void>() {
+    unifiedStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
       @Override
-      public Void call(JavaRDD<String> rdd) {
+      public void call(JavaRDD<String> rdd) {
         result.addAll(rdd.collect());
         for (OffsetRange o : offsetRanges.get()) {
           System.out.println(
             o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset()
           );
         }
-        return null;
       }
     }
     );
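
Annotation: the same Function<JavaRDD<String>, Void> to VoidFunction substitution as in the streaming examples above, again dropping the dummy return null.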
