Skip to content

Commit

Permalink
Update 4-shuffleDetails.md
Browse files Browse the repository at this point in the history
  • Loading branch information
zixicc committed Sep 5, 2014
1 parent 510bdd5 commit c6de97a
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions markdown/4-shuffleDetails.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Shuflfe 过程
# Shuffle 过程

上一章里讨论了 job 的物理执行图,也讨论了流入 RDD 中的 records 是怎么被 compute() 后流到后续 RDD 的,同时也分析了 task 是怎么产生 result,以及 result 怎么被收集后计算出最终结果的。然而,我们还没有讨论**数据是怎么通过 ShuffleDependency 流向下一个 stage 的?**

@@ -59,14 +59,16 @@ ShuffleMapTask 的执行过程很简单:先利用 pipeline 计算得到 finalR
// MapReduce
reduce(K key, Iterable<V> values) {
result = process(key, values)
return result }
return result
}

// Spark
reduce(K key, Iterable<V> values) {
result = null
for (V value : values)
result = func(result, value)
return result }
return result
}
```
MapReduce 可以在 process 函数里面可以定义任何数据结构,也可以将部分或全部的 values 都 cache 后再进行处理,非常灵活。而 Spark 中的 func 的输入参数是固定的,一个是上一个 record 的处理结果,另一个是当前读入的 record,它们经过 func 处理后的结果被下一个 record 处理时使用。因此一些算法比如求平均数,在 process 里面很好实现,直接`sum(values)/values.length`,而在 Spark 中 func 可以实现`sum(values)`,但不好实现`/values.length`。更多的 func 将会在下面的章节细致分析。
- **fetch 来的数据存放到哪里?**刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果`spark.shuffle.spill = false`就只用内存。内存使用的是`AppendOnlyMap` ,类似 Java 的`HashMap`,内存+磁盘使用的是`ExternalAppendOnlyMap`,如果内存空间不足时,`ExternalAppendOnlyMap`可以将 \<K, V\> records 进行 sort 后 spill 到磁盘上,等到需要它们的时候再进行归并,后面会详解。**使用“内存+磁盘”的一个主要问题就是如何在两者之间取得平衡?**在 Hadoop MapReduce 中,默认将 reducer 的 70% 的内存空间用于存放 shuffle 来的数据,等到这个空间利用率达到 66% 的时候就开始 merge-combine()-spill。在 Spark 中,也适用同样的策略,一旦 ExternalAppendOnlyMap 达到一个阈值就开始 spill,具体细节下面会讨论。

0 comments on commit c6de97a

Please sign in to comment.