@RabbitListener起作用的原理是什么

其他教程   发布日期:2023年09月19日   浏览次数:616

这篇文章主要讲解了“@RabbitListener起作用的原理是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“@RabbitListener起作用的原理是什么”吧!

一、前言

在spring中,定义rabbitMq的消费者可以相当方便,只需要在消息处理类或者类方法加上@RabbitListener注解,指定队列名称即可。

如下代码:

  1. @Component
  2. public class RabbitMqListener1 {
  3. @RabbitListener(queues = "queue1")
  4. public void consumer1(Message message) {
  5. }
  6. @RabbitListener(queues = "queue2")
  7. public void consumer2(String messsageBody) {
  8. }
  9. }
  10. @Component
  11. @RabbitListener(queues = "queue3")
  12. public class RabbitMqListener2 {
  13. @RabbitHandler(isDefault=true)
  14. public void consumer3() {
  15. }
  16. }

注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。

不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。

原因在RabbitListenerAnnotationBeanPostProcessor.processMultiMethodListeners方法,有兴趣的可以看下。

可以看到代码相当的简单。但是!!!为什么加上这个注解,就能作为一个consumer接受mq的消息呢?为啥处理mq消息的方法,入参可以那么随意?

有经验的程序员,可能会有这样的设想:

1、单纯看这些listener的代码,只是定义了由spring管理的bean,要能监听rabbitMq的消息,肯定需要有另外一个类,这个类会扫描所有加了@RabbitListener的bean,进行加工。

2、看这些listener的代码,可以发现处理mq消息的,都是具体的某个方法。那加工的过程,应该就是利用反射拿到对象、方法和@RabbitListener中的queue属性,然后建立一个绑定关系(对象+方法)——>(queue的consumer)。queue的consumer在接收到mq消息后,找到绑定的“对象+方法”,再通过反射的方式,调用真正的处理方法。

3、mq消息的处理方法,可以那么随意,应该是queue的consumer在调用真正处理方法之前,需要根据处理方法的参数类型,做一次数据转换。

接下来,就去看看源码,看一下设想是不是正确的~~

二、源码分析

1、谁来扫描@RabbitListener注解的bean

在springBoot使用rabbit,一般是在@Configuration类上加上@EnableRabbit注解来开启rabbit功能。那我们就去看看@EnableRabbit注解的源码,看这个注解的作用

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. @Import(RabbitBootstrapConfiguration.class)
  5. public @interface EnableRabbit {
  6. }

可以看到,这个注解的作用,是导入RabbitBootstrapConfiguration配置类

  1. @Configuration
  2. public class RabbitBootstrapConfiguration {
  3. @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
  4. @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
  5. public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
  6. return new RabbitListenerAnnotationBeanPostProcessor();
  7. }
  8. @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
  9. public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
  10. return new RabbitListenerEndpointRegistry();
  11. }
  12. }

RabbitBootstrapConfiguration 配置类的作用,就是定义了RabbitListenerAnnotationBeanPostProcessor 和RabbitListenerEndpointRegistry 两个bean。

看到RabbitListenerAnnotationBeanPostProcessor 这个类名,就可以猜到,该类的实例bean就是用来扫描加了@RabbitListener 的类,并做一些加工。

(“RabbitListenerAnnotationBean”——针对添加了@RabbitListener注解的bean; “PostProcessor”——后置加工)

2、怎么建立(对象+方法)——>(queue的consumer)的映射关系

