作者: 张凯@阿里云、陳韋廷@Intel、周渊@Intel
简介
Apache Celeborn(Incubating) 是阿里云捐赠给 Apache 的通用 Remote Shuffle Service,旨在提升大数据计算引擎的性能/稳定性/弹性,目前已广泛应用于生产场景。Gluten 是 Intel 开源的引擎加速项目,旨在通过把 Spark Java Engine 替换为 Native Engine(Velox, ClickHouse, Arrow 等)来加速 Spark 引擎。过去一段时间,Gluten 社区和 Celeborn 社区相互合作,成功把 Celeborn 集成进 Gluten,本文将对此加以介绍。
Gluten: 给 Spark 换上 Native 引擎
Gluten 项目旨在解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。随着 IO 技术的提升,特别是SSD和万兆网卡的普及,CPU 计算瓶颈逐渐成为限制性能的主要因素。然而,基于 JVM 进行 CPU 指令优化相对困难,因为与其他本地语言(如C++)相比,JVM 提供的优化功能较少。
为了解决这个问题,开源社区已经有一些成熟的本地引擎(如 ClickHouse、Velox)具备了优秀的向量化执行能力,可以带来显著的性能优势。但是,这些引擎通常与 Spark 生态系统脱离,对于那些已经严重依赖 Spark 计算框架且无法承受大量运维和迁移成本的用户来说,不够友好。Gluten 项目的目标是使 Spark 用户能够享受这些成熟的本地引擎带来的性能优势,而无需迁移。
Gluten 项目利用 Spark 插件机制,拦截并将查询计划发送给本地引擎执行,从而跳过 Spark 本身不够高效的执行路径。该项目支持多个本地引擎作为后端,包括 Velox、ClickHouse 和 Apache Arrow。对于本地引擎无法处理的操作,Gluten 会回退到 Spark 的正常执行路径。在线程模型方面,Gluten 使用 JNI 调用库的形式,在 Spark 执行器任务线程中直接调用本地代码,避免引入复杂的线程模型。
在内存管理方面,Gluten 能够统一管理本地内存和 JVM 内存,通过本地内存池和任务内存管理器分配内存。当内存不足时,会触发溢出操作,释放内存。此外,Gluten 还提供了完整的列式 Shuffle 机制及及统一API接口用于衔接市场受欢迎的第三方 RemoteShuffleService 如 Celeborn,避免了数据转换开销及提供服务。
为了兼容不同的本地引擎,Gluten 定义了清晰的JNI接口,作为 Spark 框架和底层引擎之间的桥梁。这些接口用于请求传递、数据传输和能力检测等方面的需求。开发者只需实现这些接口,并满足相应的语义要求,即可利用 Gluten 完成 Spark 和本地引擎的整合工作。此外, Spark 的架构设计中还预留了 Shim Layer 来适配支持不同版本的 Spark。
Gluten Columnar Shuffle
Shuffle 本身是影响 Spark 性能的重要一环,这里会引入多次序列化/反序列化,网络传输,磁盘读写,因此要想实现高性能才不至于成为瓶颈。由于 Native Engine 采用列式(Columnar)数据结构暂存数据,如果简单的沿用 Spark 的基于行数据模型的 Shuffle,则会在 Shuffle Write 阶段引入数据列转行的环节,在 Shuffle Read 阶段引入数据行转列的环节,才能使数据可以流畅周转。但是无论行转列,还是列转行的成本都不低。因此,Gluten 必须提供完整的 Columnar Shuffle 机制以避开这里的转化开销。具体到 columnar shuffle 实现层,主要分成 shuffle 数据写入和shuffle 数据读取两块。
Columnar Shuffle 写入
和原生 Spark 一样,Columnar Shuffle 目标是将当前 DAG 产生的临时数据写入磁盘,在下一个 stage 需要将数据读出,也需要支持内存不足时的 spill 操作,优先保证查询的健壮性。与Spark里不同的地方主要有以下几点:
- Spark 默认采用 row based 格式存储数据,Gluten 里 shuffle 采用了 columnar format 来保存数据。
- 目前的实现是基于 Arrow format,来做序列化的工作,并且支持自定义压缩格式。采用 columnar format 来实现可以方便的引入 SIMD 指令来做优化
- Spark 默认采用 sort-based shuffle,而 gluten 里默认采用 hash based shuffle
- Sort based shuffle 比,hash based 算法复杂度更低,但需要占用更多内存,并且引入很多小文件问题。gluten 里实现 hash based shuffle 时,也参考了 sort based shuffle 的部分设计,减少了小文件过多带来的影响
在一个 TPC-H Like Scale Factor 6TB的测试场景中,Columnar Shuffle Write 和原生Spark 的 row based shuffle 相比,可以达到减少约~12% 的 Shuffle Size 的效果。
Columnar Shuffle 读取
在实现 Columnar Shuffle 读取时,Gluten 复用了 Spark 里的 netty based shuffle transfer 机制,只需要提供对应的 de-serializer,将已经写到磁盘上的 shuffle 文件读取上来,并反序列化交给 reducer。Spark 里引入了很多软件栈比如 netty, kryo,导致 reducer 读取时有重复的内存拷贝,Gluten 里也做了一些零拷贝优化来减少这里的软件开销。
Celeborn:解决本地 Shuffle 的限制
Gluten 本地 Shuffle 限制
上图展示了 Gluten Columnar Shuffle 的主流程,其中 Hash-based Shuffle、Native Partitioner、零拷贝等设计是其获得高性能的关键。然而,Gluten 沿用了 Spark 的本地 Shuffle 框架,存在以下主要限制。
-
依赖大容量本地盘存储 shuffle 数据,一方面无法应用存算分离架构,另一方面计算节点“有状态”无法及时缩容,从而导致难以兼容云原生架构,资源利用率低。
-
Shuffle Write 内存紧张时 Spill 到磁盘,增加额外的磁盘 I/O。
-
Shuffle Read 有大量的网络连接和大量磁盘随机读,导致较差的稳定性和性能。
Celeborn 简介
Apache Celeborn(Incubating) 是较成熟的通用 Remote Shuffle Service,可以很好的解决大数据引擎本地 Shuffle 存在的稳定性、性能、弹性的问题,详见文末索引[1][2][3][4]。Apache Celeborn 社区和 Gluten 社区过去一段时间相互配合,成功把 Celeborn 集成进 Gluten,使得 Native Spark 能更好的拥抱 Cloud Native。接下来将介绍 Gluten 如何集成Celeborn。
Gluten + Celeborn
整体设计
Gluten 集成 Celeborn 的设计目标是同时保留 Gluten Columnar Shuffle 和 Celeborn Remote Shuffle 的核心设计,让两者的优势叠加。
上图描述了 Gluten+Celeborn Columnar Shuffle 的整体设计:Shuffle Writer 复用 Native Partitioner,拦截本地 IO 并改为推向 Celeborn 集群;Celeborn 集群做数据重组(聚合相同 Partition 的数据)和多备份;Shuffle Reader 从特定 Celeborn Worker 上顺序读取数据并反序列化为 Column Batch。这个设计不仅保留了 Gluten Columnar Shuffle 的高性能设计,又充分利用了 Celeborn 远端存储、数据重组和多副本的能力。
具体而言,Gluten 集成 Celeborn 主要在于实现对应的 ShuffleManager,ShuffleWriter 以及ShuffleReader,接下来将分别介绍。
CelebornShuffleManager
CelebornShuffleManager 继承了 Spark ShuffleManager 接口,作为 Gluten 对接 Celeborn 的 ShuffleManager,主要做了以下工作:
-
向 Celeborn register shuffle,失败则回退到 Gluten 的本地 Columnar Shuffle。
-
与 Celeborn 集群建立连接并初始化 Celeborn Shuffle Client。
-
提供 getWriter 方法获取 CelebornShuffleWriter。
-
提供 getReader 方法获取 CelebornShuffleReader。
CelebornShuffleWriter
CelebornShuffleWriter 与 Gluten Columnar Shuffle 一致,都采用了 Hash-based Shuffle。其核心功能是复用 Gluten 中的 Native Partitioner,并将磁盘 IO 操作(Spill,写 Shuffle 文件)替换为推向 Celeborn 集群。主要流程如下:
-
通过 JNI 向 Native 模块传递 CelebornPartitionPusher,使得 Native 模块可向 Celeborn 集群推送数据。
-
复用 Native Partitioner 对列式数据进行 Partition,与 Gluten Columnar Shuffle 保持一致。
-
向 GlutenMemoryConsumer 注册 Spiller,保证在 Spark 监测到内存不足触发 Spill 时,可以通过 Celeborn SDK 把数据推送到 Celeborn 集群,从而避免额外的磁盘 IO。
-
在 Native 模块,当全部数据完成 Partition 后,将写文件操作替换成通过 Celeborn SDK 推送到 Celeborn 集群。
CelebornShuffleReader
CelebornShuffleReader 跟 Celeborn 集群建立连接,读取 Shuffle 数据。在 Gluten 侧实现 CelebornColumnarBatchSerializer,通过 deserializeStream 方法定制 InputStream 的 deserialize 流程,最后将反序列化的 ColumnarBatch 交给 Gluten 继续处理。从上述两图对比可知,本地 Shuffle 的 Reducer 从多个文件读取数据,而 Celeborn Reducer 只需从一个 Worker 上读取,随机读转换成了顺序读,网络的连接数也从乘数关系变成了线性关系,从而提升了 Shuffle Read 的性能。
性能测试
Celeborn 在磁盘资源受限时有最好的性能表现。我们测试了三组硬件环境:SDD 环境,充分 HDD 环境,有限 HDD 环境。整体结论是:在 SDD 环境, Gluten + Celeborn Columnar Shuffle 性能跟 Gluten 本地 Columnar Shuffle 持平;在充分 HDD 和有限 HDD 环境,Gluten + Celeborn Columnar Shuffle 性能比 Gluten 本地 Columnar Shuffle 分别提升8% 和12% 。
充分 HDD 环境
部署方式:Celeborn 集群和 Yarn 集群混部。
硬件环境:1 x Master(64 vCPU, 256 GiB) 5 x worker(40 vCPU, 176 GiB, 15x7300GB HDD)
Spark 版本:3.3.1
Benchmark:3T TPCDS
下图是 Gluten+Celeborn 相比 Gluten 的Top20的加速比:
下图是完整 TPCDS 的时间对比,整体提升8%:
受限 HDD 环境
部署方式:Celeborn 集群和 Yarn 集群混部。
硬件环境:1 x Master(64 vCPU, 256 GiB) 5 x worker(40 vCPU, 176 GiB, 2x7300GB HDD)
Spark 版本:3.3.1
Benchmark:3T TPCDS
下图是 Gluten+Celeborn 相比 Gluten 的Top20的加速比:
下图是完整 TPCDS 的时间对比,整体提升12% :
SSD 环境
最后把磁盘全部换成 SSD,Gluten+Celeborn 在不额外消耗机器资源的情况下,比 Gluten 性能提升 1.2% ,性能基本持平。
总结
本篇文章介绍了 Gluten 项目的背景和目标,以及它如何解决基于 Apache Spark 的数据负载场景中的 CPU 计算瓶颈。Gluten 利用 Spark 插件机制,将查询计划发送给本地引擎执行,从而跳过 Spark 本身不够高效的执行路径。该项目支持多个本地引擎作为后端,引入 Columnar Shuffle 设计,并统一管理本地内存和 JVM 内存。此外,Gluten 集成了 Celeborn作为 Remote Shuffle Service,Celeborn 采用了 Push Shuffle 的设计,通过远端存储、数据重组、内存缓存、多副本等设计,不仅进一步提升 Gluten Shuffle 的性能和稳定性,还使得 Gluten 拥有更好的弹性,从而更好的拥抱云原生。
欢迎加入我们的开源项目,并贡献你的代码!我们的项目位于
Gluten: https://github.com/oap-project/gluten
Celeborn: https://github.com/apache/incubator-celeborn
Celeborn 用户交流钉群: 41594456
Reference
[1]https://developer.aliyun.com/article/779686
[2]https://developer.aliyun.com/article/857757
[3]https://developer.aliyun.com/article/891951
[4]https://developer.aliyun.com/article/1153123