rocketmq结合springboot使用起来十分方便,只要我们将实现类添加注解RocketMQMessageListener,并且实现接口RocketMQListener,就可以轻松实现某topic的消费侧消费消息的功能。
springboot是如何使用这两点实现rocketmq消费功能的呢?
首先我们查看ListenerContainerConfiguration
ListenerContainerConfiguration是springboot注入到spring得类
该类实现了三个接口 ApplicationContextAware, SmartInitializingSingleton
ApplicationContextAware可以让该类感知spring容器SmartInitializingSingleton接口可以让该类bean单利初始化之后再加工bean类。该类的接口afterSingletonsInstantiated实现了实现RocketMQMessageListener注解各个类的,rocketmq容器注入工作。
@Override
public void afterSingletonsInstantiated() {
Map beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerContainer);
}
registerContainer方法的含义是在注册rocket的容器。
1、获取容器名称,容器的名称是按照顺序编写的,举个例子
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2
这是第二个注入到容器的mq容器。
2、生成DefaultRocketMQListenerContainer实例
2.1、解析RocketMQMessageListener标签的参数参数并注入容器字段中。
2.2、将我们的RocketMQMessageListener注入到字段rocketMQListener当中
3、将2生成的DefaultRocketMQListenerContainer实例注入到spring容器当中。
4、启动DefaultRocketMQListenerContainer容器。
container.start();
container内部启动consumer,rocketmq的客户端。
consumer类型为DefaultMQPushConsumer
接下来就是为该监听器启动了一个DefaultMQPushConsumerImpl实现。DefaultMQPushConsumerImpl到此就是启动rocketmq客户端流程。springboot的标签RocketMQMessageListener解析就告一段落。
小结
springboot实现了RocketMQMessageListener可以很轻松的实现客户端监听topic,并接收远程broker发送来的请求。springboot通过配置@Configuration给ListenerContainerConfiguration,实现解析@RocketMQMessageListener的过程。具体的,通过将我们的业务类首先包装成DefaultRocketMQListenerContainer,并注入spring容器当中。启动DefaultRocketMQListenerContainer的过程就是为该group,topic启动一个客户端,这样才能够获取远程broker信息。通过代码可以发现RocketMQMessageListener底层使用了DefaultMQPushConsumer类实现。也就是说RocketMQMessageListener使用的是推的模式获取远程broker信息。另外,推送模式是通过自动同步offset实现,拉模式可以手动设置offset同步时机相对灵活。
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer_2