SpringBoot如何实现MQTT消息发送和接收

其他教程   发布日期:2023年08月18日   浏览次数:426

今天小编给大家分享一下SpringBoot如何实现MQTT消息发送和接收的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考一下,希望大家阅读完这篇文章后有所收获,下面我们一起来了解一下吧。

Spring integration交互逻辑

对于发布者:

1.消息通过消息网关发送出去,由

  1. MessageChannel
的实例
  1. DirectChannel
处理发送的细节。

2.

  1. DirectChannel
收到消息后,内部通过
  1. MessageHandler
的实例
  1. MqttPahoMessageHandler
发送到指定的 Topic。

对于订阅者:

1.通过注入

  1. MessageProducerSupport
的实例
  1. MqttPahoMessageDrivenChannelAdapter
,实现订阅 Topic 和绑定消息消费的
  1. MessageChannel

2.同样由

  1. MessageChannel
的实例
  1. DirectChannel
处理消费细节。

Channel 消息后会发送给我们自定义的

  1. MqttInboundMessageHandler
实例进行消费。

可以看到整个处理的流程和前面将的基本一致。Spring Integration 就是抽象出了这么一套消息通信的机制,具体的通信细节由它集成的中间件来决定。

1、maven依赖

  1. <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-integration -->
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-integration</artifactId>
  5. <version>2.5.1</version>
  6. </dependency>
  7. <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-stream -->
  8. <dependency>
  9. <groupId>org.springframework.integration</groupId>
  10. <artifactId>spring-integration-stream</artifactId>
  11. <version>5.5.5</version>
  12. </dependency>
  13. <!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-mqtt -->
  14. <dependency>
  15. <groupId>org.springframework.integration</groupId>
  16. <artifactId>spring-integration-mqtt</artifactId>
  17. <version>5.5.5</version>
  18. </dependency>

2、yaml配置文件

  1. #mqtt配置
  2. mqtt:
  3. username: 123
  4. password: 123
  5. #MQTT-服务器连接地址,如果有多个,用逗号隔开
  6. url: tcp://127.0.0.1:1883
  7. #MQTT-连接服务器默认客户端ID
  8. client:
  9. id: ${random.value}
  10. default:
  11. #MQTT-默认的消息推送主题,实际可在调用接口时指定
  12. topic: topic,mqtt/test/#
  13. #连接超时
  14. completionTimeout: 3000

