Skip to content

Commit

Permalink
[SPARK-12115][SPARKR] Change numPartitions() to getNumPartitions() to…
Browse files Browse the repository at this point in the history
… be consistent with Scala/Python

Change ```numPartitions()``` to ```getNumPartitions()``` to be consistent with Scala/Python.
<del>Note: If we cannot catch up with the 1.6 release, it will be a breaking change for 1.7 that we also need to explain in the release notes.</del>

cc sun-rui felixcheung shivaram

Author: Yanbo Liang <[email protected]>

Closes apache#10123 from yanboliang/spark-12115.
  • Loading branch information
yanboliang authored and shivaram committed Dec 6, 2015
1 parent 895b6c4 commit 6979edf
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 30 deletions.
55 changes: 33 additions & 22 deletions R/pkg/R/RDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,28 @@ setMethod("checkpoint",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2L)
#' numPartitions(rdd) # 2L
#' getNumPartitions(rdd) # 2L
#'}
#' @rdname numPartitions
#' @rdname getNumPartitions
#' @aliases getNumPartitions,RDD-method
#' @noRd
# Number of partitions of an RDD: forwards the call to the Java-side RDD
# object's getNumPartitions() via the R-JVM bridge.
setMethod("getNumPartitions",
          signature(x = "RDD"),
          function(x) {
            jrdd <- getJRDD(x)
            callJMethod(jrdd, "getNumPartitions")
          })

#' Gets the number of partitions of an RDD, the same as getNumPartitions.
#' But this function has been deprecated, please use getNumPartitions.
#'
#' @rdname getNumPartitions
#' @aliases numPartitions,RDD-method
#' @noRd
# Deprecated alias of getNumPartitions(), kept so existing callers keep
# working. Emits a deprecation warning, then delegates to the replacement
# so both names share a single implementation. (The old inline body that
# queried partitions/size directly is gone — it duplicated JVM round-trips
# whose results were discarded.)
setMethod("numPartitions",
          signature(x = "RDD"),
          function(x) {
            .Deprecated("getNumPartitions")
            getNumPartitions(x)
          })

#' Collect elements of an RDD
Expand Down Expand Up @@ -443,7 +454,7 @@ setMethod("countByValue",
signature(x = "RDD"),
function(x) {
ones <- lapply(x, function(item) { list(item, 1L) })
collect(reduceByKey(ones, `+`, numPartitions(x)))
collect(reduceByKey(ones, `+`, getNumPartitions(x)))
})