分析一下RabbitListenerAnnotationBeanPostProcessor类的源码

  1. // 实现了BeanPostProcessor、Ordered、BeanFactoryAware、BeanClassLoaderAware、EnvironmentAware和SmartInitializingSingleton 6个接口
  2. public class RabbitListenerAnnotationBeanPostProcessor
  3. implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
  4. SmartInitializingSingleton {
  5. .......
  6. // 完成初始化bean之后,调用该方法
  7. @Override
  8. public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
  9. Class<?> targetClass = AopUtils.getTargetClass(bean);
  10. TypeMetadata metadata = this.typeCache.get(targetClass);
  11. if (metadata == null) {
  12. metadata = buildMetadata(targetClass);
  13. this.typeCache.putIfAbsent(targetClass, metadata);
  14. }
  15. for (ListenerMethod lm : metadata.listenerMethods) {
  16. for (RabbitListener rabbitListener : lm.annotations) {
  17. processAmqpListener(rabbitListener, lm.method, bean, beanName);
  18. }
  19. }
  20. if (metadata.handlerMethods.length > 0) {
  21. processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
  22. }
  23. return bean;
  24. }
  25. // 根据Class,获取元数据
  26. private TypeMetadata buildMetadata(Class<?> targetClass) {
  27. Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
  28. final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
  29. final List<ListenerMethod> methods = new ArrayList<ListenerMethod>();
  30. final List<Method> multiMethods = new ArrayList<Method>();
  31. ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() {
  32. @Override
  33. public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException {
  34. Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
  35. if (listenerAnnotations.size() > 0) {
  36. methods.add(new ListenerMethod(method,
  37. listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
  38. }
  39. if (hasClassLevelListeners) {
  40. RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
  41. if (rabbitHandler != null) {
  42. multiMethods.add(method);
  43. }
  44. }
  45. }
  46. }, ReflectionUtils.USER_DECLARED_METHODS);
  47. if (methods.isEmpty() && multiMethods.isEmpty()) {
  48. return TypeMetadata.EMPTY;
  49. }
  50. return new TypeMetadata(
  51. methods.toArray(new ListenerMethod[methods.size()]),
  52. multiMethods.toArray(new Method[multiMethods.size()]),
  53. classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
  54. }
  55. // 检查一下是否使用jdk代理,使用jdk代理方式必须实现了接口
  56. // new一个MethodRabbitListenerEndpoint对象,交由processListener方法进行处理
  57. protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
  58. Method methodToUse = checkProxy(method, bean);
  59. MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
  60. endpoint.setMethod(methodToUse);
  61. endpoint.setBeanFactory(this.beanFactory);
  62. processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
  63. }
  64. // 前面大半代码都是对MethodRabbitListenerEndpoint对象的属性设置:处理消息的bean、消息处理方法的工厂类、监听的队列名。。。。
  65. // 通过beanFactory获取RabbitListenerContainerFactory类的bean
  66. // 调用RabbitListenerEndpointRegistar的registerEndpoint方法注册mq消息消费端点
  67. protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
  68. Object adminTarget, String beanName) {
  69. endpoint.setBean(bean);
  70. endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
  71. endpoint.setId(getEndpointId(rabbitListener));
  72. endpoint.setQueueNames(resolveQueues(rabbitListener));
  73. String group = rabbitListener.group();
  74. if (StringUtils.hasText(group)) {
  75. Object resolvedGroup = resolveExpression(group);
  76. if (resolvedGroup instanceof String) {
  77. endpoint.setGroup((String) resolvedGroup);
  78. }
  79. }
  80. endpoint.setExclusive(rabbitListener.exclusive());
  81. String priority = resolve(rabbitListener.priority());
  82. if (StringUtils.hasText(priority)) {
  83. try {
  84. endpoint.setPriority(Integer.valueOf(priority));
  85. }
  86. catch (NumberFormatException ex) {
  87. throw new BeanInitializationException("Invalid priority value for " +
  88. rabbitListener + " (must be an integer)", ex);
  89. }
  90. }
  91. String rabbitAdmin = resolve(rabbitListener.admin());
  92. if (StringUtils.hasText(rabbitAdmin)) {
  93. Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
  94. try {
  95. endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
  96. }
  97. catch (NoSuchBeanDefinitionException ex) {
  98. throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
  99. adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
  100. rabbitAdmin + "' was found in the application context", ex);
  101. }
  102. }
  103. RabbitListenerContainerFactory<?> factory = null;
  104. String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
  105. if (StringUtils.hasText(containerFactoryBeanName)) {
  106. Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  107. try {
  108. factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
  109. }
  110. catch (NoSuchBeanDefinitionException ex) {
  111. throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
  112. adminTarget + "] for bean " + beanName + ", no " +
  113. RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
  114. containerFactoryBeanName + "' was found in the application context", ex);
  115. }
  116. }
  117. this.registrar.registerEndpoint(endpoint, factory);
  118. }
  119. ........
  120. }

