[MINOR] Trivial cleanups
These are trivial cleanups I found while working on apache#22282; a short illustrative sketch of the recurring patterns follows the list below.

- Remove unused value: `UnsafeArraySuite#defaultTz`
- Remove the redundant `new` modifier from the case class `KafkaSourceRDDPartition`
- Remove unused variables from `RDD.scala`
- Remove trailing space from `structured-streaming-kafka-integration.md`
- Remove redundant parameter from `ArrowConvertersSuite`: `nullable` is `true` by default.
- Remove leading empty line: `UnsafeRow`
- Remove trailing empty line: `KafkaTestUtils`
- Remove unthrown exception type: `UnsafeMapData`
- Replace unused pattern bindings with `_`: `expressions`
- Remove duplicated default parameter: `AnalysisErrorSuite`
- `ObjectExpressionsSuite`: remove duplicated parameters, redundant conversions, and an unused variable
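
The hunks below repeat a small number of patterns. As a minimal, self-contained sketch (hypothetical names, not code from this commit), the two most common ones look like this:

```scala
object CleanupSketch {
  // Before: `ctx` and `pid` are bound but never used in the function body.
  val transformBefore: (String, Int, Iterator[Int]) => Iterator[Int] =
    (ctx, pid, iter) => iter.map(_ + 1)

  // After: unused parameters are written as `_`, making the intent explicit.
  val transformAfter: (String, Int, Iterator[Int]) => Iterator[Int] =
    (_, _, iter) => iter.map(_ + 1)

  // `.toSeq` on a value that is already a Seq is a no-op and can be dropped.
  val rowsBefore: Seq[Int] = (1 to 10).map(i => i * 2).toSeq
  val rowsAfter: Seq[Int] = (1 to 10).map(i => i * 2)
}
```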

Closes apache#25251 from dongjinleekr/cleanup/201907.

Authored-by: Lee Dongjin <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
dongjinleekr authored and HyukjinKwon committed Jul 29, 2019
1 parent 18156d5 commit d98aa2a
Showing 8 changed files with 44 additions and 47 deletions.
32 changes: 16 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -372,7 +372,7 @@ abstract class RDD[T: ClassTag](
*/
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
- new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
+ new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}

/**
@@ -381,7 +381,7 @@ abstract class RDD[T: ClassTag](
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
- new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
+ new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}

/**
@@ -391,7 +391,7 @@ abstract class RDD[T: ClassTag](
val cleanF = sc.clean(f)
new MapPartitionsRDD[T, T](
this,
- (context, pid, iter) => iter.filter(cleanF),
+ (_, _, iter) => iter.filter(cleanF),
preservesPartitioning = true)
}

@@ -402,16 +402,16 @@ abstract class RDD[T: ClassTag](
def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
// Create an instance of external append only map which ignores values.
val map = new ExternalAppendOnlyMap[T, Null, Null](
- createCombiner = value => null,
+ createCombiner = _ => null,
mergeValue = (a, b) => a,
mergeCombiners = (a, b) => a)
map.insertAll(partition.map(_ -> null))
map.iterator.map(_._1)
}
partitioner match {
- case Some(p) if numPartitions == partitions.length =>
+ case Some(_) if numPartitions == partitions.length =>
mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
- case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+ case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
}
}

@@ -684,7 +684,7 @@ abstract class RDD[T: ClassTag](
* Return an RDD created by coalescing all elements within each partition into an array.
*/
def glom(): RDD[Array[T]] = withScope {
- new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
+ new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}

/**
@@ -814,7 +814,7 @@ abstract class RDD[T: ClassTag](
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
- (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
+ (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
preservesPartitioning)
}

@@ -836,7 +836,7 @@ abstract class RDD[T: ClassTag](
isOrderSensitive: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
- (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
+ (_: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning = preservesPartitioning,
isOrderSensitive = isOrderSensitive)
}
@@ -849,7 +849,7 @@ abstract class RDD[T: ClassTag](
preservesPartitioning: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
- (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter),
+ (_: TaskContext, _: Int, iter: Iterator[T]) => f(iter),
preservesPartitioning)
}

@@ -866,7 +866,7 @@ abstract class RDD[T: ClassTag](
val cleanedF = sc.clean(f)
new MapPartitionsRDD(
this,
- (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
+ (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
preservesPartitioning)
}

@@ -1040,7 +1040,7 @@ abstract class RDD[T: ClassTag](
}
}
var jobResult: Option[T] = None
- val mergeResult = (index: Int, taskResult: Option[T]) => {
+ val mergeResult = (_: Int, taskResult: Option[T]) => {
if (taskResult.isDefined) {
jobResult = jobResult match {
case Some(value) => Some(f(value, taskResult.get))
@@ -1110,7 +1110,7 @@ abstract class RDD[T: ClassTag](
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanOp = sc.clean(op)
val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
- val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+ val mergeResult = (_: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
sc.runJob(this, foldPartition, mergeResult)
jobResult
}
@@ -1136,7 +1136,7 @@ abstract class RDD[T: ClassTag](
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
- val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+ val mergeResult = (_: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
sc.runJob(this, aggregatePartition, mergeResult)
jobResult
}
@@ -1201,7 +1201,7 @@ abstract class RDD[T: ClassTag](
timeout: Long,
confidence: Double = 0.95): PartialResult[BoundedDouble] = withScope {
require(0.0 <= confidence && confidence <= 1.0, s"confidence ($confidence) must be in [0,1]")
- val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
+ val countElements: (TaskContext, Iterator[T]) => Long = { (_, iter) =>
var result = 0L
while (iter.hasNext) {
result += 1L
@@ -1244,7 +1244,7 @@ abstract class RDD[T: ClassTag](
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
- val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) =>
+ val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (_, iter) =>
val map = new OpenHashMap[T, Long]
iter.foreach {
t => map.changeValue(t, 1L, _ + 1L)
@@ -71,7 +71,7 @@ class InternalAccumulatorSuite extends SparkFunSuite with LocalSparkContext {
taskAccum.value.get.asInstanceOf[Long]
}
// Each task should keep track of the partial value on the way, i.e. 1, 2, ... numPartitions
- assert(taskAccumValues.sorted === (1L to numPartitions).toSeq)
+ assert(taskAccumValues.sorted === (1L to numPartitions))
}
rdd.count()
listener.awaitNextJobCompletion()
4 changes: 2 additions & 2 deletions docs/structured-streaming-kafka-integration.md
@@ -404,7 +404,7 @@ The following configurations are optional:
<td>spark-kafka-source</td>
<td>streaming and batch</td>
<td>Prefix of consumer group identifiers (`group.id`) that are generated by structured streaming
queries. If "kafka.group.id" is set, this option will be ignored. </td>
queries. If "kafka.group.id" is set, this option will be ignored.</td>
</tr>
<tr>
<td>kafka.group.id</td>
@@ -421,7 +421,7 @@ The following configurations are optional:
same group id are likely interfere with each other causing each query to read only part of the
data. This may also occur when queries are started/restarted in quick succession. To minimize such
issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
- be very small. When this is set, option "groupIdPrefix" will be ignored. </td>
+ be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
</tr>
</table>
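
The two options documented above interact: a fixed "kafka.group.id" overrides "groupIdPrefix". As a minimal sketch of how either one is passed to the Kafka source — assuming a running SparkSession named `spark`, the spark-sql-kafka package on the classpath, and placeholder broker/topic names — it might look like this:

```scala
// Sketch only: "host1:9092" and "topic1" are placeholders.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  // Let Spark generate unique consumer group ids with a custom prefix ...
  .option("groupIdPrefix", "my-app")
  // ... or pin a fixed group id instead, in which case groupIdPrefix is ignored:
  // .option("kafka.group.id", "my-fixed-group")
  .load()
```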

@@ -143,7 +143,7 @@ public void writeExternal(ObjectOutput out) throws IOException {
}

@Override
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ public void readExternal(ObjectInput in) throws IOException {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
this.baseObject = new byte[sizeInBytes];
@@ -177,7 +177,7 @@ package object expressions {
a => (resolver(dbPart, a.qualifier.head) && resolver(tblPart, a.qualifier.last))
}
(attributes, nestedFields)
- case all =>
+ case _ =>
(Seq.empty, Seq.empty)
}

@@ -197,7 +197,7 @@ package object expressions {
resolver(qualifier, a.qualifier.last)
}
(attributes, nestedFields)
- case all =>
+ case _ =>
(Seq.empty[Attribute], Seq.empty[String])
}
}
@@ -330,9 +330,9 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
case null =>
assert(result.asInstanceOf[ArrayData].array.toSeq == expected)
case l if classOf[java.util.List[_]].isAssignableFrom(l) =>
- assert(result.asInstanceOf[java.util.List[_]].asScala.toSeq == expected)
+ assert(result.asInstanceOf[java.util.List[_]].asScala == expected)
case s if classOf[Seq[_]].isAssignableFrom(s) =>
- assert(result.asInstanceOf[Seq[_]].toSeq == expected)
+ assert(result.asInstanceOf[Seq[_]] == expected)
case s if classOf[scala.collection.Set[_]].isAssignableFrom(s) =>
assert(result.asInstanceOf[scala.collection.Set[_]] == expected.toSet)
}
@@ -532,8 +532,6 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
private def scalaMapSerializerFor[T: TypeTag, U: TypeTag](inputObject: Expression): Expression = {
import org.apache.spark.sql.catalyst.ScalaReflection._

- val curId = new java.util.concurrent.atomic.AtomicInteger()

def kvSerializerFor[V: TypeTag](inputObject: Expression): Expression =
localTypeOf[V].dealias match {
case t if t <:< localTypeOf[java.lang.Integer] =>
@@ -40,7 +40,6 @@ class UnsafeArraySuite extends SparkFunSuite {
val dateArray = Array(
DateTimeUtils.stringToDate(UTF8String.fromString("1970-1-1")).get,
DateTimeUtils.stringToDate(UTF8String.fromString("2016-7-26")).get)
- private def defaultTz = DateTimeUtils.defaultTimeZone()
private def defaultZoneId = ZoneId.systemDefault()
val timestampArray = Array(
DateTimeUtils.stringToTimestamp(
@@ -51,13 +51,13 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
override def unhandledFilters(filters: Array[Filter]): Array[Filter] = {
def unhandled(filter: Filter): Boolean = {
filter match {
- case EqualTo(col, v) => col == "b"
- case EqualNullSafe(col, v) => col == "b"
- case LessThan(col, v: Int) => col == "b"
- case LessThanOrEqual(col, v: Int) => col == "b"
- case GreaterThan(col, v: Int) => col == "b"
- case GreaterThanOrEqual(col, v: Int) => col == "b"
- case In(col, values) => col == "b"
+ case EqualTo(col, _) => col == "b"
+ case EqualNullSafe(col, _) => col == "b"
+ case LessThan(col, _: Int) => col == "b"
+ case LessThanOrEqual(col, _: Int) => col == "b"
+ case GreaterThan(col, _: Int) => col == "b"
+ case GreaterThanOrEqual(col, _: Int) => col == "b"
+ case In(col, _) => col == "b"
case IsNull(col) => col == "b"
case IsNotNull(col) => col == "b"
case Not(pred) => unhandled(pred)
@@ -107,7 +107,7 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sparkSession: S
case StringEndsWith("c", v) => _.endsWith(v)
case StringContains("c", v) => _.contains(v)
case EqualTo("c", v: String) => _.equals(v)
case EqualTo("c", v: UTF8String) => sys.error("UTF8String should not appear in filters")
case EqualTo("c", _: UTF8String) => sys.error("UTF8String should not appear in filters")
case In("c", values) => (s: String) => values.map(_.asInstanceOf[String]).toSet.contains(s)
case _ => (c: String) => true
}
@@ -152,39 +152,39 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic
sqlTest(
"SELECT * FROM oneToTenFiltered",
(1 to 10).map(i => Row(i, i * 2, (i - 1 + 'a').toChar.toString * 5
- + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)).toSeq)
+ + (i - 1 + 'a').toChar.toString.toUpperCase(Locale.ROOT) * 5)))

sqlTest(
"SELECT a, b FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i, i * 2)).toSeq)
+ (1 to 10).map(i => Row(i, i * 2)))

sqlTest(
"SELECT b, a FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i * 2, i)).toSeq)
+ (1 to 10).map(i => Row(i * 2, i)))

sqlTest(
"SELECT a FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i)).toSeq)
+ (1 to 10).map(i => Row(i)))

sqlTest(
"SELECT b FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i * 2)).toSeq)
+ (1 to 10).map(i => Row(i * 2)))

sqlTest(
"SELECT a * 2 FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i * 2)).toSeq)
+ (1 to 10).map(i => Row(i * 2)))

sqlTest(
"SELECT A AS b FROM oneToTenFiltered",
- (1 to 10).map(i => Row(i)).toSeq)
+ (1 to 10).map(i => Row(i)))

sqlTest(
"SELECT x.b, y.a FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
- (1 to 5).map(i => Row(i * 4, i)).toSeq)
+ (1 to 5).map(i => Row(i * 4, i)))

sqlTest(
"SELECT x.a, y.b FROM oneToTenFiltered x JOIN oneToTenFiltered y ON x.a = y.b",
- (2 to 10 by 2).map(i => Row(i, i)).toSeq)
+ (2 to 10 by 2).map(i => Row(i, i)))

sqlTest(
"SELECT a, b FROM oneToTenFiltered WHERE a = 1",
@@ -208,19 +208,19 @@ class FilteredScanSuite extends DataSourceTest with SharedSQLContext with Predic

sqlTest(
"SELECT a, b FROM oneToTenFiltered WHERE a IS NOT NULL",
- (1 to 10).map(i => Row(i, i * 2)).toSeq)
+ (1 to 10).map(i => Row(i, i * 2)))

sqlTest(
"SELECT a, b FROM oneToTenFiltered WHERE a < 5 AND a > 1",
- (2 to 4).map(i => Row(i, i * 2)).toSeq)
+ (2 to 4).map(i => Row(i, i * 2)))

sqlTest(
"SELECT a, b FROM oneToTenFiltered WHERE a < 3 OR a > 8",
Seq(1, 2, 9, 10).map(i => Row(i, i * 2)))

sqlTest(
"SELECT a, b FROM oneToTenFiltered WHERE NOT (a < 6)",
- (6 to 10).map(i => Row(i, i * 2)).toSeq)
+ (6 to 10).map(i => Row(i, i * 2)))

sqlTest(
"SELECT a, b, c FROM oneToTenFiltered WHERE c like 'c%'",