spark 部分细节分享
背景
在日常工作中,会频繁涉及到一些问题, 比如:
- 为什么 group by 用了 SortAggregate, 而不是 HashAggregate
- 为什么 某些场景用 DataSet 会比 RDD 快这么多
- 为什么有些 udf 会被执行多次,有些又不会
- watermark 和 trigger 对延迟的影响
在尝试理解并回答这些问题的过程中,需要对 spark 有一定的了解。 本文旨在提供一个针对 spark 的工程视角,去 top 解释 spark 这样的并行计算框架的实现,从而针对上述问题可以有一个快速的认识。
spark 是什么
spark 是一个并行计算框架1, 提供一系列算子描述计算意图,并自动转换为并行计算,负责其执行效率,可靠性,容错等。
问题领域
并行计算
- 什么场景需要并行计算
- 数据量大, 单机内存无法计算
- 计算量大, 利用多机/多核性能
- 为什么需要框架
- 重复问题的统一处理
- 容错
- 调度
- io, rpc
- 常见问题的最佳实践
- 重复问题的统一处理
主要解决的计算问题
- transform (map, filter, ...)
- Data[a] -> Data[b]
- join
- (Data[a], Data[b]) -> Data[(a,b)]
- aggregate
- Data[(k,a)] -> Agg (List[a] -> b) -> Data[(k, b)]
- sort
- Data[a] -> SortedData[a]
实现方式
将常见计算问题翻译成 transform, join, agg, sort 等问题
- select count(1) from a
- transform a -> 1
- aggregate by sum
实现 transform
实现 join
目的/场景
TODO
实现方式
- HashJoin
- 多表保证同一个 key 映射到一个分区
- SortMergeJoin
- BroadcaseJoin
- 大小表 join
- 小表广播到大表内存,直接 hashmap 查询
实现 aggregate
目的/场景
- 将数据按照某些 key 聚合,将聚合后的值执行某种计算获得结果
- 主要工作: 如何保证同一个 key 的数据最终都能聚合到一起进行计算
解决思路
- 将相同 key 放入一个分区
- partition 的常见方式: hash, range, 对应的 agg 即为 HashAgg/SortAgg
- range 的分区需要对值域有一定了解, spark 通过 sample 去做
- 对于 count(distinct(some value)), 可以认为是 group by some value, 亦即保证同一个 value 在同一个分区,然后去重(aggregate 操作), 然后转换为了 count 操作
- 将同一个 key 对应的结果进行计算
实现方式
- Hash aggregate
- Sort aggregate
实现 sort
工程考虑
- hash 与 sort 对比
- 内存
- hash 需要内存足够保留待计算分区的全量状态
- sort 可以仅使用常量内存
- 排序过程可以利用磁盘,通过 merge sort 保证只使用指定内存
- 可以通过保证相关计算相邻,减少内存状态(淘汰已计算完成的状态)
- io
- 不利用外存的情况下
- hash / sort 只需读取一次
- 内存不够的情况下
- hash 会失败
- sort 会多次读取磁盘 (和排序的 segments 个数相关)
- 不利用外存的情况下
- 计算效率
- hash O(n)
- sort O(nlogn)
- 与分区的大小强相关,比如分区个数为 p 的情况下
- hash p * O(n/p) 大约 O(n)
- sort p * O(n/p * log(n/p)) 大约 O(n * log(n/p))
- p 越大, 基于 sort 的整体开销越小
- 内存
- 内存与计算
- 问题
- 常见对象在 java 堆上占用的内存较大, 影响 gc
- shuffle/sort 读写盘 均需要一次序列化/反序列化
- project tungen
- UnsafeRow
- 直接基于 byte[] 进行排序, mutable 的修改操作等
- 自定义结构 memory foot 较小
- 无需存储对象指针
- string 的编码不使用 utf16
- UnsafeRow
- 问题
- 重复表达式
- CommonExprElimination
- sql 优化(较复杂)
- 大小表 join
- filter 下推
- constant folding
- 可以参考常见的编译器优化以及数据库优化技术
- 数据量太大
- 重分区
案例分享
实时计算的多次反序列化
- projectExec 在 codegen 时未做 common expr elimination
- 选择解释执行时支持
- 算是个小 bug
- filter 下推后与 project 阶段, udf 不会共享
- 未进行 wholeStage 的表达式优化 (?)
离线计算的 SortAggregate
- aggregate 的执行使用 hash, sort
- hashAggregate 的实现基于 UnsafeRow(无需反序列化,计算状态变更通过 变更 row 中的部分 byte 实现(以及 agg 的状态使用 unsaferow 表示时需要 mutable)
- 自行实现的 udaf 的 bufferSchema 不支持 mutable
- 从而选择了 failback 到 SortAggregate
RDD 比 DataSet 慢
- 因为 exchange/shuffle 过程中,reducer 需要反序列化数据,存储在内存. 反序列化的开销和 gc 压力综合导致的时长
实时计算的计算效率不高
- transform 过程中,行级别依赖 rpc
参考文档
- https://www.waitingforcode.com/apache-spark-sql/aggregations-execution-apache-spark-sql/read
- spark source code: org.apache.sql.execution
- 重点关注包的结构即可
- https://databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html
Footnotes:
1
当然, spark 也是一个解释器/编译器