[SPARK-13423][WIP][CORE][SQL][STREAMING] Static analysis fixes for 2.x
## What changes were proposed in this pull request?

Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:

- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo (sketched below)
- Unchecked warnings
- Misuse of the assert keyword where JUnit assertions (assertTrue, assertNotNull, ...) should be used in Java tests
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == x) -> contains(x)
- find + nonEmpty -> exists
- filter + size -> count
- reduce(_ + _) -> sum
- map + flatten -> flatMap (these collection idioms are sketched below)
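
To make the collection-idiom items concrete, here is a minimal Scala sketch of the before/after forms. The values and the settings map are made up for illustration; only the idioms themselves come from the patch.

```scala
object CollectionIdioms {
  def main(args: Array[String]): Unit = {
    val xs = Seq(1, 2, 3, 4)
    val conf = Map("spark.app.name" -> "demo")  // hypothetical settings map

    // Each pair is behaviorally equivalent; the right-hand form skips an
    // intermediate Option or collection, or simply reads more directly.
    assert(xs.exists(_ == 3) == xs.contains(3))                 // exists(_ == x) -> contains(x)
    assert(xs.find(_ > 2).nonEmpty == xs.exists(_ > 2))         // find + nonEmpty -> exists
    assert(xs.filter(_ % 2 == 0).size == xs.count(_ % 2 == 0))  // filter + size -> count
    assert(xs.reduce(_ + _) == xs.sum)                          // reduce(_ + _) -> sum
    assert(xs.map(x => Seq(x, x)).flatten ==
           xs.flatMap(x => Seq(x, x)))                          // map + flatten -> flatMap
    assert(conf.get("spark.master").getOrElse("local") ==
           conf.getOrElse("spark.master", "local"))             // get(a) + getOrElse(b) -> getOrElse(a, b)
  }
}
```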
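
For the "Overflow in compareTo" item, the hazard is the subtraction-based comparator pattern (return left - right), which is what the ShuffleInMemorySorter change further down replaces with an explicit three-way comparison. A small Scala demonstration of the wraparound, using made-up ids:

```scala
object CompareToOverflow {
  def main(args: Array[String]): Unit = {
    val left  = -2000000000  // hypothetical ids, far apart
    val right = 1500000000

    // The mathematically negative difference wraps past Int.MaxValue and
    // comes out positive, so a subtraction-based comparator reports left > right.
    println(left - right)                            // 794967296, wrong sign

    // An explicit three-way comparison never overflows.
    println(java.lang.Integer.compare(left, right))  // -1
  }
}
```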

The most controversial change may be .size -> .length, simply because of the sheer number of occurrences. It is intended to avoid implicit conversions that might be expensive in some places; the difference is sketched below.
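
A sketch of what the implicit costs: in Scala 2.x, Array and String have no size member of their own, so .size compiles only through an implicit conversion to a wrapper (ArrayOps / StringOps), while .length is answered by the array or string directly. Whether the wrapper allocation actually matters depends on the call site, hence "might be expensive":

```scala
object SizeVsLength {
  def main(args: Array[String]): Unit = {
    val arr = Array(1, 2, 3)
    val s   = "spark"

    // Direct accessors: no conversion involved.
    println(arr.length)  // 3, the JVM array length
    println(s.length)    // 5, java.lang.String.length()

    // .size is supplied by an implicit conversion to ArrayOps / StringOps,
    // which may allocate a wrapper object on a hot path.
    println(arr.size)    // 3
    println(s.size)      // 5
  }
}
```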

## How was this patch tested?

Existing Jenkins unit tests.

Author: Sean Owen <[email protected]>

Closes apache#11292 from srowen/SPARK-13423.
srowen committed Mar 3, 2016
1 parent 02b7677 commit e97fc7f
Showing 147 changed files with 345 additions and 293 deletions.
@@ -132,7 +132,7 @@ public static void tearDown() {
testFile.delete();
}

class FetchResult {
static class FetchResult {
public Set<Integer> successChunks;
public Set<Integer> failedChunks;
public List<ManagedBuffer> buffers;
@@ -124,8 +124,8 @@ public StreamManager getStreamManager() {
synchronized (callback1) {
client.sendRpc(ByteBuffer.allocate(0), callback1);
callback1.wait(4 * 1000);
assert (callback1.failure != null);
assert (callback1.failure instanceof IOException);
assertNotNull(callback1.failure);
assertTrue(callback1.failure instanceof IOException);
}
semaphore.release();
}
@@ -167,8 +167,8 @@ public StreamManager getStreamManager() {
synchronized (callback0) {
client0.sendRpc(ByteBuffer.allocate(0), callback0);
callback0.wait(FOREVER);
assert (callback0.failure instanceof IOException);
assert (!client0.isActive());
assertTrue(callback0.failure instanceof IOException);
assertFalse(client0.isActive());
}

// Increment the semaphore and the second request should succeed quickly.
@@ -236,15 +236,15 @@ public StreamManager getStreamManager() {

synchronized (callback1) {
// failed at same time as previous
assert (callback0.failure instanceof IOException);
assertTrue(callback0.failure instanceof IOException);
}
}

/**
* Callback which sets 'success' or 'failure' on completion.
* Additionally notifies all waiters on this callback when invoked.
*/
class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {

int successLength = -1;
Throwable failure;
@@ -91,7 +91,7 @@ public static void tearDown() {
clientFactory.close();
}

class RpcResult {
static class RpcResult {
public Set<String> successMessages;
public Set<String> errorMessages;
}
@@ -27,6 +27,7 @@

import com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@@ -95,7 +96,7 @@ public void run() {
try {
TransportClient client =
factory.createClient(TestUtils.getLocalHost(), server1.getPort());
assert (client.isActive());
assertTrue(client.isActive());
clients.add(client);
} catch (IOException e) {
failed.incrementAndGet();
@@ -115,8 +116,8 @@ public void run() {
attempts[i].join();
}

assert(failed.get() == 0);
assert(clients.size() == maxConnections);
Assert.assertEquals(0, failed.get());
Assert.assertEquals(clients.size(), maxConnections);

for (TransportClient client : clients) {
client.close();
@@ -65,7 +65,7 @@ public void testByteBufBody() throws Exception {
assertEquals(42, result.readLong());
assertEquals(84, result.readLong());

assert(msg.release());
assertTrue(msg.release());
assertEquals(0, bodyPassedToNettyManagedBuffer.refCnt());
assertEquals(0, header.refCnt());
}
@@ -77,7 +77,7 @@ public void testDeallocateReleasesManagedBuffer() throws Exception {
ByteBuf body = (ByteBuf) managedBuf.convertToNetty();
assertEquals(2, body.refCnt());
MessageWithHeader msg = new MessageWithHeader(managedBuf, header, body, body.readableBytes());
assert(msg.release());
assertTrue(msg.release());
Mockito.verify(managedBuf, Mockito.times(1)).release();
assertEquals(0, body.refCnt());
}
@@ -94,7 +94,7 @@ private void testFileRegionBody(int totalWrites, int writesPerCall) throws Excep
for (long i = 0; i < 8; i++) {
assertEquals(i, result.readLong());
}
assert(msg.release());
assertTrue(msg.release());
}

private ByteBuf doWrite(MessageWithHeader msg, int minExpectedWrites) throws Exception {
@@ -56,6 +56,14 @@ public int hashCode() {
return Objects.hashCode(appId);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof RegisterDriver)) {
return false;
}
return Objects.equal(appId, ((RegisterDriver) o).appId);
}

public static RegisterDriver decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
return new RegisterDriver(appId);
@@ -109,7 +109,7 @@ public void afterEach() {
handler.applicationRemoved(APP_ID, false /* cleanupLocalDirs */);
}

class FetchResult {
static class FetchResult {
public Set<String> successBlocks;
public Set<String> failedBlocks;
public List<ManagedBuffer> buffers;
@@ -305,7 +305,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
}
}

assert stub != null;
assertNotNull(stub);
stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
@@ -193,7 +193,7 @@ class UTF8StringPropertyCheckSuite extends FunSuite with GeneratorDrivenProperty

test("concat") {
def concat(orgin: Seq[String]): String =
if (orgin.exists(_ == null)) null else orgin.mkString
if (orgin.contains(null)) null else orgin.mkString

forAll { (inputs: Seq[String]) =>
assert(UTF8String.concat(inputs.map(toUTF8): _*) === toUTF8(inputs.mkString))
@@ -30,7 +30,9 @@ final class ShuffleInMemorySorter {
private static final class SortComparator implements Comparator<PackedRecordPointer> {
@Override
public int compare(PackedRecordPointer left, PackedRecordPointer right) {
return left.getPartitionId() - right.getPartitionId();
int leftId = left.getPartitionId();
int rightId = right.getPartitionId();
return leftId < rightId ? -1 : (leftId > rightId ? 1 : 0);
}
}
private static final SortComparator SORT_COMPARATOR = new SortComparator();
@@ -550,7 +550,7 @@ public UnsafeSorterIterator getIterator() throws IOException {
/**
* Chain multiple UnsafeSorterIterator together as single one.
*/
class ChainedIterator extends UnsafeSorterIterator {
static class ChainedIterator extends UnsafeSorterIterator {

private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -88,7 +88,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.size, this)
shuffleId, _rdd.partitions.length, this)

_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -55,14 +55,14 @@ object Partitioner {
* We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD.
*/
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.length).reverse
for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
return r.partitioner.get
}
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(bySize.head.partitions.size)
new HashPartitioner(bySize.head.partitions.length)
}
}
}
@@ -122,7 +122,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
val sampleSize = math.min(20.0 * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.size).toInt
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
if (numItems == 0L) {
Array.empty
@@ -137,7 +137,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
imbalancedPartitions += idx
} else {
// The weight is 1 over the sampling probability.
val weight = (n.toDouble / sample.size).toFloat
val weight = (n.toDouble / sample.length).toFloat
for (key <- sample) {
candidates += ((key, weight))
}
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,8 +23,8 @@ import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}
import java.util.UUID.randomUUID

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.Map
import scala.collection.generic.Growable
@@ -391,8 +391,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

_jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
_jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
.toSeq.flatten

_eventLogDir =
@@ -2310,6 +2310,7 @@ object SparkContext extends Logging {
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
@tailrec
private def createTaskScheduler(
sc: SparkContext,
master: String,
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -248,7 +248,6 @@ case class ExecutorLostFailure(
} else {
"unrelated to the running tasks"
}
s"ExecutorLostFailure (executor ${execId} exited due to an issue ${exitBehavior})"
s"ExecutorLostFailure (executor ${execId} exited ${exitBehavior})" +
reason.map { r => s" Reason: $r" }.getOrElse("")
}
@@ -19,6 +19,7 @@ package org.apache.spark.deploy

import java.net.{URI, URISyntaxException}

import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

import org.apache.log4j.Level
@@ -49,6 +50,7 @@ private[deploy] class ClientArguments(args: Array[String]) {

parse(args.toList)

@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--cores" | "-c") :: IntParam(value) :: tail =>
cores = value
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -22,6 +22,7 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
import java.net.URL
import java.security.PrivilegedExceptionAction

import scala.annotation.tailrec
import scala.collection.mutable.{ArrayBuffer, HashMap, Map}

import org.apache.commons.lang3.StringUtils
@@ -150,6 +151,7 @@ object SparkSubmit {
* Second, we use this launch environment to invoke the main method of the child
* main class.
*/
@tailrec
private def submit(args: SparkSubmitArguments): Unit = {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

@@ -721,6 +723,7 @@ object SparkSubmit {
throw new IllegalStateException("The main method in the given main class must be static")
}

@tailrec
def findCause(t: Throwable): Throwable = t match {
case e: UndeclaredThrowableException =>
if (e.getCause() != null) findCause(e.getCause()) else e
@@ -17,6 +17,8 @@

package org.apache.spark.deploy.history

import scala.annotation.tailrec

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.Utils

@@ -29,6 +31,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin

parse(args.toList)

@tailrec
private def parse(args: List[String]): Unit = {
if (args.length == 1) {
setLogDirectory(args.head)
@@ -17,6 +17,8 @@

package org.apache.spark.deploy.master

import scala.annotation.tailrec

import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}

@@ -49,6 +51,7 @@ private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = conf.get("spark.master.ui.port").toInt
}

@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -32,7 +32,7 @@ private[spark] class MasterSource(val master: Master) extends Source {

// Gauge for alive worker numbers in cluster
metricRegistry.register(MetricRegistry.name("aliveWorkers"), new Gauge[Int]{
override def getValue: Int = master.workers.filter(_.state == WorkerState.ALIVE).size
override def getValue: Int = master.workers.count(_.state == WorkerState.ALIVE)
})

// Gauge for application numbers in cluster
@@ -42,6 +42,6 @@ private[spark] class MasterSource(val master: Master) extends Source {

// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waitingApps"), new Gauge[Int] {
override def getValue: Int = master.apps.filter(_.state == ApplicationState.WAITING).size
override def getValue: Int = master.apps.count(_.state == ApplicationState.WAITING)
})
}
@@ -50,7 +50,7 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer

override def read[T: ClassTag](prefix: String): Seq[T] = {
zk.getChildren.forPath(WORKING_DIR).asScala
.filter(_.startsWith(prefix)).map(deserializeFromFile[T]).flatten
.filter(_.startsWith(prefix)).flatMap(deserializeFromFile[T])
}

override def close() {
@@ -107,18 +107,18 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</li>
}.getOrElse { Seq.empty }
}
<li><strong>Alive Workers:</strong> {aliveWorkers.size}</li>
<li><strong>Alive Workers:</strong> {aliveWorkers.length}</li>
<li><strong>Cores in use:</strong> {aliveWorkers.map(_.cores).sum} Total,
{aliveWorkers.map(_.coresUsed).sum} Used</li>
<li><strong>Memory in use:</strong>
{Utils.megabytesToString(aliveWorkers.map(_.memory).sum)} Total,
{Utils.megabytesToString(aliveWorkers.map(_.memoryUsed).sum)} Used</li>
<li><strong>Applications:</strong>
{state.activeApps.size} Running,
{state.completedApps.size} Completed </li>
{state.activeApps.length} Running,
{state.completedApps.length} Completed </li>
<li><strong>Drivers:</strong>
{state.activeDrivers.size} Running,
{state.completedDrivers.size} Completed </li>
{state.activeDrivers.length} Running,
{state.completedDrivers.length} Completed </li>
<li><strong>Status:</strong> {state.status}</li>
</ul>
</div>
@@ -17,6 +17,8 @@

package org.apache.spark.deploy.mesos

import scala.annotation.tailrec

import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}

@@ -34,6 +36,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:

propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

@tailrec
private def parse(args: List[String]): Unit = args match {
case ("--host" | "-h") :: value :: tail =>
Utils.checkHost(value, "Please use hostname " + value)