这个类的代码比较长,只贴部分比较主要的部分,其他的,可以自己查看源码进行了解。

RabbitListenerAnnotationBeanPostProcessor实现了BeanPostProcessor(bean初始化后的后置处理)、Ordered(后置处理的排序)、BeanFactoryAware(注入BeanFactory)、BeanClassLoaderAware(注入BeanClassLoader)、EnvironmentAware(注入spring环境)和SmartInitializingSingleton(单例bean初始化后的回调) 6个接口。

我们需要关注的是BeanPostProcessor接口定义的方法,看postProcessAfterInitialization方法的代码,大致流程为:

1、通过AopUtils得到bean代理的对象的class

2、判断缓存中是否有该class的类型元数据,如果没有则调用buildMetadata方法生成类型元数据并放入缓存

3、遍历加了@RabbitListener注解的方法,调用processAmqpListener方法进行处理

4、调用processMultiMethodListeners方法对加了@RabbitHandler的方法进行处理

关于buildMetadata方法:

代码不复杂,就是利用反射,拿到class中,添加了@RabbitListener和@RabbitHandler注解的方法。另外,从代码中也可以看出,@RabbitHandler注解要生效,必须在class上增加@RabbitListener注解

关于processAmqpListener方法:

没有什么实际内容,就干两个事情:

1、检查一下是否使用jdk代理,使用jdk代理方式必须实现了接口

2、new一个MethodRabbitListenerEndpoint对象,交由processListener方法进行处理

关于processListener方法:

1、前面大半代码都是对MethodRabbitListenerEndpoint对象的属性设置:处理消息的bean、消息处理方法的工厂类、监听的队列名。。。。

其中要关注一下setMessageHandlerMethodFactory方法,查看MessageHandlerMethodFactory接口的源码

  1. public interface MessageHandlerMethodFactory {
  2. InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method);

从入参和返回值可以看出来,这个工厂的作用就是将spring的bean对象和方法包装成一个InvocableHandlerMethod对象,也就是我们上面提到的(对象+方法)。

2、通过beanFactory获取RabbitListenerContainerFactory类的bean。

3、调用RabbitListenerEndpointRegistar的registerEndpoint方法注册mq消息消费端点。

