[SPARK-2538] [PySpark] Hash based disk spilling aggregation
During aggregation in the Python worker, if memory usage goes above the configured limit (spark.python.worker.memory), it switches to disk-spilling aggregation.

It splits the aggregation into multiple stages: in each stage, it partitions the aggregated data by hash and dumps the partitions to disk. After all the data have been aggregated, it merges the spilled stages back together, partition by partition.
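
A rough sketch of that idea, for orientation only: the real implementation is the ExternalMerger added in python/pyspark/shuffle.py, which is not shown in this diff. The partition count, file layout, single `merge` function, and `memory_ok` callback below are illustrative assumptions.

```python
# Illustrative sketch of hash-based spill-and-merge aggregation; not the
# patch's actual ExternalMerger (which also distinguishes mergeValue from
# mergeCombiners and rebalances partitions).
import os
import pickle
import tempfile
from collections import defaultdict

NUM_PARTITIONS = 4  # assumption; the real merger chooses this itself


def aggregate_with_spill(items, merge, memory_ok):
    """Aggregate (key, value) pairs with merge(old, new); whenever memory_ok()
    reports pressure, hash-partition the in-memory data and dump it to disk,
    then merge the spilled rounds back together one partition at a time."""
    tmpdir = tempfile.mkdtemp()
    data, rounds = {}, 0

    def spill():
        # Partition the in-memory data by hash and dump each partition to disk.
        buckets = defaultdict(dict)
        for k, v in data.items():
            buckets[hash(k) % NUM_PARTITIONS][k] = v
        for pid, part in buckets.items():
            with open(os.path.join(tmpdir, "%d-%d" % (rounds, pid)), "wb") as f:
                pickle.dump(part, f)
        data.clear()

    for k, v in items:
        data[k] = merge(data[k], v) if k in data else v
        if not memory_ok():
            spill()
            rounds += 1

    if rounds == 0:
        return data            # everything fit in memory, nothing was spilled
    if data:
        spill()                # flush the tail so it joins the on-disk rounds
        rounds += 1

    # Merge round by round, one hash partition at a time, so the working set
    # stays roughly 1/NUM_PARTITIONS of the final result.
    result = {}
    for pid in range(NUM_PARTITIONS):
        part = {}
        for r in range(rounds):
            path = os.path.join(tmpdir, "%d-%d" % (r, pid))
            if os.path.exists(path):
                with open(path, "rb") as f:
                    for k, v in pickle.load(f).items():
                        part[k] = merge(part[k], v) if k in part else v
        result.update(part)
    return result
```

For word-count style pairs this could be driven as `aggregate_with_spill(pairs, lambda a, b: a + b, memory_ok)`, where `memory_ok` wraps some memory probe such as the patch's `get_used_memory()` compared against the configured limit.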

Author: Davies Liu <[email protected]>

Closes apache#1460 from davies/spill and squashes the following commits:

cad91bf [Davies Liu] call gc.collect() after data.clear() to release memory as much as possible.
37d71f7 [Davies Liu] balance the partitions
902f036 [Davies Liu] add shuffle.py into run-tests
dcf03a9 [Davies Liu] fix memory_info() of psutil
67e6eba [Davies Liu] comment for MAX_TOTAL_PARTITIONS
f6bd5d6 [Davies Liu] rollback next_limit() again, the performance difference is huge:
e74b785 [Davies Liu] fix code style and change next_limit to memory_limit
400be01 [Davies Liu] address all the comments
6178844 [Davies Liu] refactor and improve docs
fdd0a49 [Davies Liu] add long doc string for ExternalMerger
1a97ce4 [Davies Liu] limit used memory and size of objects in partitionBy()
e6cc7f9 [Davies Liu] Merge branch 'master' into spill
3652583 [Davies Liu] address comments
e78a0a0 [Davies Liu] fix style
24cec6a [Davies Liu] get local directory by SPARK_LOCAL_DIR
57ee7ef [Davies Liu] update docs
286aaff [Davies Liu] let spilled aggregation in Python configurable
e9a40f6 [Davies Liu] recursive merger
6edbd1f [Davies Liu] Hash based disk spilling aggregation
davies authored and mateiz committed Jul 25, 2014
1 parent eff9714 commit 14174ab
Showing 9 changed files with 611 additions and 25 deletions.
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -57,7 +57,10 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
val worker: Socket = env.createPythonWorker(pythonExec,
envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir))

// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -43,7 +43,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private val localDirs: Array[File] = createLocalDirs()
val localDirs: Array[File] = createLocalDirs()
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
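
The two Scala changes above expose the JVM's local directories and pass them to the Python worker as SPARK_LOCAL_DIR, so Python-side spill files land on the same disks the block manager uses. A hedged sketch of how the Python side might consume that variable; the actual logic lives in the new python/pyspark/shuffle.py (not shown here), and the helper name and subdirectory are assumptions.

```python
# Assumed consumer of SPARK_LOCAL_DIR on the Python side; illustrative only.
import os
import random


def get_local_dirs(sub="pyspark"):
    """Return per-disk scratch directories derived from SPARK_LOCAL_DIR,
    which PythonRDD now sets to a comma-separated list of local dirs."""
    root = os.environ.get("SPARK_LOCAL_DIR", "/tmp")
    dirs = []
    for d in root.split(","):
        if not d:
            continue
        d = os.path.join(d, sub)
        if not os.path.exists(d):
            os.makedirs(d)
        dirs.append(d)
    return dirs


# Spread spill files across the available disks.
spill_dir = random.choice(get_local_dirs())
```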
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -197,6 +197,15 @@ Apart from these, the following properties are also available, and may be useful
Spark's dependencies and user dependencies. It is currently an experimental feature.
</td>
</tr>
<tr>
<td><code>spark.python.worker.memory</code></td>
<td>512m</td>
<td>
    Amount of memory to use per Python worker process during aggregation, in the same
    format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>). If the memory
    used during aggregation goes above this amount, the data will be spilled to disk.
</td>
</tr>
</table>

#### Shuffle Behavior
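
For reference, both settings touched by this patch can be set from PySpark before the SparkContext is created; the values here are only examples.

```python
from pyspark import SparkConf, SparkContext

# spark.python.worker.memory caps memory used by aggregation in each Python
# worker; spark.shuffle.spill toggles the new spilling path.
conf = (SparkConf()
        .set("spark.python.worker.memory", "1g")
        .set("spark.shuffle.spill", "true"))
sc = SparkContext(conf=conf)
```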
2 changes: 1 addition & 1 deletion python/epydoc.conf
@@ -35,4 +35,4 @@ private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
pyspark.rddsampler pyspark.daemon pyspark.mllib._common
pyspark.mllib.tests
pyspark.mllib.tests pyspark.shuffle
92 changes: 71 additions & 21 deletions python/pyspark/rdd.py
@@ -42,6 +42,8 @@
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
get_used_memory

from py4j.java_collections import ListConverter, MapConverter

@@ -197,6 +199,22 @@ def _replaceRoot(self, value):
self._sink(1)


def _parse_memory(s):
"""
Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
return the value in MB
>>> _parse_memory("256m")
256
>>> _parse_memory("2g")
2048
"""
units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
    if s[-1].lower() not in units:
raise ValueError("invalid format: " + s)
return int(float(s[:-1]) * units[s[-1].lower()])


class RDD(object):

"""
Expand Down Expand Up @@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
# Transferring O(n) objects to Java is too expensive.
# Instead, we'll form the hash buckets in Python,
# transferring O(numPartitions) objects to Java.
# Each object is a (splitNumber, [objects]) pair.
# In order to avoid too huge objects, the objects are
# grouped into chunks.
outputSerializer = self.ctx._unbatched_serializer

limit = (_parse_memory(self.ctx._conf.get(
"spark.python.worker.memory", "512m")) / 2)

def add_shuffle_key(split, iterator):

buckets = defaultdict(list)
c, batch = 0, min(10 * numPartitions, 1000)

for (k, v) in iterator:
buckets[partitionFunc(k) % numPartitions].append((k, v))
c += 1

# check used memory and avg size of chunk of objects
if (c % 1000 == 0 and get_used_memory() > limit
or c > batch):
n, size = len(buckets), 0
for split in buckets.keys():
yield pack_long(split)
d = outputSerializer.dumps(buckets[split])
del buckets[split]
yield d
size += len(d)

avg = (size / n) >> 20
# let 1M < avg < 10M
if avg < 1:
batch *= 1.5
elif avg > 10:
batch = max(batch / 1.5, 1)
c = 0

for (split, items) in buckets.iteritems():
yield pack_long(split)
yield outputSerializer.dumps(items)

keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.context) as st:
@@ -1230,8 +1277,8 @@ def add_shuffle_key(split, iterator):
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
# This is required so that id(partitionFunc) remains unique, even if
# partitionFunc is a lambda:
# This is required so that id(partitionFunc) remains unique,
# even if partitionFunc is a lambda:
rdd._partitionFunc = partitionFunc
return rdd
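
The chunking added above serializes and yields the buckets whenever used memory crosses half of spark.python.worker.memory or the record count exceeds the current batch size, and then nudges the batch size so the average serialized chunk stays roughly between 1 MB and 10 MB. A small standalone restatement of that feedback step, as a sketch; the helper name is not from the patch.

```python
def adjust_batch(batch, flushed_bytes, num_buckets):
    """Grow or shrink the per-flush batch size so the average serialized
    chunk stays roughly between 1 MB and 10 MB, mirroring add_shuffle_key."""
    avg_mb = (flushed_bytes // num_buckets) >> 20
    if avg_mb < 1:                    # chunks too small: flush less often
        batch *= 1.5
    elif avg_mb > 10:                 # chunks too big: flush more often
        batch = max(batch / 1.5, 1)
    return batch
```

For example, `adjust_batch(1000, 50 << 20, 4)` sees a 12 MB average chunk and shrinks the batch to about 667 records per flush.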

@@ -1265,26 +1312,28 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

serializer = self.ctx.serializer
spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
== 'true')
memory = _parse_memory(self.ctx._conf.get(
"spark.python.worker.memory", "512m"))
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

def combineLocally(iterator):
combiners = {}
for x in iterator:
(k, v) = x
if k not in combiners:
combiners[k] = createCombiner(v)
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
return merger.iteritems()

locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
if k not in combiners:
combiners[k] = v
else:
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
merger = ExternalMerger(agg, memory, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
return merger.iteritems()

return shuffled.mapPartitions(_mergeCombiners)

def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
@@ -1343,7 +1392,8 @@ def mergeValue(xs, x):
return xs

def mergeCombiners(a, b):
return a + b
a.extend(b)
return a

return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numPartitions).mapValues(lambda x: ResultIterable(x))
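
combineByKey now delegates both the map-side and reduce-side work to a merger object with mergeValues, mergeCombiners and iteritems methods; the real InMemoryMerger and ExternalMerger live in the new python/pyspark/shuffle.py, which is not displayed here. Below is a minimal in-memory stand-in consistent with the calls made above; the Aggregator attribute names are assumptions.

```python
class SimpleInMemoryMerger(object):
    """Toy merger implementing the interface combineByKey relies on."""

    def __init__(self, agg):
        # agg is assumed to carry createCombiner, mergeValue, mergeCombiners
        self.agg = agg
        self.data = {}

    def mergeValues(self, iterator):
        d, agg = self.data, self.agg
        for k, v in iterator:
            d[k] = agg.mergeValue(d[k], v) if k in d else agg.createCombiner(v)

    def mergeCombiners(self, iterator):
        d, agg = self.data, self.agg
        for k, c in iterator:
            d[k] = agg.mergeCombiners(d[k], c) if k in d else c

    def iteritems(self):
        return iter(self.data.items())
```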
29 changes: 28 additions & 1 deletion python/pyspark/serializers.py
@@ -193,7 +193,7 @@ def load_stream(self, stream):
return chain.from_iterable(self._load_stream_without_unbatching(stream))

def _load_stream_without_unbatching(self, stream):
return self.serializer.load_stream(stream)
return self.serializer.load_stream(stream)

def __eq__(self, other):
return (isinstance(other, BatchedSerializer) and
@@ -302,6 +302,33 @@ class MarshalSerializer(FramedSerializer):
loads = marshal.loads


class AutoSerializer(FramedSerializer):
"""
    Choose marshal or cPickle as the serialization protocol automatically
"""
def __init__(self):
FramedSerializer.__init__(self)
self._type = None

def dumps(self, obj):
if self._type is not None:
return 'P' + cPickle.dumps(obj, -1)
try:
return 'M' + marshal.dumps(obj)
except Exception:
self._type = 'P'
return 'P' + cPickle.dumps(obj, -1)

def loads(self, obj):
_type = obj[0]
if _type == 'M':
return marshal.loads(obj[1:])
elif _type == 'P':
return cPickle.loads(obj[1:])
else:
raise ValueError("invalid sevialization type: %s" % _type)


class UTF8Deserializer(Serializer):
"""
Deserializes streams written by String.getBytes.
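
A quick round trip with the AutoSerializer added above (Python 2, matching the surrounding code, and assuming this patch is applied): plain built-ins go through marshal and are tagged 'M'; the first value marshal cannot handle switches the serializer to cPickle ('P') for everything afterwards.

```python
from pyspark.serializers import AutoSerializer

ser = AutoSerializer()

frame = ser.dumps([1, 2, 3])          # small built-ins go through marshal
print(frame[0])                       # 'M'
print(ser.loads(frame))               # [1, 2, 3]


class Point(object):
    def __init__(self, x):
        self.x = x


frame = ser.dumps(Point(3))           # marshal cannot serialize instances,
print(frame[0])                       # so the serializer switches to 'P'
print(ser.loads(frame).x)             # 3 (and stays on pickle from now on)
```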
(Diffs for the remaining three files, including the new python/pyspark/shuffle.py, did not load in this view.)