#' Apply a function to all elements
Expand Down Expand Up @@ -759,7 +770,7 @@ setMethod("take",
resList <- list()
index <- -1
jrdd <- getJRDD(x)
numPartitions <- numPartitions(x)
numPartitions <- getNumPartitions(x)
serializedModeRDD <- getSerializedMode(x)

# TODO(shivaram): Collect more than one partition based on size
Expand Down Expand Up @@ -823,7 +834,7 @@ setMethod("first",
#' @noRd
setMethod("distinct",
signature(x = "RDD"),
function(x, numPartitions = SparkR:::numPartitions(x)) {
function(x, numPartitions = SparkR:::getNumPartitions(x)) {
identical.mapped <- lapply(x, function(x) { list(x, NULL) })
reduced <- reduceByKey(identical.mapped,
function(x, y) { x },
Expand Down Expand Up @@ -993,8 +1004,8 @@ setMethod("keyBy",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
#' numPartitions(rdd) # 4
#' numPartitions(repartition(rdd, 2L)) # 2
#' getNumPartitions(rdd) # 4
#' getNumPartitions(repartition(rdd, 2L)) # 2
#'}
#' @rdname repartition
#' @aliases repartition,RDD
Expand All @@ -1014,8 +1025,8 @@ setMethod("repartition",
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
#' numPartitions(rdd) # 3
#' numPartitions(coalesce(rdd, 1L)) # 1
#' getNumPartitions(rdd) # 3
#' getNumPartitions(coalesce(rdd, 1L)) # 1
#'}
#' @rdname coalesce
#' @aliases coalesce,RDD
Expand All @@ -1024,7 +1035,7 @@ setMethod("coalesce",
signature(x = "RDD", numPartitions = "numeric"),
function(x, numPartitions, shuffle = FALSE) {
numPartitions <- numToInt(numPartitions)
if (shuffle || numPartitions > SparkR:::numPartitions(x)) {
if (shuffle || numPartitions > SparkR:::getNumPartitions(x)) {
func <- function(partIndex, part) {
set.seed(partIndex) # partIndex as seed
start <- as.integer(base::sample(numPartitions, 1) - 1)
Expand Down Expand Up @@ -1112,7 +1123,7 @@ setMethod("saveAsTextFile",
#' @noRd
setMethod("sortBy",
signature(x = "RDD", func = "function"),
function(x, func, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
values(sortByKey(keyBy(x, func), ascending, numPartitions))
})

Expand Down Expand Up @@ -1144,7 +1155,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
resList <- list()
index <- -1
jrdd <- getJRDD(newRdd)
numPartitions <- numPartitions(newRdd)
numPartitions <- getNumPartitions(newRdd)
serializedModeRDD <- getSerializedMode(newRdd)

while (TRUE) {
Expand Down Expand Up @@ -1368,7 +1379,7 @@ setMethod("setName",
setMethod("zipWithUniqueId",
signature(x = "RDD"),
function(x) {
n <- numPartitions(x)
n <- getNumPartitions(x)

partitionFunc <- function(partIndex, part) {
mapply(
Expand Down Expand Up @@ -1409,7 +1420,7 @@ setMethod("zipWithUniqueId",
setMethod("zipWithIndex",
signature(x = "RDD"),
function(x) {
n <- numPartitions(x)
n <- getNumPartitions(x)
if (n > 1) {
nums <- collect(lapplyPartition(x,
function(part) {
Expand Down Expand Up @@ -1521,8 +1532,8 @@ setMethod("unionRDD",
setMethod("zipRDD",
signature(x = "RDD", other = "RDD"),
function(x, other) {
n1 <- numPartitions(x)
n2 <- numPartitions(other)
n1 <- getNumPartitions(x)
n2 <- getNumPartitions(other)
if (n1 != n2) {
stop("Can only zip RDDs which have the same number of partitions.")
}
Expand Down Expand Up @@ -1588,7 +1599,7 @@ setMethod("cartesian",
#' @noRd
setMethod("subtract",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
mapFunction <- function(e) { list(e, NA) }
rdd1 <- map(x, mapFunction)
rdd2 <- map(other, mapFunction)
Expand Down Expand Up @@ -1620,7 +1631,7 @@ setMethod("subtract",
#' @noRd
setMethod("intersection",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
rdd1 <- map(x, function(v) { list(v, NA) })
rdd2 <- map(other, function(v) { list(v, NA) })

Expand Down Expand Up @@ -1661,7 +1672,7 @@ setMethod("zipPartitions",
if (length(rrdds) == 1) {
return(rrdds[[1]])
}
nPart <- sapply(rrdds, numPartitions)
nPart <- sapply(rrdds, getNumPartitions)
if (length(unique(nPart)) != 1) {
stop("Can only zipPartitions RDDs which have the same number of partitions.")
}
Expand Down
6 changes: 5 additions & 1 deletion R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,11 @@ setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
# @export
setGeneric("name", function(x) { standardGeneric("name") })

# @rdname getNumPartitions
# @export
setGeneric("getNumPartitions", function(x) { standardGeneric("getNumPartitions") })

# Deprecated alias of getNumPartitions; retained for backward compatibility
# and documented on the same Rd page.
# @rdname getNumPartitions
# @export
setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })

Expand Down
4 changes: 2 additions & 2 deletions R/pkg/R/pairRDD.R
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ setMethod("cogroup",
#' @noRd
setMethod("sortByKey",
signature(x = "RDD"),
function(x, ascending = TRUE, numPartitions = SparkR:::numPartitions(x)) {
function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitions(x)) {
rangeBounds <- list()

if (numPartitions > 1) {
Expand Down Expand Up @@ -818,7 +818,7 @@ setMethod("sortByKey",
#' @noRd
setMethod("subtractByKey",
signature(x = "RDD", other = "RDD"),
function(x, other, numPartitions = SparkR:::numPartitions(x)) {
function(x, other, numPartitions = SparkR:::getNumPartitions(x)) {
filterFunction <- function(elem) {
iters <- elem[[2]]
(length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
Expand Down
10 changes: 5 additions & 5 deletions R/pkg/inst/tests/test_rdd.R
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200))
intRdd <- parallelize(sc, intPairs, 2L)

# Verify partition counts through the current API only. The pre-refactor
# assertions against the deprecated numPartitions() alias are dropped:
# keeping both (a diff-merge artifact) would make the test emit
# deprecation warnings and check the same value twice.
test_that("get number of partitions in RDD", {
  expect_equal(getNumPartitions(rdd), 2)
  expect_equal(getNumPartitions(intRdd), 2)
})

test_that("first on RDD", {
Expand Down Expand Up @@ -304,18 +304,18 @@ test_that("repartition/coalesce on RDDs", {

# repartition
r1 <- repartition(rdd, 2)
expect_equal(numPartitions(r1), 2L)
expect_equal(getNumPartitions(r1), 2L)
count <- length(collectPartition(r1, 0L))
expect_true(count >= 8 && count <= 12)

r2 <- repartition(rdd, 6)
expect_equal(numPartitions(r2), 6L)
expect_equal(getNumPartitions(r2), 6L)
count <- length(collectPartition(r2, 0L))
expect_true(count >= 0 && count <= 4)

# coalesce
r3 <- coalesce(rdd, 1)
expect_equal(numPartitions(r3), 1L)
expect_equal(getNumPartitions(r3), 1L)
count <- length(collectPartition(r3, 0L))
expect_equal(count, 20)
})
Expand Down

0 comments on commit 6979edf

Please sign in to comment.