继续往下追,看一下RabbitListenerEndpointRegistar的代码:

  1. public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
  2. // 将整个endpointDescriptors数组进行注册
  3. protected void registerAllEndpoints() {
  4. synchronized (this.endpointDescriptors) {
  5. for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
  6. this.endpointRegistry.registerListenerContainer(
  7. descriptor.endpoint, resolveContainerFactory(descriptor));
  8. }
  9. this.startImmediately = true; // trigger immediate startup
  10. }
  11. }
  12. // 解析得到RabbitListenerContainerFactory
  13. // 如果AmqpListenerEndpointDescriptor 的containerFactory属性不为空,直接返回containerFactory
  14. // 如果为空,尝试从beanFactory获取
  15. private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) {
  16. if (descriptor.containerFactory != null) {
  17. return descriptor.containerFactory;
  18. }
  19. else if (this.containerFactory != null) {
  20. return this.containerFactory;
  21. }
  22. else if (this.containerFactoryBeanName != null) {
  23. Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
  24. this.containerFactory = this.beanFactory.getBean(
  25. this.containerFactoryBeanName, RabbitListenerContainerFactory.class);
  26. return this.containerFactory; // Consider changing this if live change of the factory is required
  27. }
  28. else {
  29. throw new IllegalStateException("Could not resolve the " +
  30. RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" +
  31. descriptor.endpoint + "] no factory was given and no default is set.");
  32. }
  33. }
  34. // new一个AmqpListenerEndpointDescriptor对象
  35. // 如果立即启动,则调用RabbitListenerEndpointRegistry注册器来注册消息监听
  36. // 如果不是立即启动,则添加到endpointDescriptors列表中,后面通过registerAllEndpoints方法统一启动
  37. public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) {
  38. Assert.notNull(endpoint, "Endpoint must be set");
  39. Assert.hasText(endpoint.getId(), "Endpoint id must be set");
  40. // Factory may be null, we defer the resolution right before actually creating the container
  41. AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory);
  42. synchronized (this.endpointDescriptors) {
  43. if (this.startImmediately) { // Register and start immediately
  44. this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
  45. resolveContainerFactory(descriptor), true);
  46. }
  47. else {
  48. this.endpointDescriptors.add(descriptor);
  49. }
  50. }
  51. }
  52. }

从上面的代码可以看出,我们关心的内容,应该是在RabbitListenerEndpointRegistry类的registerListenerContainer方法!!

  1. public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
  2. ApplicationListener<ContextRefreshedEvent> {
  3. // 检查是否被注册过,注册过就不能注册第二次
  4. // 调用createListenerContainer创建消息监听
  5. // 关于分组消费的,我们不关心
  6. // 是否立即启动,是的话,同步调用startIfNecessary方法
  7. public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory,
  8. boolean startImmediately) {
  9. Assert.notNull(endpoint, "Endpoint must not be null");
  10. Assert.notNull(factory, "Factory must not be null");
  11. String id = endpoint.getId();
  12. Assert.hasText(id, "Endpoint id must not be empty");
  13. synchronized (this.listenerContainers) {
  14. Assert.state(!this.listenerContainers.containsKey(id),
  15. "Another endpoint is already registered with id '" + id + "'");
  16. MessageListenerContainer container = createListenerContainer(endpoint, factory);
  17. this.listenerContainers.put(id, container);
  18. if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
  19. List<MessageListenerContainer> containerGroup;
  20. if (this.applicationContext.containsBean(endpoint.getGroup())) {
  21. containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
  22. }
  23. else {
  24. containerGroup = new ArrayList<MessageListenerContainer>();
  25. this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
  26. }
  27. containerGroup.add(container);
  28. }
  29. if (startImmediately) {
  30. startIfNecessary(container);
  31. }
  32. }
  33. // 其实就是调用了RabbitListenerContainerFactory的createListenerContainer生成了一个MessageListenerContainer对象
  34. protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint,
  35. RabbitListenerContainerFactory<?> factory) {
  36. MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);
  37. if (listenerContainer instanceof InitializingBean) {
  38. try {
  39. ((InitializingBean) listenerContainer).afterPropertiesSet();
  40. }
  41. catch (Exception ex) {
  42. throw new BeanInitializationException("Failed to initialize message listener container", ex);
  43. }
  44. }
  45. int containerPhase = listenerContainer.getPhase();
  46. if (containerPhase < Integer.MAX_VALUE) { // a custom phase value
  47. if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) {
  48. throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " +
  49. this.phase + " vs " + containerPhase);
  50. }
  51. this.phase = listenerContainer.getPhase();
  52. }
  53. return listenerContainer;
  54. }
  55. }

createListenerContainer方法调用了RabbitListenerContainerFactory接口的createListenerContainer方法创建一个MessageListenerContainer对象。

在这里,如果是通过RabbitAutoConfiguration自动配置的,那么RabbitListenerContainerFactory接口的具体实现类是SimpleRabbitListenerContainerFactory,MessageListenerContainer接口的具体实现类是SimpleMessageListenerContainer。有兴趣的话,可以去看下rabbitMq自动配置的几个类。

