好文推荐:
2.5万字详解23种设计模式
基于Netty搭建websocket集群实现服务器消息推送
2.5万字讲解DDD领域驱动设计
文章目录
- 一、延时队列定义
- 二、应用场景
- 三、技术实现方案:
-
- 1. Redis
- 2. Kafka
- 3. RabbitMQ
- 4. RocketMQ
- 四、Kafka延时队列背景
- 五、Kafka延时队列实现思路
- 六、Kafka延时队列架构图
- 七、kafka延时任务代码实现
-
- 1. KafkaSyncConsumer:Kafka消费者
- 2. KafkaDelayQueue:Kafka延迟队列
- 3. KafkaDelayQueueFactory:Kafka延迟队列工厂
- 4. KafkaPollListener:Kafka延迟队列事件监听
- 5. KafkaConfig:Kafka配置文件
- 6. KafkaDelayApplication:Kafka延迟任务注册
- 九、测试
- 10、总结
一、延时队列定义
延时队列:是一种消息队列,可以用于在指定时间或经过一定时间后执行某种操作。
小编已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现kafka延时队列了,代码中有详细注释,完整代码已经给大家整理好了,领取方式放在了文章末。
二、应用场景
1,订单超时自动取消:用户下单后,如果在指定时间内未完成支付,系统会自动取消订单,释放库存。
2,定时推送:比如消息通知,用户预约某个服务,系统会在服务开始前一定时间发送提醒短信。
3,定时任务:将需要定时执行的任务放入延时队列中,等到指定的时间到达时再进行执行,例如生成报表、统计数据等操作。
4,限时抢购:将限时抢购的结束时间放入延时队列中,当时间到达时自动下架商品。
…
三、技术实现方案:
1. Redis
1.1 优点:
①Redis的延迟队列是基于Redis的sorted set实现的,性能较高。
②Redis的延迟队列可以通过TTL设置过期时间,灵活性较高。
③简单易用,适用于小型系统。
④性能较高,支持高并发。
1.2 缺点:
①可靠性相对较低,可能会丢失消息,就算redis最高级别的持久化也是有可能丢一条的,每次请求都做aof,但是aof是异步的,所以不保证这一条操作能被持久化。
②而且Redis持久化的特性也导致其在数据量较大时,存储和查询效率逐渐降低,此时会需要对其进行分片和负载均衡。
③Redis的延迟队列需要手动实现消息重试机制,更严谨的消息队列需要数据库兜底。
1.3 应用场景:
①适用于较小规模的系统,实时性要求较高的场景。
②适用于轻量级的任务调度和消息通知场景,适合短期延迟任务,不适合长期任务,例如订单超时未支付等。
2. Kafka
2.1 优点:
①Kafka的优点在于其高并发、高吞吐量和可扩展性强,同时支持分片。
②可靠性高,支持分布式和消息持久化。
③消费者可以随时回溯消费。
④支持多个消费者并行消费、消费者组等机制。
2.2 缺点:
①没有原生的延迟队列功能,需要使用topic和消费者组来实现,实现延迟队列需要额外的开发工作。
②消费者需要主动拉取数据,可能会导致延迟,精度不是特别高。
在此案例中代码已经实现了,直接拿来使用就可以了。
2.3 应用场景:
适用于大规模的数据处理,实时性要求较高的,高吞吐量的消息处理场景。
3. RabbitMQ
3.1 优点:
①RabbitMQ的延迟队列是通过RabbitMQ的插件实现的,易于部署和使用。
②RabbitMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③支持消息持久化和分布式。
④支持优先级队列和死信队列。
⑤提供了丰富的插件和工具。
3.2 缺点:
①RabbitMQ的延迟队列性能较低,不适用于高吞吐量的场景。
②性能较低,不适合高并发场景。
③实现延迟队列需要额外的配置,但是配置就很简单了。
3.3应用场景:
适用于中小型的任务调度和消息通知,对可靠性要求高的场景。
4. RocketMQ
4.1 优点:
①RocketMQ的延迟队列是RocketMQ原生支持的,易于使用和部署。
②RocketMQ的延迟队列支持消息重试和消息顺序处理,可靠性较高。
③高性能和高吞吐量,支持分布式和消息持久化。
④RocketMQ使用简单,性能好,并且支持延迟队列功能。
4.2 缺点:
①RocketMQ的延迟队列不支持动态添加或删除队列。
②RocketMQ的延迟队列需要保证消息的顺序,可能会导致消息延迟。
③在节点崩溃后,RocketMQ有可能发生消息丢失。
4.3 应用场景:
①适用于大规模的数据处理,对性能和吞吐量要求较高的场景。
②适合于任务量较大、需要延迟消息和定时消息的场景。例如电商平台、社交软件等。
③适用于分布式任务调度和高可靠性消息通知场景。
四、Kafka延时队列背景
-
基于以上四种实现延时队列的分析来,选择对应的技术方案的基础上呢,不同公司的mq的基础设施不同,如果只有Kafka,也没必要引入RabbitMQ和RocketMq来实现,引入新的组件也会顺便带来新的问题。
-
网上搜Kafka实现延时队列有很多文章,很多文章说使用Kafka内部的时间轮,支持延时操作,但这是Kafka自己内部使用的,时间轮只是一个工具类,用户无法将其作为延迟队列来使用。
-
Kafka延时队列的最佳实践,使用Kafka消费者的暂停和恢复机制来实现。
五、Kafka延时队列实现思路
- 解决一个问题前首先要明确问题,如何让Kafka有延时队列的功能呢?
- 就是在Kafka消费者消费的时候延时消费,不久搞定了嘛
- 那如何延时消费呢,网上有些文章使用Thread.sleep进行延时消费这是不靠谱的(亲身实践),sleep的时间超过了Kafka配置的max.poll.records时间,消费者无法及时提交offset,kafka就会认为这个消费者已经挂了,会进行rebalance也就是重新分配分区给消费者,以保证每个分区只被一个消费者消费
- 也有同学说了,为了不发生rebalance,那可以增加max.poll.records时间啊,但是这样的话,如果要sleep几天的时间,难道max.poll.records要写几天的时间嘛,有违Kafka的设计原理了,那怎么办呢?
- 这时候Kafka的pause暂停消费和resume恢复消费就登场了,pause暂停某个分区之后消费者不会再poll拉取该分区的消息,直到resume恢复该分区之后才会重新poll消息。
- 我已经做好了Kafka延时队列的封装,以后只需要一行代码就可以实现延时队列了
六、Kafka延时队列架构图
七、kafka延时任务代码实现
以下代码只列出了核心实现,完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码。
源码目录:
1. KafkaSyncConsumer:Kafka消费者
该类封装了一个线程安全的KafkaConsumer,因为原生的 KafkaConsumer是不支持线程共享的,直接使用会报错:java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
package com.wdyin.kafka.delay;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* Kafka同步消费者
* @author WDYin
* @date 2023/4/14
**/
public class KafkaSyncConsumerK, V> extends KafkaConsumerK, V> {
KafkaSyncConsumer(Properties properties) {
super(properties);
}
@Override
public synchronized ConsumerRecordsK, V> poll(Duration timeout) {
return super.poll(timeout);
}
@Override
public synchronized SetTopicPartition> paused() {
return super.paused();
}
synchronized void pauseAndSeek(TopicPartition partition, long offset) {
super.pause(Collections.singletonList(partition));
super.seek(partition, offset);
}
@Override
public synchronized void commitSync(MapTopicPartition, OffsetAndMetadata> offsets) {
super.commitSync(offsets);
}
synchronized void resume(TopicPartition topicPartition) {
super.resume(Collections.singleton(topicPartition));
}
@Override
public synchronized void commitSync(){
super.commitSync();
}
}
2. KafkaDelayQueue:Kafka延迟队列
定义一个Kafka延期队列,包含的内容:KafkaDelayQueue,其中有延迟队列配置,主题,消费组,延迟时间,目标主题,KafkaSyncConsumer,ApplicationContext,poll线程池,delay线程池等等
package com.wdyin.kafka.delay;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ThreadPoolExecutor;
/**
* kafka延时队列
*
* @Author WDYin
* @Date 2022/7/2
**/
@Slf4j
@Getter
@Setter
class KafkaDelayQueueK, V> {
private String topic;
private String group;
private Integer delayTime;
private String targetTopic;
private KafkaDelayConfig kafkaDelayConfig;
private KafkaSyncConsumerK, V> kafkaSyncConsumer;
private ApplicationContext applicationContext;
private ThreadPoolTaskScheduler threadPoolPollTaskScheduler;
private ThreadPoolTaskScheduler threadPoolDelayTaskScheduler;
void send() {
try {
kafkaSyncConsumer.subscribe(Collections.singletonList(topic));
this.threadPoolPollTaskScheduler
.scheduleWithFixedDelay(pollTask(), kafkaDelayConfig.getPollInterval());
} catch (Exception e) {
log.error("KafkaDelayQueue subscribe error", e);
}
}
private KafkaPollTaskK, V> pollTask(){
return new KafkaPollTask>(this, Duration.ofMillis(kafkaDelayConfig.getPollTimeout()), delayTime, applicationContext);
}
KafkaDelayTaskK, V> delayTask(TopicPartition partition){
return new KafkaDelayTask>(kafkaSyncConsumer, partition);
}
@Slf4j
private static class KafkaPollTaskK, V> implements Runnable {
private KafkaDelayQueueK, V> kafkaDelayQueue;
private Duration timeout;
private Integer delayTime;
private ApplicationContext applicationContext;
KafkaPollTask(KafkaDelayQueueK, V> kafkaDelayQueue, Duration timeout, Integer delayTime, ApplicationContext applicationContext) {
this.kafkaDelayQueue = kafkaDelayQueue;
this.timeout = timeout;
this.applicationContext = applicationContext;
this.delayTime = delayTime;
}
@Override
public void run() {
try {
ConsumerRecordsK, V> records = kafkaDelayQueue.getKafkaSyncConsumer().poll(timeout);
applicationContext.publishEvent(new KafkaPollEvent>(records, delayTime, kafkaDelayQueue));
} catch (Exception e) {
log.error("KafkaDelayQueue consumer fail", e);
}
}
}
@Slf4j
private static class KafkaDelayTaskK, V> implements Runnable {
private KafkaSyncConsumerK, V> kafkaSyncConsumer;
private TopicPartition partition;
private KafkaDelayTask(KafkaSyncConsumerK, V> kafkaSyncConsumer, TopicPartition partition) {
this.kafkaSyncConsumer = kafkaSyncConsumer;
this.partition = partition;
}
@Override
public void run() {
try {
kafkaSyncConsumer.resume(partition);
} catch (Exception e) {
log.error("KafkaDelayQueue resume failed", e);
}
}
}
}
3. KafkaDelayQueueFactory:Kafka延迟队列工厂
Kafka延期队列的工厂,用于及其管理延迟队列
package com.wdyin.kafka.delay;
import lombok.Data;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.util.Properties;
/**
* 延时队列工厂
* @author WDYin
* @date 2023/4/17
**/
@Data
public class KafkaDelayQueueFactory {
private KafkaDelayConfig kafkaDelayConfig;
private Properties properties;
private ApplicationContext applicationContext;
private Integer concurrency;
public KafkaDelayQueueFactory(Properties properties, KafkaDelayConfig kafkaDelayConfig) {
Assert.notNull(properties, "properties cannot null");
Assert.notNull(kafkaDelayConfig.getDelayThreadPool(), "delayThreadPool cannot null");
Assert.notNull(kafkaDelayConfig.getPollThreadPool(), "pollThreadPool cannot null");
Assert.notNull(kafkaDelayConfig.getPollInterval(), "pollInterval cannot null");
Assert.notNull(kafkaDelayConfig.getPollTimeout(), "timeout cannot null");
this.properties = properties;
this.kafkaDelayConfig = kafkaDelayConfig;
}
public void listener(String topic, String group, Integer delayTime, String targetTopic) {
if (StringUtils.isEmpty(topic)) {
throw new RuntimeException("topic cannot empty");
}
if (StringUtils.isEmpty(group)) {
throw new RuntimeException("group cannot empty");
}
if (StringUtils.isEmpty(delayTime)) {
throw new RuntimeException("delayTime cannot empty");
}
if (StringUtils.isEmpty(targetTopic)) {
throw new RuntimeException("targetTopic cannot empty");
}
KafkaSyncConsumerString, String> kafkaSyncConsumer = createKafkaSyncConsumer(group);
KafkaDelayQueueString, String> kafkaDelayQueue = createKafkaDelayQueue(topic, group, delayTime, targetTopic, kafkaSyncConsumer);
kafkaDelayQueue.send();
}
private KafkaDelayQueueString, String> createKafkaDelayQueue(String topic, String group, Integer delayTime, String targetTopic, KafkaSyncConsumerString, String> kafkaSyncConsumer) {
KafkaDelayQueueString, String> kafkaDelayQueue = new KafkaDelayQueue>(kafkaSyncConsumer, kafkaDelayConfig);
Assert.notNull(applicationContext, "kafkaDelayQueue need applicationContext");
kafkaDelayQueue.setApplicationContext(applicationContext);
kafkaDelayQueue.setDelayTime(delayTime);
kafkaDelayQueue.setTopic(topic);
kafkaDelayQueue.setGroup(group);
kafkaDelayQueue.setTargetTopic(targetTopic);
return kafkaDelayQueue;
}
private KafkaSyncConsumerString, String> createKafkaSyncConsumer(String group) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group);
return new KafkaSyncConsumer>(properties);
}
}
4. KafkaPollListener:Kafka延迟队列事件监听
package com.wdyin.kafka.delay;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
/**
* 延时队列监听
* @Author : WDYin
* @Date : 2021/5/7
* @Desc :
*/
@Slf4j
public class KafkaPollListenerK, V> implements ApplicationListenerKafkaPollEventK, V>> {
private KafkaTemplate kafkaTemplate;
public KafkaPollListener(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public void onApplicationEvent(KafkaPollEventK, V> event) {
ConsumerRecordsK, V> records = (ConsumerRecordsK, V>) event.getSource();
Integer delayTime = event.getDelayTime();
KafkaDelayQueueK, V> kafkaDelayQueue = event.getKafkaDelayQueue();
KafkaSyncConsumerK, V> kafkaSyncConsumer = kafkaDelayQueue.getKafkaSyncConsumer();
//1.获取poll到的有消息的分区
SetTopicPartition> partitions = records.partitions();
//2.存储需要commit的消息,提高效率批量提交
MapTopicPartition, OffsetAndMetadata> commitMap = new HashMap>();
//3.遍历有消息的分区
partitions.forEach((partition) -> {
ListConsumerRecordK, V>> consumerRecords = records.records(partition);
//4.遍历分区里面的消息
for (ConsumerRecordK, V> record : consumerRecords) {
//5.获取消息创建时间
long startTime = (record.timestamp() / 1000) * 1000;
long endTime = startTime + delayTime;
//6.不符合条件的分区暂停消费
long now = System.currentTimeMillis();
if (endTime > now) {
kafkaSyncConsumer.pauseAndSeek(partition, record.offset());
//7.使用 schedule()执行定时任务
kafkaDelayQueue.getThreadPoolPollTaskScheduler().schedule(kafkaDelayQueue.delayTask(partition), new Date(endTime));
//无需继续消费该分区下的其他消息,直接消费其他分区
break;
}
log.info("{}: partition:{}, offset:{}, key:{}, value:{}, messageDate:{}, nowDate:{}, messageDate:{}, nowDate:{}",
Thread.currentThread().getName() + "#" + Thread.currentThread().getId(), record.topic() + "-" + record.partition(), record.offset(), record.key(), record.value(), LocalDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()), LocalDateTime.now(), startTime, Instant.now().getEpochSecond());
//发送目标主题
kafkaTemplate.send(kafkaDelayQueue.getTargetTopic(), record.value());
//更新需要commit的消息
commitMap.put(partition, new OffsetAndMetadata(record.offset() + 1));
}
});
//8.批量提交,提高效率,commitSync耗时几百毫秒
if (!commitMap.isEmpty()) {
kafkaSyncConsumer.commitSync(commitMap);
}
}
}
5. KafkaConfig:Kafka配置文件
package com.wdyin.kafka.config;
import com.wdyin.kafka.delay.KafkaDelayConfig;
import com.wdyin.kafka.delay.KafkaDelayQueueFactory;
import com.wdyin.kafka.delay.KafkaPollListener;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* @author WDYin
* @date 2023/4/21
**/
@Configuration
@Slf4j
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Resource
private ApplicationContext applicationContext;
/**
* 消费者参数配置
* @param bootstrapServers
* @param isAutoSubmit
* @return
*/
private MapString, Object> consumerProps(String bootstrapServers, Boolean isAutoSubmit) {
MapString, Object> props = new HashMap>();
//kafka broker地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
//取消自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, isAutoSubmit);
//一次拉取消息数量,可根据实际情况自行调整
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
//序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
/**
* spring生产者参数配置
* @param bootstrapServer
* @return
*/
private HashMapString, Object> producerProps(String bootstrapServer) {
HashMapString, Object> configProps = new HashMap>();
//broke地址
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
//序列化
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//幂等发送给broker
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
//生产者时将多少数据累积到一个批次中,设置为0的目的提高实时性
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, "0");
return configProps;
}
/**
* 用于spring的@listener注解进行消费,并非用于延时队列
* @return
*/
@Bean("kafkaContainerFactory")
public KafkaListenerContainerFactoryConcurrentMessageListenerContainerString, String>> kafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory>(consumerProps(bootstrapServers, Boolean.FALSE)));
//线程数为1
factory.setConcurrency(1);
//poll超时时间
factory.getContainerProperties().setPollTimeout(1500L);
//手动立即提交offset
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
/**
* spring kafkaTemplate注册
* @return
*/
@Bean
public KafkaTemplateString, String> kafkaTemplate() {
return new KafkaTemplate>(new DefaultKafkaProducerFactory>(producerProps(bootstrapServers)));
}
/**
* 延时队列-Kafka同步消费者配置
* @return
*/
public Properties kafkaSyncConsumerProperties() {
// Consumer的配置
Properties properties = new Properties();
// 服务地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 关闭offset自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 消费者offset自动提交到Kafka的频率(以毫秒为单位)
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "15000");
// KEY的反序列化器类
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// VALUE的反序列化器类
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return properties;
}
/**
* 延时队列-注册延时队列工厂
* @return
*/
@Bean
public KafkaDelayQueueFactory kafkaDelayQueueFactory() {
KafkaDelayConfig kafkaDelayConfig = new KafkaDelayConfig();
kafkaDelayConfig.setPollThreadPool(1);
kafkaDelayConfig.setPollTimeout(50);
kafkaDelayConfig.setPollInterval(50);
kafkaDelayConfig.setDelayThreadPool(10);
KafkaDelayQueueFactory kafkaDelayQueueFactory = new KafkaDelayQueueFactory(kafkaSyncConsumerProperties(), kafkaDelayConfig);
kafkaDelayQueueFactory.setApplicationContext(applicationContext);
return kafkaDelayQueueFactory;
}
/**
* 延时队列-注册消费者poll监听器
* @param kafkaTemplate
* @return
*/
@Bean
public KafkaPollListener kafkaPollListener(KafkaTemplate kafkaTemplate) {
return new KafkaPollListener>(kafkaTemplate);
}
}
6. KafkaDelayApplication:Kafka延迟任务注册
一个延时主题对应一个延迟时间,后续有新的延迟任务只需要在此注册延迟任务的监听即可!
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
* @author WDYin
* @date 2023/4/18
**/
@Component
public class KafkaDelayApplication {
@Resource
private KafkaDelayQueueFactory kafkaDelayQueueFactory;
@PostConstruct
public void init() {
//延迟30秒
kafkaDelayQueueFactory.listener("delay-30-second-topic", "delay-30-second-group", 1 * 30 * 1000, "delay-60-second-target-topic");
//延迟60秒
kafkaDelayQueueFactory.listener("delay-60-second-topic", "delay-60-second-group", 1 * 60 * 1000, "delay-60-second-target-topic");
//延迟30分钟
kafkaDelayQueueFactory.listener("delay-30-minute-topic", "delay-30-minute-group", 30 * 60 * 1000, "delay-30-minute-target-topic");
}
}
九、测试
- 先往延时主题【delay-60-second-topic】发送一千条消息,一共10个分区,每个分区100条消息,消息时间是2023-04-21 16:37:26分,延迟消息消费时间就应该是2023-04-21 16:38:26
- 延时队列进行消费:通过日志查看,消息日期和延迟队列消费消息时间正好相差一分钟
10、总结
- 本案例已成功实现Kafka的延时队列,并进行实测,代码引入可用非常方便。
- Kafka实现的延时队列支持秒级别的延时任务,不支持毫秒级别,但是毫秒级别的延时任务也没有意义
- 注意一个主题对应的延时时间是一致的,不能在同一个主题里放不同时间的延时任务。
- 此方案的缺点就是,如果数据量很大,一定要保证Kafka的消费能力,否则可能会导致延迟,精度不是特别高,不过如果延迟小时级别的任务,差异个几秒种肯定可以接受的,一般场景肯定满足。
- 完整代码已经给大家整理好了,可以关注【微信公众号】微信搜索【老板来一杯java】,然后【加群】直接获取源码。
好文推荐:
2.5万字详解23种设计模式
微服务springcloud环境下基于Netty搭建websocket集群实现服务器消息推送—-netty是yyds
2.5万字讲解DDD领域驱动设计