Skip to content

Commit

Permalink
[SPARK-5973] [PySpark] fix zip with two RDDs with AutoBatchedSerializer
Browse files Browse the repository at this point in the history
Author: Davies Liu <[email protected]>

Closes apache#4745 from davies/fix_zip and squashes the following commits:

2124b2c [Davies Liu] Update tests.py
b5c828f [Davies Liu] increase the number of records
c1e40fd [Davies Liu] fix zip with two RDDs with AutoBatchedSerializer
  • Loading branch information
Davies Liu authored and jkbradley committed Feb 24, 2015
1 parent a2b9137 commit da505e5
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
2 changes: 1 addition & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -1950,7 +1950,7 @@ def batch_as(rdd, batchSize):

my_batch = get_batch_size(self._jrdd_deserializer)
other_batch = get_batch_size(other._jrdd_deserializer)
if my_batch != other_batch:
if my_batch != other_batch or not my_batch:
# use the smallest batchSize for both of them
batchSize = min(my_batch, other_batch)
if batchSize <= 0:
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,12 @@ def test_zip_with_different_serializers(self):
# regression test for bug in _reserializer()
self.assertEqual(cnt, t.zip(rdd).count())

def test_zip_with_different_object_sizes(self):
    # regression test for SPARK-5973: zipping two RDDs whose elements grow
    # in size must keep the auto-batched partitions aligned
    left = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
    right = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
    self.assertEqual(10000, left.zip(right).count())

def test_zip_with_different_number_of_items(self):
a = self.sc.parallelize(range(5), 2)
# different number of partitions
Expand Down

0 comments on commit da505e5

Please sign in to comment.