Skip to content

Commit

Permalink
Merge pull request apache#567 from ScrapCodes/style2.
Browse files Browse the repository at this point in the history
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2

Continuation of PR apache#557

With this, all Scala style errors are fixed across the code base!

The reason for creating a separate PR was to avoid interrupting an already-reviewed, ready-to-merge PR. Hopefully this one gets reviewed and merged soon too.

Author: Prashant Sharma <[email protected]>

Closes apache#567 and squashes the following commits:

3b1ec30 [Prashant Sharma] scala style fixes
  • Loading branch information
ScrapCodes authored and rxin committed Feb 10, 2014
1 parent 2182aa3 commit 919bd7f
Show file tree
Hide file tree
Showing 63 changed files with 356 additions and 254 deletions.
3 changes: 2 additions & 1 deletion bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ object Bagel extends Logging {
* @param messages initial set of messages represented as an RDD of (Key, Message) pairs. Often
* this will be an empty array, i.e. sc.parallelize(Array[K, Message]()).
* @param combiner [[org.apache.spark.bagel.Combiner]] combines multiple individual messages to a
* given vertex into one message before sending (which often involves network I/O).
* given vertex into one message before sending (which often involves network
* I/O).
* @param aggregator [[org.apache.spark.bagel.Aggregator]] performs a reduce across all vertices
* after each superstep and provides the result to each vertex in the next
* superstep.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {

/** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, split.index)
logDebug("Looking for partition " + key)
blockManager.get(key) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ private[spark] trait AppClientListener {
/** Dead means that we couldn't find any Masters to connect to, and have given up. */
def dead(): Unit

def executorAdded(
fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)

def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
System.exit(0)
}

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerWebUiPort, publicAddress)
=> {
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
Expand All @@ -176,7 +176,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerWebUiPort, publicAddress)
sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>

