原文网址:RabbitMQ–重试机制_IT利刃出鞘的博客-CSDN博客
简介
说明
本文介绍RabbitMQ的重试机制。
问题描述
消费者默认是自动提交,如果消费时出现了RuntimException,会导致消息直接重新入队,再次投递(进入队首),进入死循环,继而导致后面的消息被阻塞。
消息阻塞带来的后果是:后边的消息无法被消费;RabbitMQ服务端继续接收消息,占内存和磁盘越来越多。
RabbitMQ的自动确认
自动确认分四种情况(第一就是正常消费,其他三种为异常情况)
- 消息成功被消费,没有抛出异常,则自动确认,回复ack。
不涉及requeue,毕竟已经成功了。requeue是对被拒绝的消息生效。 - 当抛出ImmediateAcknowledgeAmqpException异常的时候,则视为成功消费,确认该消息。
- 当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且requeue = false(该异常会在重试超过限制后抛出)
- 抛出其他的异常,消息会被拒绝,且requeue = true
我遇到的是第四种情况,导致mq消息阻塞,并且消费者一直在消费同一条消息,然后抛异常,此时就进入了死循环。
消息未被确认时如下图所示:
RabbitMQ的重试机制
本处使用spring-rabbit中自带的重试功能解决上述问题。
注意
重试并不是RabbitMQ重新发送了消息,仅仅是消费者内部进行的重试,换句话说就是重试跟mq没有任何关系。
不管消息被消费了之后是手动确认还是自动确认,代码中不能使用try/catch捕获异常,否则重试机制失效。
重试机制有2种情况
- 消息是自动确认时,如果抛出了异常导致多次重试都失败,消息被自动确认,消息就丢失了
- 消息是手动确认时,如果抛出了异常导致多次重试都失败,消息没被确认,也无法nack,就一直是unacked状态,导致消息积压。
RabbitMQ的重试的实例
配置
application.yml
spring:
# RabbitMQ服务配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
# 重试机制
retry:
enabled: true #是否开启消费者重试
max-attempts: 3 #最大重试次数
initial-interval: 5000ms #重试间隔时间(单位毫秒)
max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
# 乘子。间隔时间*乘子=下一次的间隔时间,不能超过max-interval
# 以本处为例:第一次间隔 5 秒,第二次间隔 10 秒,以此类推
multiplier: 2
代码
@RabbitListener(queues = "meat_queue")
public void processMeatTwo(String message) throws InterruptedException {
System.out.println("processMeatTwo消费了队列meat_queue的消息:" + message);
Thread.sleep(1000);
//模拟异常
String is = null;
is.toString();
}
结果
可以看到,消息重试了5次,之后会抛出ListenerExecutionFailedException的异常。后面附带着Retry Policy Exhausted,提示我们重试次数已经用尽了。
消息重试次数用尽后,消息就会被抛弃。
重试完之后对消息的处理
概述
消息在重试完之后,会调用MessageRecoverer接口的recover方法。MessageRecoverer接口有如下三个实现类(看它们名字即可知道含义):
- RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列(默认)
- RepublishMessageRecoverer:重新发布消息
- ImmediateRequeueMessageRecoverer:立即把消息重新放入队列
处理示例
默认情况下是RejectAndDontRequeueRecoverer:拒绝而且不把消息重新放入队列。我们可以使用RepublishMessageRecoverer,重新发布消息,将它发布到其他队列,后边对它进行补偿处理。
先创建一个异常队列,然后与交换机绑定进行绑定,绑定之后设置MessageRecoverer。
@Configuration
public class MQErrorConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
private static String errorTopicExchange = "error-topic-exchange";
private static String errorQueue = "error-queue";
private static String errorRoutingKey = "error-routing-key";
//创建异常交换机
@Bean
public TopicExchange errorTopicExchange(){
return new TopicExchange(errorTopicExchange, true, false);
}
//创建异常队列
@Bean
public Queue errorQueue(){
return new Queue(errorQueue, true);
}
//队列与交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue errorQueue, TopicExchange errorTopicExchange){
return BindingBuilder.bind(errorQueue).to(errorTopicExchange).with(errorRoutingKey);
}
//设置MessageRecoverer
@Bean
public MessageRecoverer messageRecoverer(){
//AmqpTemplate和RabbitTemplate都可以
return new RepublishMessageRecoverer(rabbitTemplate, errorTopicExchange, errorRoutingKey);
}
}
查看处理结果:
通过控制台可以看到,消息重试5次以后直接以新的routingKey发送到了配置的交换机中,此时再查看监控页面,可以看原始队列中已经没有消息了,但是配置的异常队列中存在一条消息:
源码分析
上面的例子在测试中发现了一个问题,就是经过5次重试以后,控制台输出了一个异常的堆栈日志,然后队列中的数据也被ack掉了(自动ack模式),首先我们看一下这个异常日志是什么。
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Retry Policy Exhausted
出现消息被消费掉并且出现上述异常的原因是因为在构建SimpleRabbitListenerContainerFactoryConfigurer类时使用了MessageRecoverer接口,这个接口有一个cover方法,用来实现重试完成之后对消息的处理,源码如下:
ListenerRetry retryConfig = configuration.getRetry();
if (retryConfig.isEnabled()) {
RetryInterceptorBuilder, ?> builder = (retryConfig.isStateless()) ? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful();
RetryTemplate retryTemplate = new RetryTemplateFactory(this.retryTemplateCustomizers)
.createRetryTemplate(retryConfig, RabbitRetryTemplateCustomizer.Target.LISTENER);
builder.retryOperations(retryTemplate);
MessageRecoverer recoverer = (this.messageRecoverer != null) ? this.messageRecoverer
: new RejectAndDontRequeueRecoverer(); // 1
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
注意看1处的代码,默认使用的是RejectAndDontRequeueRecoverer实现类,根据实现类的名字我们就可以看出来该实现类的作用就是拒绝并且不会将消息重新发回队列,我们可以看一下这个实现类的具体内容:
public class RejectAndDontRequeueRecoverer implements MessageRecoverer {
protected Log logger = LogFactory.getLog(RejectAndDontRequeueRecoverer.class); // NOSONAR protected
@Override
public void recover(Message message, Throwable cause) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Retries exhausted for message " + message, cause);
}
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
}
}
上述源码给出了异常的来源,但是未看到拒绝消息的代码,猜测应该是使用aop的方式实现的,此处不再继续深究。