RabbitListenerContainerFactory接口的createListenerContainer方法是由AbstractRabbitListenerContainerFactory抽象类实现,代码如下:

  1. @Override
  2. public C createListenerContainer(RabbitListenerEndpoint endpoint) {
  3. C instance = createContainerInstance();
  4. if (this.connectionFactory != null) {
  5. instance.setConnectionFactory(this.connectionFactory);
  6. }
  7. if (this.errorHandler != null) {
  8. instance.setErrorHandler(this.errorHandler);
  9. }
  10. if (this.messageConverter != null) {
  11. instance.setMessageConverter(this.messageConverter);
  12. }
  13. if (this.acknowledgeMode != null) {
  14. instance.setAcknowledgeMode(this.acknowledgeMode);
  15. }
  16. if (this.channelTransacted != null) {
  17. instance.setChannelTransacted(this.channelTransacted);
  18. }
  19. if (this.autoStartup != null) {
  20. instance.setAutoStartup(this.autoStartup);
  21. }
  22. if (this.phase != null) {
  23. instance.setPhase(this.phase);
  24. }
  25. instance.setListenerId(endpoint.getId());
  26. // 最重要的一行!!!
  27. endpoint.setupListenerContainer(instance);
  28. initializeContainer(instance);
  29. return instance;
  30. }

乍一看,都是对MessageListenerContainer实例的初始化,实际上有一行,相当重要“ endpoint.setupListenerContainer(instance); ”,这一行最终是走到

  1. AbstractRabbitListenerEndpoint.setupListenerContainer
  1. public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEndpoint, BeanFactoryAware {
  2. ......
  3. // 设置MessageListenerContainer,最重要的就是设置监听的队列名称!!!
  4. @Override
  5. public void setupListenerContainer(MessageListenerContainer listenerContainer) {
  6. SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer;
  7. boolean queuesEmpty = getQueues().isEmpty();
  8. boolean queueNamesEmpty = getQueueNames().isEmpty();
  9. if (!queuesEmpty && !queueNamesEmpty) {
  10. throw new IllegalStateException("Queues or queue names must be provided but not both for " + this);
  11. }
  12. if (queuesEmpty) {
  13. Collection<String> names = getQueueNames();
  14. container.setQueueNames(names.toArray(new String[names.size()]));
  15. }
  16. else {
  17. Collection<Queue> instances = getQueues();
  18. container.setQueues(instances.toArray(new Queue[instances.size()]));
  19. }
  20. container.setExclusive(isExclusive());
  21. if (getPriority() != null) {
  22. Map<String, Object> args = new HashMap<String, Object>();
  23. args.put("x-priority", getPriority());
  24. container.setConsumerArguments(args);
  25. }
  26. if (getAdmin() != null) {
  27. container.setRabbitAdmin(getAdmin());
  28. }
  29. setupMessageListener(listenerContainer);
  30. }
  31. // 创建MessageListener
  32. protected abstract MessageListener createMessageListener(MessageListenerContainer container);
  33. // 创建MessageListener,设置到MessageListenerContainer 里
  34. private void setupMessageListener(MessageListenerContainer container) {
  35. MessageListener messageListener = createMessageListener(container);
  36. Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener");
  37. container.setupMessageListener(messageListener);
  38. }
  39. ......
  40. }

用@RabbitLinstener注解的方法,使用的endpoint是MethodRabbitListenerEndpoint继承自AbstractRabbitListenerEndpoint,所以看看AbstractRabbitListenerEndpoint的createMessageListener方法

  1. public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint {
  2. ......
  3. @Override
  4. protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
  5. Assert.state(this.messageHandlerMethodFactory != null,
  6. &nbsp

以上就是@RabbitListener起作用的原理是什么的详细内容,更多关于@RabbitListener起作用的原理是什么的资料请关注九品源码其它相关文章!