引言
在探究 Kafka 核心知识之前,我们先思考一个问题:什么场景会促使我们使用Kafka? 说到这里,我们头脑中或多或少会蹦出异步解耦和削峰填谷等字样,是的,这就是 Kafka 最重要的落地场景。
-
异步解耦:同步调用转换成异步消息通知,实现生产者和消费者的解耦。想象一个场景,在商品交易时,在订单创建完成之后,需要触发一系列其他的操作,比如进行用户订单数据的统计、给用户发送短信、给用户发送邮件等等。如果所有操作都采用同步方式实现,将严重影响系统性能。针对此场景,我们可以利用消息中间件解耦订单创建操作和其他后续行为。
-
削峰填谷:利用 Broker 缓冲上游生产者瞬时突发的流量,使消费者消费流量整体平滑。对于发送能力很强的上游系统,如果没有消息中间件的保护,下游系统可能会直接被压垮导致全链路服务雪崩。想象秒杀业务场景,上游业务发起下单请求,下游业务执行秒杀业务(库存检查,库存冻结,余额冻结,生成订单等等),下游业务处理的逻辑是相当复杂的,并发能力有限,如果上游服务不做限流策略,瞬时可能把下游服务压垮。针对此场景,我们可以利用 MQ 来做削峰填谷,让高峰流量填充低谷空闲资源,达到系统资源的合理利用。
通过上述例子可以发现交易、支付等场景常需要异步解耦和削峰填谷功能解决问题,而交易、支付等场景对性能、可靠性要求特别高。那么,我们本文的主角Kafka能否满足相应要求呢?下面我们来探讨下。
Kafka 宏观认知
在探究 Kafka 的高性能、高可靠性之前,我们从宏观上来看下 Kafka 的系统架构
如上图所示,Kafka 由 Producer、Broker、Consumer 以及负责集群管理的 ZooKeeper 组成,各部分功能如下:
- Producer: 生产者,负责消息的创建并通过一定的路由策略发送消息到合适的 Broker;
- Broker:服务实例,负责消息的持久化、中转等功能;
- Consumer :消费者,负责从 Broker 中拉取(Pull)订阅的消息并进行消费,通常多个消费者构成一个分组,消息只能被同组中的一个消费者消费;
- ZooKeeper:负责 Broker、Consumer 集群元数据的管理等;(注意:Producer 端直接连接 Broker,不在 ZK 上存任何数据,只是通过 ZK 监听 Broker 和 Topic 等信息)
上图消息流转过程中,还有几个特别重要的概念—主题(Topic)、分区(Partition)、分段(Segment)、位移(Offset)。
- topic:消息主题。Kafka 按 Topic 对消息进行分类,我们在收发消息时只需指定 Topic。
- partition:分区。为了提升系统的吞吐,一个 Topic 下通常有多个 Partition,Partition 分布在不同的 Broker 上,用于存储 Topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 Kafka 提供给了并行的消息处理能力和横向扩容能力。另外,为了提升系统的可靠性,Partition 通常会分组,且每组有一个主 Partition、多个副本 Partition,且分布在不同的 Broker上,从而起到容灾的作用。
- Segment:分段。宏观上看,一个 Partition 对应一个日志(Log)。由于生产者生产的消息会不断追加到 Log 文件末尾,为防止 Log 文件过大导致数据检索效率低下,Kafka 采取了分段和索引机制,将每个 Partition 分为多个 Segment,同时也便于消息的维护和清理。每个 Segment 包含一个.log日志文件、两个索引(.index、timeindex)文件以及其他可能的文件。每个 Segment 的数据文件以该段中最小的 Offset 为文件名,当查找 Offset 的 Message 的时候,通过二分查找快找到 Message 所处于的 Segment 中。
- Offset:消息在日志中的位置,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量。Offset 是消息在分区中的唯一标识,是一个单调递增且不变的值。Kafka 通过它来保证消息在分区内的顺序性,不过 Offset 并不跨越分区,也就是说,Kafka 保证的是分区有序而不是主题有序。
Kafka 高可靠性、高性能探究
在对 Kafka 的整体系统框架及相关概念简单了解后,下面我们来进一步深入探讨下高可靠性、高性能实现原理。
Kafka 高可靠性探究
Kafka 高可靠性的核心是保证消息在传递过程中不丢失,涉及如下核心环节:
- 消息从生产者可靠地发送至 Broker;– 网络、本地丢数据
- 发送到 Broker 的消息可靠持久化; — Pagecache 缓存落盘、单点崩溃、主从同步跨网络
- 消费者从 Broker 消费到消息且最好只消费一次 — 跨网络消息传输
消息从生产者可靠地发送至 Broker
为了保障消息从生产者可靠地发送至 Broker,我们需要确保两点
1. Producer 发送消息后,能够收到来自 Broker 的消息保存成功 Ack;
2. Producer 发送消息后,能够捕获超时、失败Ack等异常Ack并做处理;
Ack 策略
针对问题1,Kafka 为我们提供了三种 Ack 策略,
- Request.required.acks = 0:请求发送即认为成功,不关心有没有写成功,常用于日志进行分析场景。
- Request.required.acks = 1:当 Leader partition 写入成功以后,才算写入成功,有丢数据的可能。
- Request.required.acks= -1:ISR列表里面的所有副本都写完以后,这条消息才算写入成功,强可靠性保证。
为了实现强可靠的 Kafka 系统,我们需要设置Request.required.acks= -1,同时还会设置集群中处于正常同步状态的副本 Follower 数量 min.insync.replicas>2,另外,设置unclean.leader.election.enable=false 使得集群中 ISR 的 Follower 才可变成新的 Leader,避免特殊情况下消息截断的出现。
消息发送策略
针对问题2,Kafka 提供两类消息发送方式:同步(Sync)发送和异步(Async)发送,相关参数如下:
参数 | 含义 | 默认值 | 推荐值 |
---|---|---|---|
async | 是否进行异步生产,可选值: 0:同步生产,默认值 1:异步生产 |
0 | 一般情况下使用0,如果使用异步生产,需要通过channel捕捉消息生产失败的情况,并进行异步修复处理,逻辑会相对复杂。 对于延时比较敏感,避免生产消息导致耗时过高的场景可以考虑异步。 |
以 Sarama 实现为例,在消息发送的过程中,无论是同步发送还是异步发送都会涉及到两个协程–负责消息发送的主协程和负责消息分发的 Dispatcher 协程。
异步发送
对于异步发送(Ack != 0 场景,等于0时不关心写 Kafka 结果,后文详细讲解)而言,其流程大概如下
1. 在主协程中调用异步发送 Kafka 消息的时候,其本质是将消息体放进了一个 Input 的 Channel,只要入 Channel 成功,则这个函数直接返回,不会产生任何阻塞。相反,如果入 Channel 失败,则会返回错误信息。因此调用 Async 写入的时候返回的错误信息是入 Channel 的错误信息,至于具体最终消息有没有发送到 Kafka 的 Broker,我们无法从返回值得知。
2. 当消息进入 Input 的 Channel 后,会有另一个 Dispatcher 的协程负责遍历 Input,来真正发送消息到特定 Broker 上的主 Partition 上。发送结果通过一个异步协程进行监听,循环处理 Err Channel 和 Success Channel,出现了 Error 就记一个日志。因此异步写入场景时,写 Kafka 的错误信息,我们暂时仅能够从这个错误日志来得知具体发生了什么错,并且也不支持我们自建函数进行兜底处理,这一点在 trpc-go 的官方也得到了承认。
同步发送
同步发送(Ack != 0 场景)是在异步发送的基础上加以条件限制实现的。同步消息发送在newSyncProducerFromAsyncProducer 中开启两个异步协程处理消息成功与失败的“回调”,并使用 WaitGroup 进行等待,从而将异步操作转变为同步操作。其流程大概如下
通过上述分析可以发现,Kafka 消息发送本质上都是异步的,不过同步发送通过 WaitGroup 将异步操作转变为同步操作。同步发送在一定程度上确保了我们在跨网络向 Broker 传输消息时,消息一定可以可靠地传输到 Broker。因为在同步发送场景我们可以明确感知消息是否发送至 Broker,若因网络抖动、机器宕机等故障导致消息发送失败或结果不明,可通过重试等手段确保消息至少一次(at least once) 发送到Broker。另外,Kafka(0.11.0.0版本后)还为 Producer 提供两种机制来实现精确一次(exactly once) 消息发送:幂等性(Idempotence)和事务(Transaction)。
| 类别 | 开启 | 特征 | 实现原理 |注意事项 |
| :— | :— | :—|:—|:—|
| 幂等性 Producer | enable.idempotence=true | 1. 单分区幂等性:只能保证单分区上幂等性,无法实现多个分区的幂等。 2. 单会话幂等性:只能实现单会话上的冥等性,当 Producer 重启后,这种幂等性保证就失效了。 | 为每个 Producer 设置唯一的 PID,Broker 发送消息时会带上 PID 并为每个消息生产一个 Seq number,Broker 端根据 PID 和 Seq number 去重 |Broker 端不会重复写入同一 PID 的 Producer 发送的相同的消息,底层日志中只会持久化一次。 |
| 事务型 Producer |enable.idempotence=true,生产者设置 Transcational.id;若要实现 consumer-transform-producer,还需设置 Consumer 端参数 Isolation.level=read_committed| 1. 跨分区事务:能够保证将消息原子性地写入到多个分区中。2. 跨会话的事务恢复:如果一个应用实例挂了,启动的下一个实例依然可以保证上一个事务完成(Commit 或者 Abort)。 |1. Kafka 引入了一个协调者组件 TransactionCoordinator 来管理 Transaction。Producer 在开始一个事务时,告诉协调者事务开始,然后开始向多个 Topic-Partition 写数据,只有这批数据全部写完(中间没有出现异常),Producer 会调用 Commit 接口进行 Commit,然后事务真正提交,否则如果中间出现异常,那么事务将会被 Abort(Producer 通过 Abort 接口告诉协调者执行 Abort 操作)。2. 引入一个全局唯一的 Transaction ID,并将 Producer 获得的 PID 和 Transaction ID 绑定。这样当 Producer 重启后就可以通过正在进行的Transaction ID 获得原来的 PID。3. TransactionCoordinator 还负责将事务状态写入 Kafka 的一个内部 Topic: __transaction_state 中,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。 |1. 开启事务对性能影响很大,在使用时要充分考虑。2. 当具有相同 Transaction ID 的新的 Producer 实例被创建且工作时,旧的且拥有相同 Transaction ID 的 Producer 将不再工作。 |
小结
通过 Ack 策略配置、同步发送、事务消息组合能力,我们可以实现 exactly once 语意跨网络向 Broker 传输消息。但是,Producer 收到 Broker 的成功 Ack,消息一定不会丢失吗?为了搞清这个问题,我们首先要搞明白 Broker 在接收到消息后做了哪些处理。
发送到 Broker 的消息可靠持久化
为了确保 Producer 收到 Broker 的成功 Ack 后,消息一定不在 Broker 环节丢失,我们核心要关注以下几点:
– Broker 返回 Producer 成功 Ack 时,消息是否已经落盘;
– Broker 宕机是否会导致数据丢失,容灾机制是什么;
– Replica 副本机制带来的多副本间数据同步一致性问题如何解决;
Broker 异步刷盘机制
Kafka 为了获得更高吞吐,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(刷盘触发条:主动调用 Sync 或 Fsync 函数、可用内存低于阀值、Dirty Data 时间达到阀值),将数据顺序写到磁盘。消息处理示意图如下:
由于消息是写入到 PageCache,单机场景,如果还没刷盘 Broker 就宕机了,那么 Producer 产生的这部分数据就可能丢失。为了解决单机故障可能带来的数据丢失问题,Kafka 为分区引入了副本机制。
Replica副本机制
Kafka 每组分区通常有多个副本,同组分区的不同副本分布在不同的 Broker 上,保存相同的消息(可能有滞后)。副本之间是“一主多从”的关系,其中 Leader 副本负责处理读写请求,Follower 副本负责从 Leader 拉取消息进行同步。分区的所有副本统称为 AR(Assigned Replicas),其中所有与 Leader 副本保持一定同步的副本(包括 Leader 副本在内)组成 ISR(In-Sync Replicas),与 Leader 同步滞后过多的副本组成 OSR(Out-of-Sync Replicas),由此可见,AR=ISR+OSR。Follower 副本是否与 Leader 同步的判断标准取决于 Broker 端参数 replica.lag.time.max.ms(默认为10秒),Follower 默认每隔500ms向 Leader Fetch 一次数据,只要一个 Follower 副本落后 Leader 副本的时间不连续超过10秒,那么 Kafka 就认为该 Follower 副本与 Leader 是同步的。在正常情况下,所有的 Follower 副本都应该与 Leader 副本保持一定程度的同步,即 AR=ISR,OSR 集合为空。
当 Leader 副本所在 Broker 宕机时,Kafka 会借助ZK从 Follower 副本中选举新的 Leader 继续对外提供服务,实现故障的自动转移,保证服务可用。为了使选举的新 Leader 和旧 Leader 数据尽可能一致,当 Leader 副本发生故障时,默认情况下只有在 ISR 集合中的副本才有资格被选举为新的 Leader,而在 OSR 集合中的副本则没有任何机会(可通过设置 unclean.leader.election.enable 改变)。
当 Kafka 通过多副本机制解决单机故障问题时,同时也带来了多副本间数据同步一致性问题。Kafka 通过高水位更新机制、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,下面我们来依次看下这几大措施。
HW 和 LEO
首先,我们来看下两个和 Kafka 中日志相关的重要概念 HW 和 LEO:
- HW: High Watermark,高水位,表示已经提交(Commit)的最大日志偏移量,Kafka 中某条日志“已提交”的意思是 ISR 中所有节点都包含了此条日志,并且消费者只能消费 HW 之前的数据;
- LEO: Log End Offset,表示当前 Log 文件中下一条待写入消息的 Offset;
如上图所示,它代表一个日志文件,这个日志文件中有8条消息,0至5之间的消息为已提交消息,5至7的消息为未提交消息。日志文件的 HW 为6,表示消费者只能拉取到5之前的消息,而 Offset 为5的消息对消费者而言是不可见的。日志文件的 LEO 为8,下一条消息将在此处写入。
注意:所有副本都有对应的 HW 和 LEO,只不过 Leader 副本比较特殊,Kafka 使用 Leader 副本的高水位来定义所在分区的高水位。换句话说,分区的高水位就是其 Leader 副本的高水位。Leader 副本和 Follower 副本的HW有如下特点:
- Leader HW:min(所有副本 LEO),为此 Leader 副本不仅要保存自己的 HW 和 LEO,还要保存 Follower 副本的 HW 和 LEO,而 Follower 副本只需保存自己的 HW 和 LEO;
- Follower HW:min(Follower 自身 LEO,Leader HW)。
注意:为方便描述,下面 Leader HW 简记为 HW L ,Follower HW 简记为 HW F ,Leader LEO 简记为 LEOL ,Follower LEO 简记为 LEO F。
下面我们演示一次完整的 HW / LEO 更新流程:
1. 初始状态
HW L=0,LEO L=0,HW F=0,LEO F=0。
2. Follower 第一次 Fetch
- Leader 收到 Producer 发来的一条消息完成存储, 更新 LEO L=1;
- Follower从 Leader Fetch 数据, Leader 收到请求,记录 Follower 的 LEO F =0,并且尝试更新 HW L =min(全部副本 LEO)=0;
- Leader 返回 HW L=0 和 LEO L=1 给 Follower,Follower 存储消息并更新 LEO F =1, HW=min(LEO F,HW L)=0。
3. Follower 第二次 Fetch
- Follower 再次从 Leader Fetch 数据, Leader 收到请求,记录 Follower 的 LEO F =1,并且尝试更新 HW L =min(全部副本 LEO)=1;
- leader 返回 HW L=1和LEO L=1 给 Follower,Leader 收到请求,更新自己的 HW=min(LEO F,HW L)=1。
上述更新流程中 Follower 和 Leader 的 HW 更新有时间 GAP。如果 Leader 节点在此期间发生故障,则 Follower 的 HW 和 Leader 的 HW 可能会处于不一致状态,如果 Follower 被选为新的 Leader 并且以自己的 HW 为准对外提供服务,则可能带来数据丢失或数据错乱问题。
KIP-101 问题:数据丢失&数据错乱
数据丢失
第1步:
1. 副本 B 作为 Leader 收到 Producer 的 m2 消息并写入本地文件,等待副本 A 拉取。
2. 副本 A 发起消息拉取请求,请求中携带自己的最新的日志 Offset(LEO=1),B 收到后更新自己的 HW 为1,并将 HW=1 的信息以及消息 m2 返回给 A。
3. A 收到拉取结果后更新本地的 HW 为1,并将 m2 写入本地文件。发起新一轮拉取请求(LEO=2),B 收到 A 拉取请求后更新自己的 HW 为2,没有新数据只将 HW=2 的信息返回给 A,并且回复给 Producer 写入成功。此处的状态就是图中第一步的状态。
第2步:
此时,如果没有异常,A 会收到 B 的回复,得知目前的 HW 为2,然后更新自身的 HW 为2。但在此时 A 重启了,没有来得及收到 B 的回复,此时 B 仍然是 Leader。A 重启之后会以 HW 为标准截断自己的日志,因为 A 作为 Follower 不知道多出的日志是否是被提交过的,防止数据不一致从而截断多余的数据并尝试从 Leader 那里重新同步
第3步:
B 崩溃了,min.isr 设置的是1,所以 Zookeeper 会从 ISR 中再选择一个作为 Leader,也就是 A,但是 A 的数据不是完整的,从而出现了数据丢失现象。
问题在哪里?在于 A 重启之后以 HW 为标准截断了多余的日志。不截断行不行?不行,因为这个日志可能没被提交过(也就是没有被 ISR 中的所有节点写入过),如果保留会导致日志错乱。
数据错乱
在分析日志错乱的问题之前,我们需要了解到 Kafka 的副本可靠性保证有一个前提:在 ISR 中至少有一个节点。如果节点均宕机的情况下,是不保证可靠性的,在这种情况会出现数据丢失,数据丢失是可接受的。这里我们分析的问题比数据丢失更加槽糕,会引发日志错乱甚至导致整个系统异常,而这是不可接受的。
第1步:
1. A 和 B 均为 ISR 中的节点。副本 A 作为 Leader,收到 Producer 的消息 m2 的请求后写入 PageCache 并在某个时刻刷新到本地磁盘。
2. 副本 B 拉取到 m2 后写入 PageCage 后(尚未刷盘) 再次去 A 中拉取新消息并告知 A 自己的 LEO=2,A 收到更新自己的 HW 为1并回复给 Producer 成功。
3. 此时 A 和 B 同时宕机,B 的 m2 由于尚未刷盘,所以 m2 消息丢失。此时的状态就是第1步的状态。
第2步:
由于 A 和 B 均宕机,而 min.isr=1 并且 unclean.leader.election.enable=true(关闭 Unclean 选择策略),所以 Kafka 会等到第一个 ISR 中的节点恢复并选为 Leader,这里不幸的是 B 被选为 Leader,而且还接收到 Producer 发来的新消息 m3。注意,这里丢失 m2 消息是可接受的,毕竟所有节点都宕机了。
第3步:
A 恢复重启后发现自己是 Follower,而且 HW 为2,并没有多余的数据需要截断,所以开始和 B 进行新一轮的同步。但此时 A 和 B 均没有意识到,Offset 为1的消息不一致了。
问题在哪里?在于日志的写入是异步的,上面也提到 Kafka 的副本策略的一个设计是消息的持久化是异步的,这就会导致在场景二的情况下被选出的 Leader 不一定包含所有数据,从而引发日志错乱的问题。
Leader Epoch
为了解决上述缺陷,Kafka 引入了 Leader Epoch 的概念。Leader Epoch 和 Raft 中的任期号的概念很类似,每次重新选择 Leader 的时候,用一个严格单调递增的 ID 来标志,可以让所有 Follower 意识到 Leader 的变化。而 Follower 也不再以 HW 为准,每次奔溃重启后都需要去 Leader 那边确认下当前 Leader 的日志是从哪个 Offset 开始的。下面看下 Leader Epoch 是如何解决上面两个问题的。
数据丢失解决
这里的关键点在于副本 A 重启后作为 Follower,不是忙着以 HW 为准截断自己的日志,而是先发起 LeaderEpochRequest 询问副本 B 第0代的最新的偏移量是多少,副本 B 会返回自己的 LEO 为2给副本 A,A 此时就知道消息 m2 不能被截断,所以 m2 得到了保留。当 A 选为 Leader 的时候就保留了所有已提交的日志,日志丢失的问题得到解决。
如果发起 LeaderEpochRequest 的时候就已经挂了怎么办?这种场景下,不会出现日志丢失,因为副本 A 被选为 Leader 后不会截断自己的日志,日志截断只会发生在 Follower 身上。
数据错乱解决
这里的关键点还是在第3步,副本 A 重启作为 Follower 的第一步还是需要发起 LeaderEpochRequest 询问 Leader 当前第0代最新的偏移量是多少,由于副本 B 已经经过换代,所以会返回给 A 第1代的起始偏移(也就是1),A 发现冲突后会截断自己偏移量为1的日志,并重新开始和 Leader 同步。副本 A 和副本 B 的日志达到了一致,解决了日志错乱。
小结
Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,但是,通过副本机制并结合 ACK 策略可以大概率规避单机宕机带来的数据丢失问题,并通过 HW、副本同步机制、 Leader Epoch 等多种措施解决了多副本间数据同步一致性问题,最终实现了 Broker 数据的可靠持久化。
消费者从Broker消费到消息且最好只消费一次
Consumer 在消费消息的过程中需要向 Kafka 汇报自己的位移数据,只有当 Consumer 向 Kafka 汇报了消息位移,该条消息才会被 Broker 认为已经被消费。因此,Consumer 端消息的可靠性主要和 Offset 提交方式有关,Kafka消费端提供了两种消息提交方式:
| 提交方式 | 参数设置 | 功能特点 |注意事项 |
| :— | :— | :—| :—|
| 自动提交 | enable.auto.commit = true,auto.commit.interval.ms(提交间隔) | Kafka Consumer 在后台默默地为你提交位移,开发者无需在代码中显示提交 | Consumer 收到消息就返回正确给 Brocker,如果业务逻辑没有走完中断了,即消息没有消费成功,则相关消息可能丢失。这种场景适用于可靠性要求不高的业务。 |
| 手动提交 | enable.auto.commit = false | 开发者需要在代码中自己提交位移,Kafka Consumer 压根不管 | 该模式下,业务开发者需要在消息被处理完成后进行手动提交。如果消息处理完成后还没来得及提价位移,系统发生重启,则之前消费到未提交的消息会重新消费到,即消息会投递多次。因此,消费侧需要做幂等保障。 |
正常情况下我们很难实现 exactly once 语意的消息,通常是通过手动提交+幂等实现消息的可靠消费。
Kafka高性能探究
Kafka 高性能的核心是保障系统低延迟、高吞吐地处理消息,为此,Kafaka 采用了许多精妙的设计:
- 异步发送
- 批量发送
- 压缩技术
- Pagecach机制&顺序追加落盘
- 零拷贝
- 稀疏索引
- Broker & 数据分区
- 多 Reactor 多线程网络模型
异步发送
如上文所述,Kafka 提供了异步和同步两种消息发送方式。在异步发送中,整个流程都是异步的。调用异步发送方法后,消息会被写入 Channel,然后立即返回成功。Dispatcher 协程会从 Channel 轮询消息,将其发送到 Broker,同时会有另一个异步协程负责处理 Broker 返回的结果。同步发送本质上也是异步的,但是在处理结果时,同步发送通过 WaitGroup 将异步操作转换为同步。使用异步发送可以最大化提高消息发送的吞吐能力。
批量发送
Kafka 支持批量发送消息,将多个消息打包成一个批次进行发送,从而减少网络传输的开销,提高网络传输的效率和吞吐量。
Kafka 的批量发送消息是通过以下两个参数来控制的:
1. Batch.size:控制批量发送消息的大小,默认值为16KB,可适当增加 Batch.size 参数值提升吞吐。但是,需要注意的是,如果批量发送的大小设置得过大,可能会导致消息发送的延迟增加,因此需要根据实际情况进行调整。
2. linger.ms:控制消息在批量发送前的等待时间,默认值为0。当Linger.ms 大于0时,如果有消息发送,Kafka 会等待指定的时间,如果等待时间到达或者批量大小达到 Batch.size,就会将消息打包成一个批次进行发送。可适当增加 Linger.ms 参数值提升吞吐,比如10~100。
在 Kafka 的生产者客户端中,当发送消息时,如果启用了批量发送,Kafka 会将消息缓存到缓冲区中。当缓冲区中的消息大小达到 Batch.size 或者等待时间到达 Linger.ms 时,Kafka 会将缓冲区中的消息打包成一个批次进行发送。如果在等待时间内没有达到 Batch.size,Kafka 也会将缓冲区中的消息发送出去,从而避免消息积压。
压缩技术
Kafka 支持压缩技术,通过将消息进行压缩后再进行传输,从而减少网络传输的开销(压缩和解压缩的过程会消耗一定的 CPU 资源,因此需要根据实际情况进行调整。),提高网络传输的效率和吞吐量。Kafka 支持多种压缩算法,在 Kafka2.1.0 版本之前,仅支持 GZIP,Snappy 和 LZ4,2.1.0 后还支持 Zstandard 算法(Facebook 开源,能够提供超高压缩比)。这些压缩算法性能对比(两指标都是越高越好)如下:
- 吞吐量:LZ4>Snappy>zstd和GZIP,压缩比:zstd>LZ4>GZIP>Snappy。
在 Kafka 中,压缩技术是通过以下两个参数来控制的:
1. Compression.type:控制压缩算法的类型,默认值为 None,表示不进行压缩。
2. Compression.level:控制压缩的级别,取值范围为0-9,默认值为-1。当值为-1时,表示使用默认的压缩级别。
在 Kafka 的生产者客户端中,当发送消息时,如果启用了压缩技术,Kafka 会将消息进行压缩后再进行传输。在消费者客户端中,如果消息进行了压缩,Kafka 会在消费消息时将其解压缩。注意:Broker 如果设置了和生产者不通的压缩算法,接收消息后会解压后重新压缩保存。Broker 如果存在消息版本兼容也会触发解压后再压缩。
Pagecache机制&顺序追加落盘
Kafka 为了提升系统吞吐、降低时延,Broker 接收到消息后只是将数据写入 PageCache 后便认为消息已写入成功,而 PageCache 中的数据通过 Linux 的 Flusher 程序进行异步刷盘(避免了同步刷盘的巨大系统开销),将数据顺序追加写到磁盘日志文件中。由于 Pagecache 是在内存中进行缓存,因此读写速度非常快,可以大大提高读写效率。顺序追加写充分利用顺序 I/O 写操作,避免了缓慢的随机 I/O 操作,可有效提升 Kafka 吞吐。
如上图所示,消息被顺序追加到每个分区日志文件的尾部。
零拷贝
Kafka 中存在大量的网络数据持久化到磁盘(Producer 到 Broker)和磁盘文件通过网络发送(Broker 到 Consumer)的过程,这一过程的性能直接影响 Kafka 的整体吞吐量。传统的 IO 操作存在多次数据拷贝和上下文切换,性能比较低。Kafka 利用零拷贝技术提升上述过程性能,其中网络数据持久化磁盘主要用 mmap 技术,网络数据传输环节主要使用 Sendfile 技术。
网络数据持久化磁盘之 mmap
传统模式下,数据从网络传输到文件需要 4 次数据拷贝、4 次上下文切换和两次系统调用。如下图所示
为了减少上下文切换以及数据拷贝带来的性能开销,Broker 在对 Producer 传来的网络数据进行持久化时使用了 mmap 技术。通过这种技术手段,Broker 读取到 Socket Buffer 的网络数据,可以直接在内核空间完成落盘,没有必要将 Socket Buffer 的网络数据读取到应用进程缓冲区。
网络数据传输之 Sendfile
传统方式实现:先读取磁盘、再用 Socket 发送,实际也是进过四次 Copy。如下图所示:
为了减少上下文切换以及数据拷贝带来的性能开销,Kafka 在 Consumer 从 Broker 读数据过程中使用了 Sendfile 技术。具体在这里采用的方案是通过 NIO 的 transferTo/transferFrom
调用操作系统的 Sendfile 实现零拷贝。总共发生 2 次内核数据拷贝、2 次上下文切换和一次系统调用,消除了 CPU 数据拷贝,如下:
稀疏索引
为了方便对日志进行检索和过期清理,Kafka 日志文件除了有用于存储日志的.log文件,还有一个位移索引文件.index 和一个时间戳索引文件.timeindex 文件,并且三文件的名字完全相同,如下
Kafka 的索引文件是按照稀疏索引的思想进行设计的。稀疏索引的核心是不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。可见,单条消息大小会影响 Kakfa 索引的插入频率,因此 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。由于索引文件也是按照消息的顺序性进行增加索引项的,因此 Kafka 可以利用二分查找算法来搜索目标索引项,把时间复杂度降到了 O(lgN),大大减少了查找的时间。
位移索引文件.index
位移索引文件的索引项结构如下:
相对位移:保存于索引文件名字上面的起始位移的差值,假设一个索引文件为:00000000000000000100.index,那么起始位移值即 100,当存储位移为 150 的消息索引时,在索引文件中的相对位移则为 150 – 100 = 50,这么做的好处是使用 4 字节保存位移即可,可以节省非常多的磁盘空间。
文件物理位置:消息在 log 文件中保存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 Log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。
下面我用图来表示 Kafka 是如何快速检索消息:
假设 Kafka 需要找出位移为 3550 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3550 的最大索引项:[3528, 2310272],得到索引项之后,Kafka 会根据该索引项的文件物理位置在 Log 文件中从位置 2310272 开始顺序查找,直至找到位移为 3550 的消息记录为止。
时间戳索引文件.timeindex
Kafka 在 0.10.0.0 以后的版本当中,消息中增加了时间戳信息,为了满足用户需要根据时间戳查询消息记录,Kafka 增加了时间戳索引文件,时间戳索引文件的索引项结构如下:
时间戳索引文件的检索与位移索引文件类似,如下快速检索消息示意图:
Broker & 数据分区
Kafka 集群包含多个 Broker。一个 Topic 下通常有多个 Partition,Partition 分布在不同的 Broker 上,用于存储 Topic 的消息,这使 Kafka 可以在多台机器上处理、存储消息,给 Kafka 提供给了并行的消息处理能力和横向扩容能力。
多 Reactor 多线程网络模型
多 Reactor 多线程网络模型 是一种高效的网络通信模型,可以充分利用多核 CPU 的性能,提高系统的吞吐量和响应速度。Kafka 为了提升系统的吞吐,在 Broker 端处理消息时采用了该模型,示意如下:
SocketServer 和 KafkaRequestHandlerPool 是其中最重要的两个组件:
- SocketServer:实现 Reactor 模式,用于处理多个 Client(包括客户端和其他 Broker 节点)的并发请求,并将处理结果返回给 Client。
- KafkaRequestHandlerPool:Reactor 模式中的 Worker 线程池,里面定义了多个工作线程,用于处理实际的 I/O 请求逻辑。
整个服务端处理请求的流程大致分为以下几个步骤:
1. Acceptor 接收客户端发来的请求
2. 轮询分发给 Processor 线程处理
3. Processor 将请求封装成 Request 对象,放到 RequestQueue 队列
4. KafkaRequestHandlerPool 分配工作线程,处理 RequestQueue 中的请求
5. KafkaRequestHandler 线程处理完请求后,将响应 Response 返回给 Processor 线程
6. Processor 线程将响应返回给客户端
其他知识探究
负载均衡
生产者负载均衡
Kafka 生产端的负载均衡主要指如何将消息发送到合适的分区。Kafka 生产者生产消息时,根据分区器将消息投递到指定的分区中,所以 Kafka 的负载均衡很大程度上依赖于分区器。Kafka 默认的分区器是 Kafka 提供的 DefaultPartitioner。它的分区策略是根据 Key 值进行分区分配的:
- 如果 key 不为 null:对 Key 值进行 Hash 计算,从所有分区中根据 Key 的 Hash 值计算出一个分区号;拥有相同 Key 值的消息被写入同一个分区,顺序消息实现的关键;
- 如果 key 为 null:消息将以轮询的方式,在所有可用分区中分别写入消息。
- 如果不想使用 Kafka 默认的分区器,用户可以实现 Partitioner 接口,自行实现分区方法。
消费者负载均衡
在 Kafka 中,每个分区(Partition)只能由一个消费者组中的一个消费者消费。当消费者组中有多个消费者时,Kafka 会自动进行负载均衡,将分区均匀地分配给每个消费者。在 Kafka 中,消费者负载均衡算法可以通过设置消费者组的 Partition.assignment.strategy 参数来选择。目前主流的分区分配策略以下几种:
- Range: 在保证均衡的前提下,将连续的分区分配给消费者,对应的实现是 RangeAssignor;
- Round-robin:在保证均衡的前提下,轮询分配,对应的实现是 RoundRobinAssignor;
- 0.11.0.0版本引入了一种新的分区分配策略 StickyAssignor,其优势在于能够保证分区均衡的前提下尽量保持原有的分区分配结果,从而避免许多冗余的分区分配操作,减少分区再分配的执行时间。
集群管理
Kafka 借助 ZooKeeper 进行集群管理。Kafka 中很多信息都在ZK中维护,如 Broker 集群信息、Consumer 集群信息、 Topic 相关信息、 Partition 信息等。Kafka 的很多功能也是基于 ZK 实现的,如 Partition 选主、Broker 集群管理、Consumer 负载均衡等,限于篇幅本文将不展开陈述,这里先附一张网上截图大家感受下:
参考文献
1. https://www.cnblogs.com/arvinhuang/p/16437948.html
2. https://segmentfault.com/a/1190000039133960
3. http://matt33.com/2018/11/04/kafka-transaction/
4. https://blog.51cto.com/u_14020077/5836698
5. https://t1mek1ller.github.io/2020/02/15/kafka-leader-epoch/
6. https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation
7. https://xie.infoq.cn/article/c06fea629926e2b6a8073e2f0
8. https://xie.infoq.cn/article/8191412c8da131e78cbfa6600
9. https://mp.weixin.qq.com/s/iEk0loXsKsMO_OCVlUsk2Q
10. https://cloud.tencent.com/developer/article/1657649
11. https://www.cnblogs.com/vivotech/p/16347074.html