spark 部分细节分享

背景

在日常工作中,会频繁涉及到一些问题, 比如:

  • 为什么 group by 用了 SortAggregate, 而不是 HashAggregate
  • 为什么 某些场景用 DataSet 会比 RDD 快这么多
  • 为什么有些 udf 会被执行多次,有些又不会
  • watermark 和 trigger 对延迟的影响

在尝试理解并回答这些问题的过程中,需要对 spark 有一定的了解。 本文旨在提供一个针对 spark 的工程视角,去 top 解释 spark 这样的并行计算框架的实现,从而针对上述问题可以有一个快速的认识。

spark 是什么

spark 是一个并行计算框架1, 提供一系列算子描述计算意图,并自动转换为并行计算,负责其执行效率,可靠性,容错等。

问题领域

并行计算

  • 什么场景需要并行计算
    1. 数据量大, 单机内存无法计算
    2. 计算量大, 利用多机/多核性能
  • 为什么需要框架
    1. 重复问题的统一处理
      1. 容错
      2. 调度
      3. io, rpc
    2. 常见问题的最佳实践

主要解决的计算问题

  1. transform (map, filter, ...)
    • Data[a] -> Data[b]
  2. join
    • (Data[a], Data[b]) -> Data[(a,b)]
  3. aggregate
    • Data[(k,a)] -> Agg (List[a] -> b) -> Data[(k, b)]
  4. sort
    • Data[a] -> SortedData[a]

实现方式

将常见计算问题翻译成 transform, join, agg, sort 等问题

  • select count(1) from a
    • transform a -> 1
    • aggregate by sum

实现 transform

实现 join

目的/场景

TODO

实现方式

  • HashJoin
    • 多表保证同一个 key 映射到一个分区
  • SortMergeJoin
  • BroadcaseJoin
    • 大小表 join
    • 小表广播到大表内存,直接 hashmap 查询

实现 aggregate

目的/场景

  1. 将数据按照某些 key 聚合,将聚合后的值执行某种计算获得结果
  2. 主要工作: 如何保证同一个 key 的数据最终都能聚合到一起进行计算

解决思路

  1. 将相同 key 放入一个分区
    1. partition 的常见方式: hash, range, 对应的 agg 即为 HashAgg/SortAgg
    2. range 的分区需要对值域有一定了解, spark 通过 sample 去做
    3. 对于 count(distinct(some value)), 可以认为是 group by some value, 亦即保证同一个 value 在同一个分区,然后去重(aggregate 操作), 然后转换为了 count 操作
  2. 将同一个 key 对应的结果进行计算

实现方式

  1. Hash aggregate
  2. Sort aggregate

实现 sort

工程考虑

  • hash 与 sort 对比
    • 内存
      • hash 需要内存足够保留待计算分区的全量状态
      • sort 可以仅使用常量内存
        • 排序过程可以利用磁盘,通过 merge sort 保证只使用指定内存
        • 可以通过保证相关计算相邻,减少内存状态(淘汰已计算完成的状态)
    • io
      • 不利用外存的情况下
        • hash / sort 只需读取一次
      • 内存不够的情况下
        • hash 会失败
        • sort 会多次读取磁盘 (和排序的 segments 个数相关)
    • 计算效率
      • hash O(n)
      • sort O(nlogn)
      • 与分区的大小强相关,比如分区个数为 p 的情况下
        • hash p * O(n/p) 大约 O(n)
        • sort p * O(n/p * log(n/p)) 大约 O(n * log(n/p))
          • p 越大, 基于 sort 的整体开销越小
  • 内存与计算
    • 问题
      • 常见对象在 java 堆上占用的内存较大, 影响 gc
      • shuffle/sort 读写盘 均需要一次序列化/反序列化
    • project tungen
      • UnsafeRow
        • 直接基于 byte[] 进行排序, mutable 的修改操作等
        • 自定义结构 memory foot 较小
          • 无需存储对象指针
          • string 的编码不使用 utf16
  • 重复表达式
    • CommonExprElimination
  • sql 优化(较复杂)
    • 大小表 join
    • filter 下推
    • constant folding
    • 可以参考常见的编译器优化以及数据库优化技术
  • 数据量太大
    • 重分区

案例分享

实时计算的多次反序列化

  • projectExec 在 codegen 时未做 common expr elimination
    • 选择解释执行时支持
    • 算是个小 bug
  • filter 下推后与 project 阶段, udf 不会共享
    • 未进行 wholeStage 的表达式优化 (?)

离线计算的 SortAggregate

  • aggregate 的执行使用 hash, sort
    • hashAggregate 的实现基于 UnsafeRow(无需反序列化,计算状态变更通过 变更 row 中的部分 byte 实现(以及 agg 的状态使用 unsaferow 表示时需要 mutable)
    • 自行实现的 udaf 的 bufferSchema 不支持 mutable
    • 从而选择了 failback 到 SortAggregate

RDD 比 DataSet 慢

  • 因为 exchange/shuffle 过程中,reducer 需要反序列化数据,存储在内存. 反序列化的开销和 gc 压力综合导致的时长

实时计算的计算效率不高

  • transform 过程中,行级别依赖 rpc

Footnotes:

1

当然, spark 也是一个解释器/编译器