创建消费者
在读取消息之前,需要先创建一个 KafkaConsumer 对象。
创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里,后面深入讨论所有属性。这里我们只需要使用 3 个必要的属性:bootstrap. servers、key.deserializer 和 value.deserializer。
- bootstrap.servers指定了Kafka 集群的连接字符串。它的用途与在KafkaProducer中的用途是一样的。
- 另外两个属性 key.deserializer 和 value.deserializer 与生产者的 serializer 定义也很类似,不过它们不是使 用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成Java对象。
第4个属性 group.id 不是必需的,它指定了 KafkaConsumer 属于哪一个消费者群组。
创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见,一般消费者都要属于一个消费者群组。
下面看下如何创建一个 KafkaConsumer 对象:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumerString, String> consumer = new KafkaConsumerString, String>(props);
上面代码和生产者的代码类似。我们假设消费的键和值都是字符串类型,所以使用的是内置StringDeserializer,并且使用字符串类型创建了 KafkaConsumer 对象。
唯一不同的是新增了 group.id 属性,它指定了消费者所属群组的名字。
订阅主题
创建好消费者之后,下一步可以开始订阅主题了。subscribe() 方法接受一个Topic列表作为参数,使用起来很简单:
consumer.subscribe(Collections.singletonList("customerCountries")); // ➊
这里简单创建了一个只包含单个元素的列表,主题的名字叫作“customer Countries”。
我们也可以在调用 subscribe() 方法时传入一个正则表达式。正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。
如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。
在Kafka和其他系统之间复制数据时, 使用正则表达式的方式订阅多个主题是很常见的做法。
要订阅所有与 test 相关的主题,可以这样做:
consumer.subscribe(“test.*”);
轮询
消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据(pull模式)。
一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。
消费者代码的主要部分如下所示:
try {
while (true) { // ➊
ConsumerRecordsString, String> records = consumer.poll(100); // ➋
for (ConsumerRecordString, String> record : records) { // ➌
log.debug("topic = %s, partition = %s, offset = %d, customer = %s,country = %sn",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value()) + 1;
}
custCountryMap.put(record.value(), updatedCount);
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString(4)) // ➍
}
}
} finally {
consumer.close(); // ➎
}
- ➊ 是一个无限循环,消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。
稍后我们会介绍如何退出循环,并关闭消费者。
- ➋ 非常重要。**消费者必须持续对 Kafka 进行轮询,否则会被认为已经死,**然后它的分区会被移交给群组里的其他消费者。
传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。
如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。
- ➌ poll() 方法返回一个记录列表。
每条记录都包含了记录所属主题的信息、记录所在分 区的信息、记录在分区里的偏移量,以及记录的键值对,
一般会遍历处理。
- ➍ 把结果保存起来或者对已有的记录进行更新,处理过程也随之结束。在这里,我们的目 的是统计来自各个地方的客户数量,所以使用了一个散列表来保存结果,并以 JSON 的 格式打印结果。在真实场景里,结果一般会被保存到数据存储系统里。
- 最后在退出应用程序之前使用 close() 方法关闭消费者。
网络连接和 socket 也会随之关闭, 并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡, 因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。
轮询不只是获取数据那么简单。
在第一次调用新消费者的 poll() 方法时,它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。
如果发生了再均衡,整个过程也是在轮询期间进行的。
当然,心跳也是从轮询里发送出去的。
所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。
请记住,如果poll()在超过max.poll.interval.ms的时间内没有被调用,消费者将被认为是死的,并从消费者组中驱逐出去,所以要避免在poll循环中做任何可能阻塞不可预测的间隔的事情。
线程安全
在同一个群组里,无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。
按规定,一个消费者使用一个线程。
如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。
最好是把消费者的逻辑封装在自己的对象里,然后使用 Java 的 ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。
在Kafka的旧版本中,完整的方法签名是poll(long);现在这个签名已经被废弃,新的API是poll(Duration)。
除了参数类型的变化之外,方法阻塞的语义也发生了微妙的变化。
原来的方法,poll(long),只要能从Kafka获取所需的元数据,就会阻塞,即使这比超时时间长。
新方法,poll(Duration),将遵守超时限制,不等待元数据。如果你有现有的consumer代码使用poll(0)作为方法来强制Kafka在不消耗任何记录的情况下获得元数据(一个相当常见的黑客行为),你不能只是把它改为poll(Duration.ofMillis(0))并期望有同样的行为。你需要想出一个新的方法来实现你的目标。通
常情况下,解决方案是将逻辑放在rebalanceListener.onPartitionAssignment()方法中,该方法可以保证在你有了分配的分区的元数据之后,但在记录开始到达之前被调用。
消费者的配置
目前为止,只介绍了bootstrap. servers、group.id、key.deserializer 和 value.deserializer这几个配置属性。Kafka 的文档列出了所有与消费者相 关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。
fetch.min.bytes
该属性指定了消费者从服务器获取记录的最小字节数。
broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时 才把它返回给消费者。
这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。
如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。
如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker的工作负载。
fetch.max.wait.ms
上面通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。
而 feth. max.wait.ms 则用于指定 broker的等待时间,默认是 500ms。
如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。
如果要降低 潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。
如果 fetch.max.wait.ms 被设 为100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返 回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。
max.partition.fetch.bytes
该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB。
因此KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。
如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。
所可以为消费者多分配些内存时,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。
max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。
在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。
消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。
如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。
max.poll.records
此属性控制单次调用 poll() 将返回的最大记录数. 使用它来控制你的应用程序在一次轮询循环中需要处理的记录个数(但不是数据的大小)。
session.timeout.ms和heartbeat.interval.ms
消费者在被认为死亡之前可以与服务器断开连接的时间,默认是10s;
如果超过session.timeout.ms的时间,消费者没有向组协调器发送心跳,它将被视为死亡,组协调器将触发消费者组的再平衡,将死亡消费者的分区分配给组内其他消费者。
这个属性与heartbeat.interval.ms密切相关,后者控制Kafka消费者向组协调员发送心跳的频率,而session.timeout.ms则控制消费者在死亡前可以多久不发送心跳的时间。
因此,这两个属性通常一起修改——heartbeat.interval.ms 必须低于 session.timeout.ms 并且通常设置为超时值的三分之一。因此,如果session.timeout.ms是3秒,heartbeat.interval.ms应该是1秒。
将session.timeout.ms设置得比默认值低,有利于消费者组更快地检测和恢复故障,但也可能导致不必要的再平衡。
将session.timeout.ms设置得更高会减少意外再平衡的机会,但也意味着它需要更长的时间来检测真正的故障。
max.poll.interval.ms
此属性设置的是消费者在被视为死亡之前可以不进行轮询的时间长度
如前所述,心跳和会话超时是 Kafka 检测死亡消费者并移除其分区的主要机制。
但是,由于心跳是由后台线程发送的,有可能从kafka消费的主线程死锁了,但是后台线程还在发送心跳。
这意味着该消费者拥有的分区的记录没有被处理。了解消费者是否仍在处理记录的最简单方法是检查它是否正在请求更多记录。然而,对更多记录的请求之间的间隔很难预测,取决于可用的数据量、消费者所做的处理类型,有时还取决于额外服务的延迟。
在需要对返回的每条记录进行耗时处理的应用程序中,max.poll.records 用于限制返回的数据量,从而限制应用程序再次可用于 poll() 之前的持续时间。
即使定义了 max.poll.records,调用 poll() 的间隔也很难预测,max.poll.interval.ms 被用作故障安全或后备。
它必须是一个足够大的区间,以至于一个健康的消费者很少会达到这个区间,但又足够低,以避免对hang住的消费带来较大的影响。
默认值为 5 分钟。当超时时,后台线程会发送“离开组”请求,让broker知道消费者已经死亡,组必须再平衡,然后停止发送心跳。
default.api.timeout.ms
当你在调用API时没有指定明确的超时时间时,这个超时时间将适用于消费者进行的(几乎)所有API调用。
默认值是1分钟,由于它高于请求超时的默认值,所以一般情况它会包括重试。
poll()方法是个例外,它总是需要一个明确的超时时间。
request.timeout.ms
这是消费者等broker回应的最长时间。
如果broker在这段时间内没有响应,client会认为broker根本不会响应,关闭连接,并尝试重连。
这个配置默认30秒,建议不要调低。
在放弃请求之前让broker有足够的时间来处理请求,是很重要的——向已经过载的代理重新发送请求几乎没有什么好处,断开连接和重新连接的行为会增加更多的开销。
auto.offset.reset
该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。
- 默认是 “latest”,这意味着如果缺乏有效的偏移量,消费者将从最新的记录(消费者开始运行后写入的记录)开始读取。
- 另一个选择是 “最早的”,这意味着如果没有有效的偏移量,消费者将从起始位置读取分区中的所有数据。
- 将auto.offset.reset设置为none将导致在试图从无效的偏移量消费时抛出一个异常。
enable.auto.commit
该参数控制消费者是否会自动提交偏移量,默认为true。
如果希望控制自己何时提交偏移量,则将其设置为false,同时这也有利于减少重复和避免数据丢失。
如果把enable.auto.commit设置为true,那么可能还还需要auto.commit.interval.ms来控制偏移量的提交频率。
后面会更深入地讨论提交偏移量的不同选项。
partition.assignment.strategy
我们已经知道分区会被分配给消费者组中的消费者。
PartitionAssignor 是一个类,它根据给定的消费者和他们订阅的Topics,决定将哪些分区分配给哪个消费者。
默认情况下,Kafka 有以下分配策略:
Range
该策略会把主题的若干个连续的分区分配给消费者。
假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。
因为上面两个Topic都拥有奇数个分区,而分配是在Topic内独立完成的,第一个消费者最后分配到比第二个消
费者更多的分区。
只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。
RoundRobin
从所有订阅的主题中获取所有分区,并将其按顺序逐一分配给消费者。
如果使用 RoundRobin 策略来给前面的消费者 C1和消费者 C2 分配分区:
那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2的分区 1
消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2
一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。
Sticky
Sticky Assignor有两个目标:
- 第一是要有一个尽可能平衡的分配
- 第二是在重新平衡的情况下,它会把尽可能多保持原有分配,尽量减少将分区分配从一个消费者转移到另一个消费者的相关开销。
一般情况下,当所有消费者都订阅了同一Topic,来自Sticky Assignor的初始分配将与RoundRobin Assignor的分配一样平衡。随后的分配将同样平衡,但会减少分区移动的次数
在同一组的消费者订阅不同主题的情况下,Sticky Assignor 实现的分配比 RoundRobin Assignor 更均衡。
Cooperative Sticky
这种分配策略与Sticky Assignor的策略相同,但支持合作式再平衡,消费者可以继续从没有被重新分配的分区消费。
细节后续再介绍。
最后可以通过设置 partition.assignment.strategy来选择分区策略。默认使用的是 org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。
client.id
这个配置这可以是任何字符串,代理将使用它来识别从客户端发送的请求,例如获取请求。它用于日志记录和metrics,以及配额。
client.rack
默认情况下,消费者将从每个分区的leaer副本中获取消息。
但是,当集群跨越多个数据中心或多个云可用性区域时,从与消费者位于同一区域的副本中获取消息在性能和成本上都有优势。为了启用从最近的副本中获取数据,你需要设置client.rack配置,并确定客户端所在的区域。
然后你可以配置broker,用org.apache.kafka.common.replica.Rack AwareReplicaSelector替换默认的replica.selector.class。
你也可以实现你自己的replica.selector.class,根据客户端元数据和分区元数据,用自定义逻辑来选择最佳的副本来消费。
group.instance.id
这可以是任何唯一的字符串,用于为消费者提供静态组成员资格
receive.buffer.bytes and send.buffer.bytes
这些是套接字在写入和读取数据时使用的 TCP 发送和接收缓冲区的大小.
如果这些设置为 –1,将使用操作系统默认值。
当生产者或消费者与不同数据中心的代理通信时。设置为一个更大的值可能会更好些,因为这些网络链接通常具有更高的延迟和更低的带宽。
offsets.retention.minutes
这是broker的配置,但由于其对消费者的行为也会有影响,所以了解它也是有必要的。
只要消费者组有活跃的成员(即通过发送心跳维持活跃状态成员),Kafka就会保留该组向每个分区提交的最后一个偏移量,因此在重新分配或重新启动的情况下可以检索到它。
然而,一旦一个组成为空组,Kafka将只保留其提交的偏移量到该配置所设定的持续时间–默认为7天。
一旦偏移量被删除,如果该组再次变得活跃,它将表现得像一个全新的消费组,没有任何关于它过去消费的记忆。
注意,这种行为改变了几次,所以如果你使用比2.1.0更早的版本,请检查你的版本的文档以了解预期行为。