3、mqtt生产者消费者配置类

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.integration.annotation.IntegrationComponentScan;
  8. import org.springframework.integration.annotation.ServiceActivator;
  9. import org.springframework.integration.channel.DirectChannel;
  10. import org.springframework.integration.core.MessageProducer;
  11. import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
  12. import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
  13. import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
  14. import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
  15. import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
  16. import org.springframework.messaging.Message;
  17. import org.springframework.messaging.MessageChannel;
  18. import org.springframework.messaging.MessageHandler;
  19. import org.springframework.messaging.MessagingException;
  20. import java.util.Arrays;
  21. import java.util.List;
  22. /**
  23. * mqtt 推送and接收 消息类
  24. **/
  25. @Configuration
  26. @IntegrationComponentScan
  27. @Slf4j
  28. public class MqttSenderAndReceiveConfig {
  29. private static final byte[] WILL_DATA;
  30. static {
  31. WILL_DATA = "offline".getBytes();
  32. }
  33. @Autowired
  34. private MqttReceiveHandle mqttReceiveHandle;
  35. @Value("${mqtt.username}")
  36. private String username;
  37. @Value("${mqtt.password}")
  38. private String password;
  39. @Value("${mqtt.url}")
  40. private String hostUrl;
  41. @Value("${mqtt.client.id}")
  42. private String clientId;
  43. @Value("${mqtt.default.topic}")
  44. private String defaultTopic;
  45. @Value("${mqtt.completionTimeout}")
  46. private int completionTimeout; //连接超时
  47. /**
  48. * MQTT连接器选项
  49. **/
  50. @Bean(value = "getMqttConnectOptions")
  51. public MqttConnectOptions getMqttConnectOptions1() {
  52. MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
  53. // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
  54. mqttConnectOptions.setCleanSession(true);
  55. // 设置超时时间 单位为秒
  56. mqttConnectOptions.setConnectionTimeout(10);
  57. mqttConnectOptions.setAutomaticReconnect(true);
  58. mqttConnectOptions.setUserName(username);
  59. mqttConnectOptions.setPassword(password.toCharArray());
  60. mqttConnectOptions.setServerURIs(new String[]{hostUrl});
  61. // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
  62. mqttConnectOptions.setKeepAliveInterval(10);
  63. // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
  64. //mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
  65. return mqttConnectOptions;
  66. }
  67. /**
  68. * MQTT工厂
  69. **/
  70. @Bean
  71. public MqttPahoClientFactory mqttClientFactory() {
  72. DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  73. factory.setConnectionOptions(getMqttConnectOptions1());
  74. return factory;
  75. }
  76. /**
  77. * MQTT信息通道(生产者)
  78. **/
  79. @Bean
  80. public MessageChannel mqttOutboundChannel() {
  81. return new DirectChannel();
  82. }
  83. /**
  84. * MQTT消息处理器(生产者)
  85. **/
  86. @Bean
  87. @ServiceActivator(inputChannel = "mqttOutboundChannel")
  88. public MessageHandler mqttOutbound() {
  89. MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId + "_producer", mqttClientFactory());
  90. messageHandler.setAsync(true);
  91. messageHandler.setDefaultTopic(defaultTopic);
  92. messageHandler.setAsyncEvents(true); // 消息发送和传输完成会有异步的通知回调
  93. //设置转换器 发送bytes数据
  94. DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
  95. converter.setPayloadAsBytes(true);
  96. return messageHandler;
  97. }
  98. /**
  99. * 配置client,监听的topic
  100. * MQTT消息订阅绑定(消费者)
  101. **/
  102. @Bean
  103. public MessageProducer inbound() {
  104. List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
  105. String[] topics = new String[topicList.size()];
  106. topicList.toArray(topics);
  107. MqttPahoMessageDrivenChannelAdapter adapter =
  108. new MqttPahoMessageDrivenChannelAdapter(clientId + "_consumer", mqttClientFactory(), topics);
  109. adapter.setCompletionTimeout(completionTimeout);
  110. DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
  111. converter.setPayloadAsBytes(true);
  112. adapter.setConverter(converter);
  113. adapter.setQos(2);
  114. adapter.setOutputChannel(mqttInputChannel());
  115. return adapter;
  116. }
  117. /**
  118. * MQTT信息通道(消费者)
  119. **/
  120. @Bean
  121. public MessageChannel mqttInputChannel() {
  122. return new DirectChannel();
  123. }
  124. /**
  125. * MQTT消息处理器(消费者)
  126. **/
  127. @Bean
  128. @ServiceActivator(inputChannel = "mqttInputChannel")
  129. public MessageHandler handler() {
  130. return new MessageHandler() {
  131. @Override
  132. public void handleMessage(Message<?> message) throws MessagingException {
  133. //处理接收消息
  134. mqttReceiveHandle.handle(message);
  135. }
  136. };
  137. }
  138. }

4、消息处理类

  1. /**
  2. * mqtt客户端消息处理类
  3. **/
  4. @Slf4j
  5. @Component
  6. public class MqttReceiveHandle {
  7. public void handle(Message<?> message) {
  8. log.info("收到订阅消息: {}", message);
  9. String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();
  10. log.info("消息主题:{}", topic);
  11. Object payLoad = message.getPayload();
  12. byte[] data = (byte[]) payLoad;
  13. Packet packet = Packet.parse(data);
  14. log.info("发送的Packet数据{}", JSON.toJSONString(packet));
  15. }
  16. }

