继上一篇文章:SQL优化之诊断篇:快速定位生产性能问题实践。本文将从优化运行时间和优化资源消耗这两个方面,介绍可以提升作业性能的常用方法。
1.优化运行时间
在优化运行时间这个维度上,我们重点关注时间上的加速,单位时间内可能会消耗更多的计算资源。总成本有可能上升,也可能降低。为了缩短作业运行时间,可以从作业并行度,数据倾斜等角度进行优化。
1.1 调整并行度
task 并行度不合理有很多时候是因为数据从上游 task 计算后,数据膨胀得很厉害。我们第一步应该做的是去检查自己的业务逻辑有没有问题,是否有数据膨胀。
并行度是衡量并行计算并行程度的一个指标,从执行计划上来看,比如 M1,用 1000个 instance 来执行,我们就说 M1 的并行度是 1000。需要注意的是,调整并行度不一定是调高,instance 数量过多会从两个方面影响执行速度:
1.instance 越多,等待资源需要更长的时间,排队次数也更多。
2.每个 instance 初始化需要一定的时间,并行度越高,初始化占用的总时间就越多,有效的执行时间占比就越低。
下面列出了非人为干预情况下,影响单个task并行度的主要因素:
1.某些操作强制了必须 1 个 instance 来执行,例如:
a.做聚合的时候,没有group by,或者 group by 一个常量;
b.窗口函数的 over 里面指定 partition by 一个常量;
c.SQL 中指定 distribute by、cluster by 、order by一个常量;
d.mapjoin 的时候,需要把一个小表广播出去,目前会先被塞到一起然后再做广播;
2.读表的 task:instance 数量与源数据的大小有关系。
3.非读表的 task:instance 数量根据输入 task 的并行度,加 task 内部的 operator 来推算。
强制 1 个 instance 执行
针对这种情况,用户需要检查这些操作是否必要,能否去掉,尽量取消掉这些操作。
读表的 task
对于读表的 task,一般情况下,一个 instance 读取 256M(压缩后的大小)的数据。绝大多数情况下没有问题,这里列出一些常见出问题的情况:
1.数据压缩比很高:256M 解压出来后可能有好几十 G 数据,保持默认设置读 256M 数据不合理,需要调小单个 instance 读取的数据量。
2.Task 中执行了一些很 heavy 的操作,特别是存在 UDF(包含使用非结构化存储的时候用户自定义的Extractor),有可能读 256M 的执行时间也不能满足用户需求,需要调小单个 instance 读取的数据量。
3.读取 256M 数据太少,导致 instance 的执行时间太短,而由于输入数据很大(比如在对一年的数据做统计分析),反而导致了并行度过大,让 instance 大多数时间在排队等资源,需要调大单个 instance 读取的数据量。
可以通过调整split size来设置task的实例数
set xxx.sql.mapper.split.size=
-- 设定一个map的最大数据输入量,单位M
-- 默认256。区间[1,Integer.MAX_VALUE]
非读表的 task
主要有三种方式调整并行度:
1.调整 xxx.sql.mapper.split.size:非读表 task 的并行度会受到输入 task 的并行度影响,通过调整读表 task 的并行度间接调整非读表 task 的并行度。
2.通过 xxx.sql.reducer.instances 强制设置 reducer 并行度,但是会影响所有的相关 task。
3.通过 xxx.sql.joiner.instances 强制设置 joiner 并行度,但是会影响所有的相关 task。
1.2 优化执行计划
Map Join Hint
在对大表和一个或者多个小表执行join操作时,若因为统计信息缺失、UDF逻辑黑盒等原因导致优化器没有自动生成map join的,用户可以手动添加map join hint,使得原本的Sort-Merge Join变成Map Join,避免大表数据shuffle从而提升性能。
1.3 数据倾斜
在数据倾斜的情况下,通常情况下调整并行度是没有用的,并且没有万能的解决方案。一般会有下面几种倾斜类别:
1.Map端长尾优化
2.Reduce端长尾优化
3.Join长尾优化
注意,数据倾斜的情况非常多,并没有百分之百有效的解决方案。这里也只是提供一些优化思路,经验之谈,大多数思路都是需要用户去对代码甚至算法做一定的修改。
数据Shuffle导致的数据倾斜
数据倾斜大多数是由于数据的 reshuffle 引起的,因为按照某个 key 来做 shuffle,同一个 key 值的数据会强制集中在一个 instance 处理。
【如何判断是否由数据 Shuffle 导致的倾斜】Shuffle 对应于执行计划中的 streamline,如果发现数据倾斜的 task 是其他 task 的下游,并且上游的数据按照某个 key 来做shuffle,那么首先分析一下是不是因为 shuffle 导致的。Shuffle 导致的数据倾斜一般有几种情况:
1.Shuffle 用了区分度低(值域只包含少量的值)的 key。比如人口数据,按照“性别”来做分区,由于性别只有“男”和“女”两种,那么数据最终会集中到两个 instance 来处理,如果并行度不是 2,除了这两个 instance 之外,其他的 instance 都是空跑。
2.Key 的区分度虽然不低,但是分布不均匀。比如以人的姓作为 shuffle key,虽然姓的值域理论上很多,但是某些大姓占据了大部分,那么处理这几个大姓的 instance 就会集中了比其他 instance 多得多的数据,耗时也会长很多。
3.数据中存在一些特殊值,比如使用 null、0 或者 ‘-’ 这类的特殊值来表示一些异常情况。这类特殊值可能在数据中占据非常大的比例,也会导致数据倾斜。
某些情况下,用户并没有意识到自己的数据里面存在一些特殊值或者热点值。此时可以通过一些简单的统计来分析。比如下面的 query:
select * from t join t2 on t.a = t2.a;
首先可以通过 explain,看出是按照哪列做 shuffle 的 key
...
In Task M1:
Data source: test_proj.t
TS: test_proj.t
FIL: ISNOTNULL(a)
RS: order: +
...
keys:
a
values:
...
In Task M3:
Data source: test_proj.t2
TS: test_proj.t2
FIL: ISNOTNULL(a)
RS: order: +
...
keys:
a
values:
...
通过一些简单的 aggregation,可以看到哪些 key 属于热点数据,如:
select a, count(*) from t group by a order by count(*) desc limit 100;
可以观察t表a列中,哪个数据出现频次比较高,比其他数据高很多。
【如何优化数据 Shuffle 导致的倾斜】这类数据倾斜,调整并行度是没有用的。一般可以从几个方面去考虑优化:
1.去掉 shuffle,没有了 shuffle,问题自然就消失了。
a).JOIN导致的shuffle:可以考虑使用 mapjoin 来取代 mergejoin,mapjoin 下大表是不需要做 reshuffle的,这样不仅仅消除了数据倾斜,还省了一次 shuffle 的时间。
b).GROUP BY, DISTRIBUTE BY,CLUSTERED BY, 窗口函数的PARTITION BY 等语法结构导致的shuffle:可以去看看这些操作是否真的必要,能不能去掉,这里并没有统一的解法。
2.换别的 shuffle key。在业务逻辑的允许下,更换 shuffle key 或者增加 shuffle key 的维度,能解决某些问题。比如人口数据,能够按照“姓名”来做 key,就不要用“姓“来做 key。
3.将热点数据特殊处理。比如先将出现数据倾斜的 key 和其他数据分开来,分别处理,最后再 union 起来。这个方案有时候可以简化,比如 join 的时候,特殊值有可能是已知 join 不上的,那么可以将特殊值做一些处理,把特殊值随机化。
非Shuffle导致的数据倾斜
有些时候,数据倾斜不一定是 shuffle 导致的,也可能是别的问题。这里列举一些常见情况:
1.数表数据不均匀。读表 instance 读取的数据量一般都差不多,但是也不排除会出现长尾的情况:
a).数据压缩率不同导致的数据不均匀。同样是压缩前的 256M 数据,有可能某些 instance 解压后是几百M,某些 instance 解压后好几 G。
b).数据文件数导致的不均匀。读取数据的效率并非和数据量成正比,打开文件,建立数据通道,关闭文件这些操作都是有消耗的。如果读取的文件数量过多,那么这些额外消耗照成的时间消耗也会上升。为了防止由于这个问题导致的数据倾斜,ODPS 会控制每个 instance 读取的小文件数,默认是 64M。
c).文件大小有区别,导致读同一数目文件,读入的数据量也会有较大变化。
2.选择率(Selectivity)不同导致的数据不均匀。比如有些 instance 经过 filter,只剩下少量的数据,某些 instance filter 过后,还剩下大量的数据,从而导致数据倾斜。
3.计算复杂度不均匀。有些操作,比如UDF,或者case when,对于某些数据值的计算量会比其他数据值多很多。这种情况下不同 instance 的数据量虽然差别也不大,但是可能计算量差很多,耗时也会差很多。
4.数据膨胀不均匀。有些操作会导致数据膨胀,比如 join,grouping sets aggregate,UDTF。这类操作可能在不同的 instance 下由于数据内容不一致,导致了膨胀率差别很大,最终也会导致数据倾斜。
5.继承的数据不均匀。有些情况下,数据倾斜并不是 task 本身产生的,而是从上游 task 带下来的。比如数据膨胀的 task 自身没有让膨胀率大的 instance 执行时间过长而受到关注,下游的 task 反而出现了异常。
【优化思路】这些数据倾斜,通常能通过调整并行度来进行一定的缓解。当然还有其他的:
1.如果是小文件太多,导致了读表 instance 的数据倾斜,那么可以考虑先对输入数据表做小文件合并:
alter table t merge smallfiles
2.Map 端每个 Instance 读入的数据量不均匀时,可以调整 ODPS 最多能读取的小文件数,进行小文件的合并使得读入数据量均匀。这个参数一般会和odps.sql.mapper.split.size 结合使用。
set xxx.sql.mapper.merge.limit.size=
-- 设定控制文件被合并的最大阈值,单位M,
-- 默认64M。区间[0,Integer.MAX_VALUE]set odps.sql.mapper.merge.limit.size=
-- 设定控制文件被合并的最大阈值,单位M,
-- 默认64M。区间[0,Integer.MAX_VALUE]
1.选择率不同导致的长尾或者计算复杂度不同导致的长尾,正常情况下不会太突出,如果出现了,可以先看看自己的程序是不是有 bug,比如 UDF 是不是在某些情况下有死循环。
2.不仅仅是数据倾斜,数据膨胀过大本身就有问题。
3.为防止上游的数据倾斜对下游产生影响,一个可能的做法是,通过使用distribute by rand() 来强制上游数据做reshuffle。
2.优化消耗资源
为了节省资源消耗,可以通过使用SQL新语法、新功能,合理设置资源参数等方式来进行优化。
2.1 MR典型场景用SQL实现
目前线上很多 MapReduce 程序功能都可以使用 SQL 新功能实现,MR 作业修改为 SQL 后表达更为直观,升级维护更为方便,SQL 执行计划经过多层优化后,基于代码生成器生成,在深度优化执行引擎 NATIVE 执行,性能和稳定性保障性更高。
1.使用 SQL-聚合函数
适用场景:对数据做汇总和合并操作的 MR 程序。
例如,安全需求统计每个电话设备呼入呼出号码及次数并且要求每个设备保存一条记录;MR 程序通过 MAP 读取到数据后按设备分组,REDUCE 对同设备汇总各类信息打标签统计合并输出。SQL 聚合函数可以表示为:
select k, WM_CONCAT(';',concat(v,":",c))
from ( select k, v, count(v) c from t group by k,v) t2
group by k;
类似有 COLLECT_LIST、COLLECT_SET等函数可以表示在给定 GROUP 内将 COL 指定的表达式聚合为数组。
2.用 SQL-窗口/分析函数
适用场景:聚合表达不了的作业。例如组内多条对比分析的作业可以考虑使用开窗函数,限制条件是分组后组内数据量不超过1亿。
例如,通讯软件如微信如果有需求要对每一条对话消息内容分析(打标签、回复类型判断等),MR 程序可以通过在 REDUCE 中对同组数据按时间排序,然后获取前后 1 条或 n 条记录与当前记录在一起分析;类似需求也可以用 SQL-开窗函数提供的 LEAD、LAG 等函数获取前后偏移的数据进行对比分析。
2.2 合理设置资源参数
尽量不要通过设置参数的方式进行优化,首先要看能否从业务上或者算法上减少数据和其他方式进行优化。
ODPS 处理一个任务主要分为三个阶段:Map、Reduce、Join。如果处理的数据量比较大,导致各个阶段的每个 instance 处理的时间比较长,在没有数据倾斜的情况下,可以通过设置下面的资源参数适当增加资源来加快处理速度。
2.2.1 Map端设置
set XXX.sql.mapper.cpu=10
作用:设置处理Map Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整cpu数目。对于大多数sql任务来说,一般不需要调整cpu个数的。
set XXX.sql.mapper.memory=1024
作用:设定Map Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。
场景:当Map阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。
set XXX.sql.mapper.merge.limit.size=64
作用:设定控制文件被合并的最大阈值,单位M,默认64M,在[0,Integer.MAX_VALUE]之间调整。
场景:当Map端每个Instance读入的数据量不均匀时,可以通过设置这个变量值进行小文件的合并,使得每个Instance的读入文件均匀。一般会和
XXX.sql.mapper.split.size
这个参数结合使用。
set XXX.sql.mapper.split.size=256
作用:设定一个Map的最大数据输入量,可以通过设置这个变量达到对Map端输入的控制,单位M,默认256M,在[1,Integer.MAX_VALUE]之间调整。
场景:当每个Map Instance处理的数据量比较大,时间比较长,并且没有发生长尾时,可以适当调小这个参数。如果有发生长尾,则结合
XXX.sql.mapper.merge.limit.size
这个参数设置每个Map的输入数量。
2.2.2 Join设置
set XXX.sql.joiner.instances=-1
作用: 设定Join Task的Instance数量,默认为-1,在[0,2000]之间调整。
场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set XXX.sql.joiner.cpu=100
作用: 设定Join Task每个Instance的CPU数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整CPU数目。对于大多数SQL任务来说,一般不需要调整CPU。
set XXX.sql.joiner.memory=1024
作用:设定Join Task每个Instance的Memory大小,单位为M,默认为1024M,在[256,12288]之间调整。
场景:当Join阶段的Instance有Writer Dumps时,可以适当的增加内存大小,减少Dumps所花的时间。
2.2.3 Reduce端设置
set XXX.sql.reducer.instances=-1sql
作用: 设定Reduce Task的Instance数量,手动设置区间在[1,99999]之间调整。
场景:每个Join Instance处理的数据量比较大,耗时较长,没有发生长尾,可以考虑增大使用这个参数。
set XXX.sql.reducer.cpu=100
作用:设定处理Reduce Task每个Instance的Cpu数目,默认为100,在[50,800]之间调整。
场景:某些任务如果特别耗计算资源的话,可以适当调整Cpu数目。对于大多数Sql任务来说,一般不需要调整Cpu。
set XXX.sql.reducer.memory=1024
作用:设定Reduce Task每个Instance的Memory大小,单位M,默认1024M,在[256,12288]之间调整。
场景:当Reduce阶段的Instance有Writer Dumps时,可以适当的增加内存的大小,减少Dumps所花的时间。