[SPARK-15866] Rename listAccumulator to collectionAccumulator
## What changes were proposed in this pull request?
Under Spark's naming convention, SparkContext.listAccumulator reads as if "list" were a verb, i.e. a method that returns a list of accumulators. This patch renames the method to collectionAccumulator and the class to CollectionAccumulator.
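
For illustration, a minimal sketch of the API after this rename (the data and variable names here are hypothetical, not part of the patch):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("demo").setMaster("local[*]"))

// The renamed factory method creates and registers a CollectionAccumulator.
val badRecords = sc.collectionAccumulator[String]("badRecords")

sc.parallelize(Seq("ok", "bad:1", "ok", "bad:2")).foreach { rec =>
  if (rec.startsWith("bad")) badRecords.add(rec) // accumulated on executors
}

// Read back on the driver as a java.util.List[String].
println(badRecords.value)
```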

## How was this patch tested?
Updated the test cases to reflect the new names.

Author: Reynold Xin <[email protected]>

Closes apache#13594 from rxin/SPARK-15866.
rxin committed Jun 10, 2016
1 parent 0ec279f commit 254bc8c
Showing 4 changed files with 23 additions and 18 deletions.
core/src/main/scala/org/apache/spark/SparkContext.scala (8 additions, 8 deletions)
@@ -1340,21 +1340,21 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates
+   * inputs by adding them into the list.
    */
-  def listAccumulator[T]: ListAccumulator[T] = {
-    val acc = new ListAccumulator[T]
+  def collectionAccumulator[T]: CollectionAccumulator[T] = {
+    val acc = new CollectionAccumulator[T]
     register(acc)
     acc
   }
 
   /**
-   * Create and register a list accumulator, which starts with empty list and accumulates inputs
-   * by adding them into the inner list.
+   * Create and register a [[CollectionAccumulator]], which starts with empty list and accumulates
+   * inputs by adding them into the list.
    */
-  def listAccumulator[T](name: String): ListAccumulator[T] = {
-    val acc = new ListAccumulator[T]
+  def collectionAccumulator[T](name: String): CollectionAccumulator[T] = {
+    val acc = new CollectionAccumulator[T]
     register(acc, name)
     acc
   }
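
As a brief, hedged sketch of the two overloads above (assuming a live `SparkContext` named `sc`; the UI remark reflects Spark's general handling of named accumulators):

```scala
// Anonymous: registered without a name.
val anon = sc.collectionAccumulator[Long]

// Named: the name is what the Spark web UI displays for the accumulator.
val named = sc.collectionAccumulator[Long]("rowIds")
```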
core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala (10 additions, 5 deletions)
@@ -415,15 +415,20 @@ class DoubleAccumulator extends AccumulatorV2[jl.Double, jl.Double] {
 }
 
 
-class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
+/**
+ * An [[AccumulatorV2 accumulator]] for collecting a list of elements.
+ *
+ * @since 2.0.0
+ */
+class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   private val _list: java.util.List[T] = new ArrayList[T]
 
   override def isZero: Boolean = _list.isEmpty
 
-  override def copyAndReset(): ListAccumulator[T] = new ListAccumulator
+  override def copyAndReset(): CollectionAccumulator[T] = new CollectionAccumulator
 
-  override def copy(): ListAccumulator[T] = {
-    val newAcc = new ListAccumulator[T]
+  override def copy(): CollectionAccumulator[T] = {
+    val newAcc = new CollectionAccumulator[T]
     newAcc._list.addAll(_list)
     newAcc
   }
@@ -433,7 +438,7 @@ class ListAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   override def add(v: T): Unit = _list.add(v)
 
   override def merge(other: AccumulatorV2[T, java.util.List[T]]): Unit = other match {
-    case o: ListAccumulator[T] => _list.addAll(o.value)
+    case o: CollectionAccumulator[T] => _list.addAll(o.value)
     case _ => throw new UnsupportedOperationException(
       s"Cannot merge ${this.getClass.getName} with ${other.getClass.getName}")
   }
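
A small driver-side sketch of the semantics defined above: `merge` concatenates the inner lists when the other accumulator is also a `CollectionAccumulator`, and `copyAndReset` yields an empty accumulator of the same type (all values here are illustrative):

```scala
import org.apache.spark.util.CollectionAccumulator

val a = new CollectionAccumulator[Int]
val b = new CollectionAccumulator[Int]
a.add(1); a.add(2)
b.add(3)

a.merge(b)                    // appends b's elements: a.value is now [1, 2, 3]
assert(a.value.size == 3)

val fresh = a.copyAndReset()  // same type, starts empty
assert(fresh.isZero)
```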
core/src/test/scala/org/apache/spark/util/AccumulatorV2Suite.scala (1 addition, 1 deletion)
@@ -88,7 +88,7 @@ class AccumulatorV2Suite extends SparkFunSuite {
   }
 
   test("ListAccumulator") {
-    val acc = new ListAccumulator[Double]
+    val acc = new CollectionAccumulator[Double]
     assert(acc.value.isEmpty)
     assert(acc.isZero)
 
sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala (4 additions, 4 deletions)
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.UserDefinedType
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{AccumulatorContext, ListAccumulator, LongAccumulator}
+import org.apache.spark.util.{AccumulatorContext, CollectionAccumulator, LongAccumulator}
 
 
 private[sql] object InMemoryRelation {
@@ -67,16 +67,16 @@ private[sql] case class InMemoryRelation(
     tableName: Option[String])(
     @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null,
     @transient private[sql] var _statistics: Statistics = null,
-    private[sql] var _batchStats: ListAccumulator[InternalRow] = null)
+    private[sql] var _batchStats: CollectionAccumulator[InternalRow] = null)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
   override def producedAttributes: AttributeSet = outputSet
 
-  private[sql] val batchStats: ListAccumulator[InternalRow] =
+  private[sql] val batchStats: CollectionAccumulator[InternalRow] =
     if (_batchStats == null) {
-      child.sqlContext.sparkContext.listAccumulator[InternalRow]
+      child.sqlContext.sparkContext.collectionAccumulator[InternalRow]
     } else {
       _batchStats
    }
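
For context, the `batchStats` initialization above follows a reuse-or-create pattern so that copies of the relation keep reporting into one shared accumulator; a compact, hypothetical restatement (not Spark source):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.util.CollectionAccumulator

// Hypothetical sketch: reuse the accumulator handed to a copy,
// otherwise register a fresh one on first construction.
class CachedNode(sc: SparkContext, _stats: CollectionAccumulator[String] = null) {
  val stats: CollectionAccumulator[String] =
    if (_stats == null) sc.collectionAccumulator[String] else _stats

  def copied: CachedNode = new CachedNode(sc, stats) // copies share `stats`
}
```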
