Spring Rocketmq 事务消息 @RocketMQMessageListener注解的使用
- 1、 RocketMQMessageListener参数讲解
- 2、参数一 :consumeMode
- 3、参数二:messageModel
- 3、参数三:selectorExpression
- 4、参数四:consumerGroup
- 5、参数:topic
- 6、代码示例
1、 RocketMQMessageListener参数讲解
@RocketMQMessageListener事务消息监听器
2、参数一 :consumeMode
/**
*控制消费模式,您可以选择并发或有序接收消息。
*/
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
3、参数二:messageModel
/**
* 控制消息模式,
* 广播模式:所有消费者都能接受到消息
* 集群模式:无论有多少个消费者,只有一个消费者能够接收到消息。
*/
MessageModel messageModel() default MessageModel.CLUSTERING;
3、参数三:selectorExpression
/**
* 控制可以选择哪个消息
*/
String selectorExpression() default "*";
// 同步顺序消息 根据下面使用方式使用即可,在主题上面拼接
rocketMQTemplate.syncSendOrderly("MQ_TOPIC:user", info);
4、参数四:consumerGroup
/**
*
*概念:消费者组(多个消费者) 此参数相同即为同一个消费者组
*作用:集群模式负载均衡的实现,广播模式的通知的实现
*
*/
String consumerGroup();
5、参数:topic
/**
* Topic name. 主题
* 指该消费者组所订阅的消息服务
*
*/
String topic();
6、代码示例
//consumeMode = ConsumeMode.ORDERLY,messageModel = MessageModel.BROADCASTING,这样会报 messageModel BROADCASTING does not support ORDERLY message!
@Service("MqConsumer")
//@RocketMQMessageListener事务消息监听器
//consumeMode 消费模式,默认值 ConsumeMode.CONCURRENTLY 并行处理;ConsumeMode.ORDERLY 按顺序处理
//messageModel 消息模型,默认值 MessageModel.CLUSTERING 集群;MessageModel.BROADCASTING 广播
@RocketMQMessageListener(consumerGroup = "testmq2222", topic = "MQ_TOPIC",
//selectorType = SelectorType.TAG,
// selectorExpression = "user",
consumeMode = ConsumeMode.ORDERLY,
messageModel = MessageModel.CLUSTERING)
public class MqConsumer implements RocketMQListenerMessageExt>, RocketMQPushConsumerLifecycleListener {
int i = 0;
//消息接收处理方法
@Override
public void onMessage(MessageExt message) {
String msg = null;
try {
msg = new String(message.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.err.println("MqConsumer1111111111 接收到消息:tag:" + message.getTags() + " count:" + (i++)
+ " QueueId:" + message.getQueueId() + " 消息[body]:" + msg);
}
// 该方法重写消息监听器的属性
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
// 设置消费者消息重试次数
defaultMQPushConsumer.setMaxReconsumeTimes(3);
// 设置实例名称
defaultMQPushConsumer.setInstanceName("mqconsumer1");
}
注意:顺序处理不能和广播模式同时使用,应该广播模式是属于并发的,而顺序是强调FIFO原则,广播模式不能保证顺序一致性。