Remove simple redundant return statements from Scala methods/functions:
- Only change simple return statements at the end of a method (a minimal sketch of the pattern follows below)
- Ignore complex if-else checks
- Ignore return statements inside synchronized blocks
hsaputra committed Jan 12, 2014
1 parent 26cdb5f commit 93a65e5
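For context, here is a minimal sketch of the pattern this commit applies (not code taken from the diff; the object and method names are invented). In Scala, the last expression of a method body is the method's result, so a trailing `return` can be dropped without changing behavior:

// Hypothetical example, not part of this commit: the final expression of a
// Scala method body is its result, so an explicit trailing `return` is redundant.
object RedundantReturnSketch {

  // Before: explicit `return` as the last statement of the method.
  def sumWithReturn(xs: Seq[Int]): Int = {
    val total = xs.sum
    return total
  }

  // After: the last expression is returned implicitly; behavior is unchanged.
  def sum(xs: Seq[Int]): Int = {
    val total = xs.sum
    total
  }

  def main(args: Array[String]): Unit = {
    println(sumWithReturn(Seq(1, 2, 3))) // 6
    println(sum(Seq(1, 2, 3)))           // 6
  }
}

As the notes above say, the commit only applies this to simple trailing returns; returns inside more involved control flow are left as they are.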
Showing 63 changed files with 187 additions and 186 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -218,7 +218,7 @@ private object Accumulators {

  def newId: Long = synchronized {
    lastId += 1
-   return lastId
+   lastId
  }

  def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -38,7 +38,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
    blockManager.get(key) match {
      case Some(values) =>
        // Partition is already materialized, so just return its values
-       return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+       new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])

      case None =>
        // Mark the split as loading (unless someone else marks it first)
@@ -74,7 +74,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
          val elements = new ArrayBuffer[Any]
          elements ++= computedValues
          blockManager.put(key, elements, storageLevel, tellMaster = true)
-         return elements.iterator.asInstanceOf[Iterator[T]]
+         elements.iterator.asInstanceOf[Iterator[T]]
        } finally {
          loading.synchronized {
            loading.remove(key)
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -47,17 +47,17 @@ private[spark] class HttpFileServer extends Logging {

  def addFile(file: File) : String = {
    addFileToDir(file, fileDir)
-   return serverUri + "/files/" + file.getName
+   serverUri + "/files/" + file.getName
  }

  def addJar(file: File) : String = {
    addFileToDir(file, jarDir)
-   return serverUri + "/jars/" + file.getName
+   serverUri + "/jars/" + file.getName
  }

  def addFileToDir(file: File, dir: File) : String = {
    Files.copy(file, new File(dir, file.getName))
-   return dir + "/" + file.getName
+   dir + "/" + file.getName
  }

}
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Logging.scala
@@ -41,7 +41,7 @@ trait Logging {
      }
      log_ = LoggerFactory.getLogger(className)
    }
-   return log_
+   log_
  }

  // Log methods that take only a String
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -139,7 +139,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
      }
    }
-   else{
+   else {
      throw new FetchFailedException(null, shuffleId, -1, reduceId,
        new Exception("Missing all output locations for shuffle " + shuffleId))
    }
@@ -185,7 +185,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
private[spark] class MapOutputTrackerMaster(conf: SparkConf)
  extends MapOutputTracker(conf) {

- // Cache a serialized version of the output statuses for each shuffle to send them out faster
+ // Cache a serialized version of the output statuses for each shuffle to send them out faster return
  private var cacheEpoch = epoch
  private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]

4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -53,9 +53,9 @@ object Partitioner {
      return r.partitioner.get
    }
    if (rdd.context.conf.contains("spark.default.parallelism")) {
-     return new HashPartitioner(rdd.context.defaultParallelism)
+     new HashPartitioner(rdd.context.defaultParallelism)
    } else {
-     return new HashPartitioner(bySize.head.partitions.size)
+     new HashPartitioner(bySize.head.partitions.size)
    }
  }
}
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -756,8 +756,11 @@ class SparkContext(

  private[spark] def getCallSite(): String = {
    val callSite = getLocalProperty("externalCallSite")
-   if (callSite == null) return Utils.formatSparkCallSite
-   callSite
+   if (callSite == null) {
+     Utils.formatSparkCallSite
+   } else {
+     callSite
+   }
  }

  /**
@@ -907,7 +910,7 @@ class SparkContext(
   */
  private[spark] def clean[F <: AnyRef](f: F): F = {
    ClosureCleaner.clean(f)
-   return f
+   f
  }

  /**
@@ -919,7 +922,7 @@
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
-   fs.getFileStatus(path).getPath().toString
+   fs.getFileStatus(path).getPath.toString
  }
}

13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -134,28 +134,28 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
      format = conf.value.getOutputFormat()
        .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
    }
-   return format
+   format
  }

  private def getOutputCommitter(): OutputCommitter = {
    if (committer == null) {
      committer = conf.value.getOutputCommitter
    }
-   return committer
+   committer
  }

  private def getJobContext(): JobContext = {
    if (jobContext == null) {
      jobContext = newJobContext(conf.value, jID.value)
    }
-   return jobContext
+   jobContext
  }

  private def getTaskContext(): TaskAttemptContext = {
    if (taskContext == null) {
      taskContext = newTaskAttemptContext(conf.value, taID.value)
    }
-   return taskContext
+   taskContext
  }

  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
@@ -182,7 +182,7 @@ object SparkHadoopWriter {
  def createJobID(time: Date, id: Int): JobID = {
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
-   return new JobID(jobtrackerID, id)
+   new JobID(jobtrackerID, id)
  }

  def createPathFromString(path: String, conf: JobConf): Path = {
@@ -194,7 +194,6 @@ object SparkHadoopWriter {
    if (outputPath == null || fs == null) {
      throw new IllegalArgumentException("Incorrectly formatted output path")
    }
-   outputPath = outputPath.makeQualified(fs)
-   return outputPath
+   outputPath.makeQualified(fs)
  }
}
@@ -95,7 +95,7 @@ private[spark] class PythonRDD[T: ClassTag](

    // Return an iterator that read lines from the process's stdout
    val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
-   return new Iterator[Array[Byte]] {
+   new Iterator[Array[Byte]] {
      def next(): Array[Byte] = {
        val obj = _nextObj
        if (hasNext) {
@@ -203,16 +203,16 @@ extends Logging {
    }
    bais.close()

-   var tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
+   val tInfo = TorrentInfo(retVal, blockNum, byteArray.length)
    tInfo.hasBlocks = blockNum

-   return tInfo
+   tInfo
  }

  def unBlockifyObject[T](arrayOfBlocks: Array[TorrentBlock],
      totalBytes: Int,
      totalBlocks: Int): T = {
-   var retByteArray = new Array[Byte](totalBytes)
+   val retByteArray = new Array[Byte](totalBytes)
    for (i <- 0 until totalBlocks) {
      System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray,
        i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length)
@@ -46,7 +46,7 @@ class BufferMessage(id_ : Int, val buffers: ArrayBuffer[ByteBuffer], var ackId:
      throw new Exception("Max chunk size is " + maxChunkSize)
    }

-   if (size == 0 && gotChunkForSendingOnce == false) {
+   if (size == 0 && !gotChunkForSendingOnce) {
      val newChunk = new MessageChunk(
        new MessageChunkHeader(typ, id, 0, 0, ackId, senderAddress), null)
      gotChunkForSendingOnce = true
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/network/Connection.scala
@@ -330,7 +330,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
      // Is highly unlikely unless there was an unclean close of socket, etc
      registerInterest()
      logInfo("Connected to [" + address + "], " + outbox.messages.size + " messages pending")
-     return true
+     true
    } catch {
      case e: Exception => {
        logWarning("Error finishing connection to " + address, e)
@@ -385,7 +385,7 @@ class SendingConnection(val address: InetSocketAddress, selector_ : Selector,
      }
    }
    // should not happen - to keep scala compiler happy
-   return true
+   true
  }

  // This is a hack to determine if remote socket was closed or not.
@@ -559,7 +559,7 @@ private[spark] class ReceivingConnection(channel_ : SocketChannel, selector_ : S
      }
    }
    // should not happen - to keep scala compiler happy
-   return true
+   true
  }

  def onReceive(callback: (Connection, Message) => Unit) {onReceiveCallback = callback}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/network/Message.scala
@@ -61,17 +61,17 @@ private[spark] object Message {
    if (dataBuffers.exists(_ == null)) {
      throw new Exception("Attempting to create buffer message with null buffer")
    }
-   return new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
+   new BufferMessage(getNewId(), new ArrayBuffer[ByteBuffer] ++= dataBuffers, ackId)
  }

  def createBufferMessage(dataBuffers: Seq[ByteBuffer]): BufferMessage =
    createBufferMessage(dataBuffers, 0)

  def createBufferMessage(dataBuffer: ByteBuffer, ackId: Int): BufferMessage = {
    if (dataBuffer == null) {
-     return createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
+     createBufferMessage(Array(ByteBuffer.allocate(0)), ackId)
    } else {
-     return createBufferMessage(Array(dataBuffer), ackId)
+     createBufferMessage(Array(dataBuffer), ackId)
    }
  }

@@ -64,7 +64,7 @@ private[spark] object ShuffleSender {
        val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
        val subDir = new File(localDirs(dirId), "%02x".format(subDirId))
        val file = new File(subDir, blockId.name)
-       return new FileSegment(file, 0, file.length())
+       new FileSegment(file, 0, file.length())
      }
    }
    val sender = new ShuffleSender(port, pResovler)
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -296,9 +296,9 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
    val prefPartActual = prefPart.get

    if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
-     return minPowerOfTwo // prefer balance over locality
+     minPowerOfTwo // prefer balance over locality
    else {
-     return prefPartActual // prefer locality over balance
+     prefPartActual // prefer locality over balance
    }
  }

8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -99,19 +99,19 @@ class HadoopRDD[K, V](
    val conf: Configuration = broadcastedConf.value.value
    if (conf.isInstanceOf[JobConf]) {
      // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
-     return conf.asInstanceOf[JobConf]
+     conf.asInstanceOf[JobConf]
    } else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
      // getJobConf() has been called previously, so there is already a local cache of the JobConf
      // needed by this RDD.
-     return HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
+     HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
    } else {
      // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
      // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
      // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
      val newJobConf = new JobConf(broadcastedConf.value.value)
      initLocalJobConfFuncOpt.map(f => f(newJobConf))
      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-     return newJobConf
+     newJobConf
    }
  }

@@ -127,7 +127,7 @@ class HadoopRDD[K, V](
      newInputFormat.asInstanceOf[Configurable].setConf(conf)
    }
    HadoopRDD.putCachedMetadata(inputFormatCacheKey, newInputFormat)
-   return newInputFormat
+   newInputFormat
  }

  override def getPartitions: Array[Partition] = {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -96,7 +96,7 @@ class PipedRDD[T: ClassTag](

    // Return an iterator that read lines from the process's stdout
    val lines = Source.fromInputStream(proc.getInputStream).getLines
-   return new Iterator[String] {
+   new Iterator[String] {
      def next() = lines.next()
      def hasNext = {
        if (lines.hasNext) {
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -764,7 +764,7 @@ abstract class RDD[T: ClassTag](
        val entry = iter.next()
        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
      }
-     return m1
+     m1
    }
    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
    myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
@@ -842,7 +842,7 @@ abstract class RDD[T: ClassTag](
      partsScanned += numPartsToTry
    }

-   return buf.toArray
+   buf.toArray
  }

  /**
@@ -103,7 +103,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
      retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
    }

-   return retval.toSet
+   retval.toSet
  }

  // This method does not expect failures, since validate has already passed ...
@@ -121,18 +121,18 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl
      elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
    )

-   return retval.toSet
+   retval.toSet
  }

  private def findPreferredLocations(): Set[SplitInfo] = {
    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
      ", inputFormatClazz : " + inputFormatClazz)
    if (mapreduceInputFormat) {
-     return prefLocsFromMapreduceInputFormat()
+     prefLocsFromMapreduceInputFormat()
    }
    else {
      assert(mapredInputFormat)
-     return prefLocsFromMapredInputFormat()
+     prefLocsFromMapredInputFormat()
    }
  }
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -75,12 +75,12 @@ private[spark] class Pool(
      return schedulableNameToSchedulable(schedulableName)
    }
    for (schedulable <- schedulableQueue) {
-     var sched = schedulable.getSchedulableByName(schedulableName)
+     val sched = schedulable.getSchedulableByName(schedulableName)
      if (sched != null) {
        return sched
      }
    }
-   return null
+   null
  }

  override def executorLost(executorId: String, host: String) {
@@ -92,7 +92,7 @@ private[spark] class Pool(
    for (schedulable <- schedulableQueue) {
      shouldRevive |= schedulable.checkSpeculatableTasks()
    }
-   return shouldRevive
+   shouldRevive
  }

  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
@@ -101,7 +101,7 @@ private[spark] class Pool(
    for (schedulable <- sortedSchedulableQueue) {
      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
    }
-   return sortedTaskSetQueue
+   sortedTaskSetQueue
  }

  def increaseRunningTasks(taskNum: Int) {
[Diff truncated: the remaining changed files are not shown here.]