[SPARK-4384] [PySpark] improve sort spilling
If there are big broadcasts (or other large objects) in a Python worker, the free memory available for sorting can be too small, so the sorter keeps spilling small files to disk and finally fails with "too many open files".

This PR delays spilling until the used memory goes over the limit and has started to increase again since the last spill. This increases the size of the spilled files, improving stability and performance in such cases. (We also do this in ExternalAggregator.)
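For illustration, here is a minimal sketch of the escalating-limit idea described above; it is not the PR's actual code, and the helpers `get_used_memory` and `spill_to_disk` are hypothetical stand-ins passed in by the caller:

```python
def external_sort(items, memory_limit_mb, get_used_memory, spill_to_disk):
    """Sort `items`, spilling sorted runs to disk when memory is tight.

    A sketch of the escalating-limit strategy: after each spill, raise
    the threshold to 5% above the memory still in use, so memory that is
    pinned (e.g. by a big broadcast) and never released no longer
    triggers an endless stream of tiny spills.
    """
    limit = memory_limit_mb
    buffer, spill_files = [], []
    for item in items:
        buffer.append(item)
        if get_used_memory() > limit:
            buffer.sort()
            spill_files.append(spill_to_disk(buffer))
            buffer = []
            # Key idea of the PR: if memory did not drop after the
            # spill, only spill again once usage *increases* past a
            # new, higher bar (5% above current usage).
            limit = max(memory_limit_mb, get_used_memory() * 1.05)
    buffer.sort()
    return spill_files, buffer  # the caller merges the sorted runs
```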

Author: Davies Liu <[email protected]>

Closes apache#3252 from davies/sort and squashes the following commits:

711fb6c [Davies Liu] improve sort spilling
Davies Liu authored and JoshRosen committed Nov 19, 2014
1 parent f9adda9 commit 73c8ea8
Showing 1 changed file with 10 additions and 1 deletion.

python/pyspark/shuffle.py
@@ -478,13 +478,21 @@ def _get_path(self, n):
             os.makedirs(d)
         return os.path.join(d, str(n))
 
+    def _next_limit(self):
+        """
+        Return the next memory limit. If the memory is not released
+        after spilling, it will dump the data only when the used memory
+        starts to increase.
+        """
+        return max(self.memory_limit, get_used_memory() * 1.05)
+
     def sorted(self, iterator, key=None, reverse=False):
         """
         Sort the elements in iterator, do external sort when the memory
         goes above the limit.
         """
         global MemoryBytesSpilled, DiskBytesSpilled
-        batch = 100
+        batch, limit = 100, self._next_limit()
         chunks, current_chunk = [], []
         iterator = iter(iterator)
         while True:
@@ -504,6 +512,7 @@ def sorted(self, iterator, key=None, reverse=False):
                 chunks.append(self.serializer.load_stream(open(path)))
                 current_chunk = []
                 gc.collect()
+                limit = self._next_limit()
                 MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
                 DiskBytesSpilled += os.path.getsize(path)
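A hypothetical worked example of the new limit: suppose self.memory_limit is 512 (MB) while a large broadcast keeps get_used_memory() at 600 MB even after gc.collect(). Before this change, used_memory > limit stayed true on every batch, so each batch was spilled into its own tiny file. With _next_limit(), the threshold after the first spill becomes max(512, 600 * 1.05) = 630 MB, so the sorter spills again only once usage has actually grown by about 5%.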