<div>
{if (hasDrivers) {
{
if (hasDrivers) {
<div class="row-fluid">
<div class="span12">
<h4> Completed Drivers </h4>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ object CommandUtils extends Logging {
.map(p => List("-Djava.library.path=" + p))
.getOrElse(Nil)
val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS"))
.map(Utils.splitCommandString).getOrElse(Nil)
.map(Utils.splitCommandString).getOrElse(Nil)
val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil)
val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
new LinkedBlockingDeque[Runnable]())

private val serverChannel = ServerSocketChannel.open()
private val connectionsByKey = new HashMap[SelectionKey, Connection]
with SynchronizedMap[SelectionKey, Connection]
private val connectionsByKey =
new HashMap[SelectionKey, Connection] with SynchronizedMap[SelectionKey, Connection]
private val connectionsById = new HashMap[ConnectionManagerId, SendingConnection]
with SynchronizedMap[ConnectionManagerId, SendingConnection]
private val messageStatuses = new HashMap[Int, MessageStatus]
Expand Down Expand Up @@ -445,10 +445,9 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
assert (sendingConnectionManagerId == remoteConnectionManagerId)

messageStatuses.synchronized {
for (s <- messageStatuses.values if
s.connectionManagerId == sendingConnectionManagerId) {
logInfo("Notifying " + s)
s.synchronized {
for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
logInfo("Notifying " + s)
s.synchronized {
s.attempted = true
s.acked = false
s.markDone()
Expand Down Expand Up @@ -574,7 +573,7 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi
val promise = Promise[Option[Message]]
val status = new MessageStatus(
message, connectionManagerId, s => promise.success(s.ackMessage))
messageStatuses.synchronized {
messageStatuses.synchronized {
messageStatuses += ((message.id, status))
}
sendMessage(connectionManagerId, message)
Expand Down Expand Up @@ -684,8 +683,11 @@ private[spark] object ConnectionManager {
println("--------------------------")
val size = 10 * 1024 * 1024
val count = 10
val buffers = Array.tabulate(count)(i => ByteBuffer.allocate(size * (i + 1)).put(
Array.tabulate[Byte](size * (i + 1))(x => x.toByte)))
val buffers = Array.tabulate(count) { i =>
val bufferLen = size * (i + 1)
val bufferContent = Array.tabulate[Byte](bufferLen)(x => x.toByte)
ByteBuffer.allocate(bufferLen).put(bufferContent)
}
buffers.foreach(_.flip)
val mb = buffers.map(_.remaining).reduceLeft(_ + _) / 1024.0 / 1024.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,13 @@ private[spark] object ConnectionManagerTest extends Logging{
buffer.flip

val startTime = System.currentTimeMillis
val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId =>
{
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
})
val futures = slaveConnManagerIds.filter(_ != thisConnManagerId).map{ slaveConnManagerId =>
{
val bufferMessage = Message.createBufferMessage(buffer.duplicate)
logInfo("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
}
}
val results = futures.map(f => Await.result(f, awaitTime))
val finishTime = System.currentTimeMillis
Thread.sleep(5000)
Expand Down
17 changes: 8 additions & 9 deletions core/src/main/scala/org/apache/spark/network/SenderTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,19 @@ private[spark] object SenderTest {
val dataMessage = Message.createBufferMessage(buffer.duplicate)
val startTime = System.currentTimeMillis
/*println("Started timer at " + startTime)*/
val responseStr =
manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage) match {
case Some(response) =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
new String(buffer.array)
case None => "none"
}
val responseStr = manager.sendMessageReliablySync(targetConnectionManagerId, dataMessage)
.map { response =>
val buffer = response.asInstanceOf[BufferMessage].buffers(0)
new String(buffer.array)
}.getOrElse("none")

val finishTime = System.currentTimeMillis
val mb = size / 1024.0 / 1024.0
val ms = finishTime - startTime
// val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms at " + (mb / ms
// * 1000.0) + " MB/s"
val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" + (mb / ms *
1000.0).toInt + "MB/s) | Response = " + responseStr
val resultStr = "Sent " + mb + " MB " + targetServer + " in " + ms + " ms (" +
(mb / ms * 1000.0).toInt + "MB/s) | Response = " + responseStr
println(resultStr)
})
}
Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,7 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
def next(): (String, Partition) = {
if (it.hasNext) {
it.next()
}
else {
} else {
it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
it.next()
}
Expand Down Expand Up @@ -291,9 +290,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
if (prefPart == None) {
// if no preferred locations, just use basic power of two
return minPowerOfTwo
if (prefPart.isEmpty) {
// if no preferred locations, just use basic power of two
return minPowerOfTwo
}

val prefPartActual = prefPart.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ private[spark] class ParallelCollectionPartition[T: ClassTag](
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt

override def equals(other: Any): Boolean = other match {
case that: ParallelCollectionPartition[_] => (this.rddId == that.rddId &&
this.slice == that.slice)
case that: ParallelCollectionPartition[_] =>
this.rddId == that.rddId && this.slice == that.slice
case _ => false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ private[spark] object ResultTask {
val metadataCleaner = new MetadataCleaner(
MetadataCleanerType.RESULT_TASK, serializedInfoCache.clearOldValues, new SparkConf)

def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _)
: Array[Byte] = {
def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] =
{
synchronized {
val old = serializedInfoCache.get(stageId).orNull
if (old != null) {
Expand All @@ -56,8 +56,8 @@ private[spark] object ResultTask {
}
}

def deserializeInfo(stageId: Int, bytes: Array[Byte])
: (RDD[_], (TaskContext, Iterator[_]) => _) = {
def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) =
{
val loader = Thread.currentThread.getContextClassLoader
val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
val ser = SparkEnv.get.closureSerializer.newInstance()
Expand Down
18 changes: 10 additions & 8 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -100,13 +100,13 @@ class StatsReportListener extends SparkListener with Logging {

//shuffle write
showBytesDistribution("shuffle bytes written:",
(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
(_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))

//fetch & io
showMillisDistribution("fetch wait time:",
(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
(_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
showBytesDistribution("remote bytes read:",
(_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
(_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))

//runtime breakdown
Expand Down Expand Up @@ -152,8 +152,8 @@ private[spark] object StatsReportListener extends Logging {
logInfo("\t" + quantiles.mkString("\t"))
}

def showDistribution(heading: String,
dOpt: Option[Distribution], formatNumber: Double => String) {
def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String)
{
dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
}

Expand All @@ -162,9 +162,11 @@ private[spark] object StatsReportListener extends Logging {
showDistribution(heading, dOpt, f _)
}

def showDistribution(heading:String, format: String,
getMetric: (TaskInfo,TaskMetrics) => Option[Double])
(implicit stage: SparkListenerStageCompleted) {
def showDistribution(
heading: String,
format: String,
getMetric: (TaskInfo, TaskMetrics) => Option[Double])
(implicit stage: SparkListenerStageCompleted) {
showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private[spark] class BlockManager(
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
} else {
doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
doGetLocal(blockId, asValues = false).asInstanceOf[Option[ByteBuffer]]
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.spark.network._

private[spark]
class BlockMessageArray(var blockMessages: Seq[BlockMessage])
extends Seq[BlockMessage] with Logging {
extends Seq[BlockMessage] with Logging {

def this(bm: BlockMessage) = this(Array(bm))

Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,14 @@ private[spark] object JettyUtils extends Logging {
}

/**
* Attempts to start a Jetty server at the supplied hostName:port which uses the supplied handlers.
* Attempts to start a Jetty server at the supplied hostName:port which uses the supplied
* handlers.
*
* If the desired port number is contended, continues incrementing ports until a free port is
* found. Returns the chosen port and the jetty Server object.
*/
def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
def startJettyServer(hostName: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int)
= {

val handlersToRegister = handlers.map { case(path, handler) =>
val contextHandler = new ContextHandler(path)
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ private[spark] object UIUtils {
case _ => <li><a href={prependBaseUri("/environment")}>Environment</a></li>
}
val executors = page match {
case Executors => <li class="active"><a href={prependBaseUri("/executors")}>Executors</a>
</li>
case Executors =>
<li class="active"><a href={prependBaseUri("/executors")}>Executors</a></li>
case _ => <li><a href={prependBaseUri("/executors")}>Executors</a></li>
}

Expand All @@ -66,7 +66,8 @@ private[spark] object UIUtils {
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} /></a>
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
<ul class="nav">
{jobs}
{storage}
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ private[spark] class PoolTable(pools: Seq[Schedulable], listener: JobProgressLis
}
<tr>
<td>
<a href=
{"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>
{p.name}</a></td>
<a href={"%s/stages/pool?poolname=%s".format(UIUtils.prependBaseUri(),p.name)}>{p.name}</a>
</td>
<td>{p.minShare}</td>
<td>{p.weight}</td>
<td>{activeStages}</td>
Expand Down
19 changes: 9 additions & 10 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,28 +223,27 @@ private[spark] class StagePage(parent: JobProgressUI) {
val gcTime = metrics.map(m => m.jvmGCTime).getOrElse(0L)
val serializationTime = metrics.map(m => m.resultSerializationTime).getOrElse(0L)

val maybeShuffleRead = metrics.flatMap{m => m.shuffleReadMetrics}.map{s => s.remoteBytesRead}
val maybeShuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s => s.remoteBytesRead)
val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map{Utils.bytesToString(_)}.getOrElse("")
val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("")

val maybeShuffleWrite =
metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleBytesWritten}
metrics.flatMap{m => m.shuffleWriteMetrics}.map(s => s.shuffleBytesWritten)
val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite.map{Utils.bytesToString(_)}.getOrElse("")
val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("")

val maybeWriteTime = metrics.flatMap{m => m.shuffleWriteMetrics}.map{s => s.shuffleWriteTime}
val maybeWriteTime = metrics.flatMap(m => m.shuffleWriteMetrics).map(s => s.shuffleWriteTime)
val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("")
val writeTimeReadable = maybeWriteTime.map{ t => t / (1000 * 1000)}.map{ ms =>
val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map{ ms =>
if (ms == 0) "" else parent.formatDuration(ms)}.getOrElse("")

val maybeMemoryBytesSpilled = metrics.map{m => m.memoryBytesSpilled}
val maybeMemoryBytesSpilled = metrics.map(m => m.memoryBytesSpilled)
val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.map(_.toString).getOrElse("")
val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map{Utils.bytesToString(_)}
.getOrElse("")
val memoryBytesSpilledReadable = maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")

val maybeDiskBytesSpilled = metrics.map{m => m.diskBytesSpilled}
val diskBytesSpilledSortable = maybeDiskBytesSpilled.map(_.toString).getOrElse("")
val diskBytesSpilledReadable = maybeDiskBytesSpilled.map{Utils.bytesToString(_)}.getOrElse("")
val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")

<tr>
<td>{info.index}</td>
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
</table>
}

private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int)
: Seq[Node] = {
private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
{
val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
val startWidth = "width: %s%%".format((started.toDouble/total)*100)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ private[spark] object ClosureCleaner extends Logging {
}
}

private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]])
extends ClassVisitor(ASM4) {
private[spark]
class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
new MethodVisitor(ASM4) {
Expand Down
Loading

0 comments on commit 919bd7f

Please sign in to comment.