5、mqtt发送接口

  1. import org.springframework.integration.annotation.MessagingGateway;
  2. import org.springframework.integration.mqtt.support.MqttHeaders;
  3. import org.springframework.messaging.handler.annotation.Header;
  4. /**
  5. * mqtt发送消息
  6. * (defaultRequestChannel = "mqttOutboundChannel" 对应config配置)
  7. * **/
  8. @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
  9. public interface MqttGateway {
  10. /**
  11. * 发送信息到MQTT服务器
  12. *
  13. * @param
  14. */
  15. void sendToMqttObject(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
  16. /**
  17. * 发送信息到MQTT服务器
  18. *
  19. * @param topic 主题
  20. * @param payload 消息主体
  21. */
  22. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
  23. /**
  24. * 发送信息到MQTT服务器
  25. *
  26. * @param topic 主题
  27. * @param qos 对消息处理的几种机制。
  28. * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
  29. * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
  30. * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
  31. * @param payload 消息主体
  32. */
  33. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
  34. /**
  35. * 发送信息到MQTT服务器
  36. *
  37. * @param topic 主题
  38. * @param payload 消息主体
  39. */
  40. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, Object payload);
  41. /**
  42. * 发送信息到MQTT服务器
  43. *
  44. * @param topic 主题
  45. * @param payload 消息主体
  46. */
  47. void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, byte[] payload);
  48. }

6、mqtt事件监听类

  1. import lombok.extern.slf4j.Slf4j;
  2. import org.springframework.context.event.EventListener;
  3. import org.springframework.integration.mqtt.event.MqttConnectionFailedEvent;
  4. import org.springframework.integration.mqtt.event.MqttMessageDeliveredEvent;
  5. import org.springframework.integration.mqtt.event.MqttMessageSentEvent;
  6. import org.springframework.integration.mqtt.event.MqttSubscribedEvent;
  7. import org.springframework.stereotype.Component;
  8. @Slf4j
  9. @Component
  10. public class MqttListener {
  11. /**
  12. * 连接失败的事件通知
  13. * @param mqttConnectionFailedEvent
  14. */
  15. @EventListener(classes = MqttConnectionFailedEvent.class)
  16. public void listenerAction(MqttConnectionFailedEvent mqttConnectionFailedEvent) {
  17. log.info("连接失败的事件通知");
  18. }
  19. /**
  20. * 已发送的事件通知
  21. * @param mqttMessageSentEvent
  22. */
  23. @EventListener(classes = MqttMessageSentEvent.class)
  24. public void listenerAction(MqttMessageSentEvent mqttMessageSentEvent) {
  25. log.info("已发送的事件通知");
  26. }
  27. /**
  28. * 已传输完成的事件通知
  29. * 1.QOS == 0,发送消息后会即可进行此事件回调,因为不需要等待回执
  30. * 2.QOS == 1,发送消息后会等待ACK回执,ACK回执后会进行此事件通知
  31. * 3.QOS == 2,发送消息后会等待PubRECV回执,知道收到PubCOMP后会进行此事件通知
  32. * @param mqttMessageDeliveredEvent
  33. */
  34. @EventListener(classes = MqttMessageDeliveredEvent.class)
  35. public void listenerAction(MqttMessageDeliveredEvent mqttMessageDeliveredEvent) {
  36. log.info("已传输完成的事件通知");
  37. }
  38. /**
  39. * 消息订阅的事件通知
  40. * @param mqttSubscribedEvent
  41. */
  42. @EventListener(classes = MqttSubscribedEvent.class)
  43. public void listenerAction(MqttSubscribedEvent mqttSubscribedEvent) {
  44. log.info("消息订阅的事件通知");
  45. }
  46. }

7、接口测试

  1. @Resource
  2. private MqttGateway mqttGateway;
  3. /**
  4. * sendData 消息
  5. * topic 订阅主题
  6. **/
  7. @RequestMapping(value = "/sendMqtt",method = RequestMethod.POST)
  8. public String sendMqtt(String sendData, String topic) {
  9. MqttMessage mqttMessage = new MqttMessage();
  10. mqttGateway.sendToMqtt(topic, sendData);
  11. //mqttGateway.sendToMqttObject(topic, sendData.getBytes());
  12. return "OK";
  13. }

以上就是SpringBoot如何实现MQTT消息发送和接收的详细内容,更多关于SpringBoot如何实现MQTT消息发送和接收的资料请关注九品源码其它相关文章!