
Integrating MQTT with Spring Boot elegantly


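There are many MQTT broker implementations. This article uses EMQX, which is widely used in China; see the EMQX installation guide to set up the environment.

EMQX provides an official Java example for connecting to MQTT: link

Below I integrate the client with Spring Boot.

Create the Spring Boot project

pom.xml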


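<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.0</version>
        <relativePath/>
    </parent>
    <groupId>top.wushanghui</groupId>
    <artifactId>mqtt-client</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>mqtt-client</name>
    <description>mqtt-client</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>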



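The key addition is the MQTT client dependency:

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Configuration files (yml)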

application.yml

spring:
  profiles:
    active: dev

application-dev.yml

server:
  port: 8080
emqx:
  host: tcp://192.168.33.10:1883
  clientId: MQTT_IOT_DEFAULT_CLIENT
  username:
  password:
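  timeout: 1000   # connection timeout in seconds (passed to setConnectionTimeout)
  keepAlive: 60   # keep-alive interval in seconds (passed to setKeepAliveInterval)

Configuration classes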

EmqxProperties

package top.wushanghui.mqttclient.config;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.springframework.boot.context.properties.ConfigurationProperties;


@Getter
@Setter
@ToString
@ConfigurationProperties(prefix = "emqx")
public class EmqxProperties {

    private String host;
    private String clientId;
    private String username;
    private String password;
    private Integer timeout;
    private Integer keepAlive;

}
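
The values bound into EmqxProperties go straight into the Paho client, so a typo in application-dev.yml only shows up when the client is created or connects. As a minimal sketch, the hypothetical helper below (EmqxSettingsCheck and its validate method are not part of the original project) checks the broker URI and the two intervals up front. The list of accepted schemes is an assumption and should be adjusted to the transport actually used.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Set;

public class EmqxSettingsCheck {

    // URI schemes accepted by this sketch (an assumption, adjust as needed)
    private static final Set<String> SCHEMES = Set.of("tcp", "ssl", "ws", "wss");

    /** Returns null when the settings look usable, otherwise a short description of the problem. */
    public static String validate(String host, Integer timeoutSeconds, Integer keepAliveSeconds) {
        if (host == null || host.isBlank()) {
            return "emqx.host is missing";
        }
        URI uri;
        try {
            uri = new URI(host);
        } catch (URISyntaxException e) {
            return "emqx.host is not a valid URI: " + host;
        }
        if (uri.getScheme() == null || !SCHEMES.contains(uri.getScheme())) {
            return "emqx.host has an unsupported scheme: " + host;
        }
        if (uri.getHost() == null || uri.getPort() == -1) {
            return "emqx.host needs both a host name and a port: " + host;
        }
        if (timeoutSeconds == null || timeoutSeconds < 0) {
            return "emqx.timeout must be a non-negative number of seconds";
        }
        if (keepAliveSeconds == null || keepAliveSeconds < 0) {
            return "emqx.keepAlive must be a non-negative number of seconds";
        }
        return null;
    }

    public static void main(String[] args) {
        // Values taken from application-dev.yml
        System.out.println(validate("tcp://192.168.33.10:1883", 1000, 60));
        // Missing scheme: reported as a problem
        System.out.println(validate("192.168.33.10:1883", 1000, 60));
    }
}
```

Such a check could be called at the start of the mqttClient bean method, failing startup with a readable message instead of a connection error.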

EmqxConfiguration

package top.wushanghui.mqttclient.config;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.wushanghui.mqttclient.callback.MyPushCallback;
import top.wushanghui.mqttclient.enums.MqttTopicEnum;

@Slf4j
@Configuration
@EnableConfigurationProperties(EmqxProperties.class)
public class EmqxConfiguration {

    @Bean
    public MqttClient mqttClient(EmqxProperties properties, MqttConnectOptions mqttConnectOptions, MyPushCallback myPushCallback) throws MqttException {
        log.info("{}", properties);
        String cid = properties.getClientId() + System.currentTimeMillis();
        MqttClient mqttClient = new MqttClient(properties.getHost(), cid, new MemoryPersistence());
        mqttClient.setCallback(myPushCallback);
        myPushCallback.setMqttClient(mqttClient);

        mqttClient.connect(mqttConnectOptions);
        log.info("Connected to emqx, client id={}", mqttClient.getClientId());
        mqttClient.subscribe(MqttTopicEnum.DEFAULT_TOPIC.getTopic(), MqttTopicEnum.DEFAULT_TOPIC.getQos());
        log.info("mqttClient={}", mqttClient);
        log.info("myPushCallback={}", myPushCallback);
        return mqttClient;
    }

    @Bean
    public MqttConnectOptions mqttConnectOptions(EmqxProperties properties) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
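        // Set credentials only when configured; calling toCharArray() on a missing password would throw a NullPointerException
        if (properties.getUsername() != null && !properties.getUsername().isEmpty()) {
            options.setUserName(properties.getUsername());
        }
        if (properties.getPassword() != null && !properties.getPassword().isEmpty()) {
            options.setPassword(properties.getPassword().toCharArray());
        }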
        options.setConnectionTimeout(properties.getTimeout());
        options.setKeepAliveInterval(properties.getKeepAlive());
        options.setAutomaticReconnect(true);
        return options;
    }

}
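
EmqxConfiguration appends System.currentTimeMillis() to the configured client ID. The reason is that an MQTT broker treats the client ID as the identity of a connection: when a second client connects with an ID that is already in use, the broker disconnects the first one. A timestamp suffix can still collide if two instances start in the same millisecond, so a random suffix is one possible alternative. The sketch below is an illustration only; ClientIds and unique are hypothetical names, not part of the original project.

```java
import java.util.UUID;

public class ClientIds {

    /** Appends a random 12-character suffix so that concurrently started instances do not share an ID. */
    public static String unique(String prefix) {
        String suffix = UUID.randomUUID().toString().replace("-", "");
        return prefix + "_" + suffix.substring(0, 12);
    }

    public static void main(String[] args) {
        // Prefix taken from emqx.clientId in application-dev.yml
        System.out.println(unique("MQTT_IOT_DEFAULT_CLIENT"));
    }
}
```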

EmqxConfiguration registers an MqttClient bean in the Spring container. To publish messages, inject MqttClient into any class and call it directly:

mqttClient.publish(String topic, MqttMessage message)
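
As a rough sketch of what a publishing class might look like, the example below builds a topic under sensor/ and encodes the payload as UTF-8 text. To keep it runnable without a broker, the MQTT client sits behind a small Transport interface. SensorPublisher, Transport and publishTemperature are hypothetical names introduced for illustration, and the topic layout is an assumption chosen to fit the subscription filter sensor/+/#.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SensorPublisher {

    /** Hypothetical seam over the MQTT client so the sketch runs without a broker. */
    public interface Transport {
        void send(String topic, byte[] payload, int qos);
    }

    private final Transport transport;

    public SensorPublisher(Transport transport) {
        this.transport = transport;
    }

    /** Publishes one reading to sensor/{sensorId}/temperature as UTF-8 text with QoS 0. */
    public void publishTemperature(String sensorId, double celsius) {
        if (sensorId.contains("/") || sensorId.contains("+") || sensorId.contains("#")) {
            throw new IllegalArgumentException("sensorId must not contain topic separators or wildcards");
        }
        String topic = "sensor/" + sensorId + "/temperature";
        byte[] payload = String.valueOf(celsius).getBytes(StandardCharsets.UTF_8);
        transport.send(topic, payload, 0);
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        // In the application, the Transport would delegate to the injected client,
        // calling mqttClient.publish(topic, payload, qos, false) and handling MqttException.
        SensorPublisher publisher = new SensorPublisher(
                (topic, payload, qos) -> sent.add(topic + " " + new String(payload, StandardCharsets.UTF_8)));
        publisher.publishTemperature("room1", 21.5);
        System.out.println(sent); // prints [sensor/room1/temperature 21.5]
    }
}
```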
Enum

MqttTopicEnum

package top.wushanghui.mqttclient.enums;

public enum MqttTopicEnum {

    
    // Default subscription: '+' matches exactly one topic level, '#' matches all remaining levels
    DEFAULT_TOPIC("sensor/+/#", 0);

    // Topic filter to subscribe to
    private final String topic;
    // Quality of service level (0, 1 or 2)
    private final Integer qos;

    MqttTopicEnum(String topic, Integer qos) {
        this.topic = topic;
        this.qos = qos;
    }

    public String getTopic() {
        return topic;
    }

    public Integer getQos() {
        return qos;
    }
}
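
The default filter sensor/+/# combines both MQTT wildcards: + stands for exactly one topic level and a trailing # stands for any number of remaining levels, including none. To make it concrete which topics the subscription receives, here is a simplified matcher. It is a sketch for illustration, not the implementation used by EMQX or Paho, and it ignores special cases such as topics starting with $. TopicFilterDemo and matches are hypothetical names.

```java
public class TopicFilterDemo {

    /** Simplified MQTT filter match: '+' matches one level, a trailing '#' matches the rest. */
    public static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return true;   // multi-level wildcard: matches everything from here on, including the parent level
            }
            if (i >= t.length) {
                return false;  // topic has fewer levels than the filter requires
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false;  // literal level differs
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        String filter = "sensor/+/#";
        System.out.println(matches(filter, "sensor/room1/temperature")); // true
        System.out.println(matches(filter, "sensor/room1"));             // true
        System.out.println(matches(filter, "sensor"));                   // false
        System.out.println(matches(filter, "device/room1/temperature")); // false
    }
}
```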

Callback (consuming messages)

package top.wushanghui.mqttclient.callback;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.stereotype.Component;
import top.wushanghui.mqttclient.enums.MqttTopicEnum;


@Slf4j
@Component
public class MyPushCallback implements MqttCallbackExtended {

    private MqttClient mqttClient;

    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("MyPushCallback connectComplete reconnect={}, serverURI={}", reconnect, serverURI);
        if (reconnect) {
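            // After a reconnect the topic must be subscribed again, because the clean session discards subscriptions
            try {
                log.info("mqttClient={}", mqttClient);
                mqttClient.subscribe(MqttTopicEnum.DEFAULT_TOPIC.getTopic(), MqttTopicEnum.DEFAULT_TOPIC.getQos());
            } catch (MqttException e) {
                log.error("Failed to subscribe to topic", e);
            }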
            }
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        log.error("MyPushCallback connectionLost cause={}", cause.toString());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("MyPushCallback messageArrived topic={}, message={}", topic, message);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        log.info("MyPushCallback deliveryComplete token={}", token);
    }

    public void setMqttClient(MqttClient mqttClient) {
        this.mqttClient = mqttClient;
    }
}

Messages can be consumed in the messageArrived method.
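
As a minimal sketch of what consuming could involve, the helper below extracts the sensor ID from the topic (the level matched by + in sensor/+/#) and decodes the payload. It assumes publishers send UTF-8 text and use the sensor/{id}/... layout; SensorMessageParser and its methods are hypothetical names. Paho calls messageArrived synchronously on its own callback thread, so long-running work is probably better handed off to another thread.

```java
import java.nio.charset.StandardCharsets;

public class SensorMessageParser {

    /** Returns the second topic level (the part matched by '+'), or null for topics outside sensor/. */
    public static String sensorId(String topic) {
        String[] levels = topic.split("/", -1);
        if (levels.length < 2 || !levels[0].equals("sensor") || levels[1].isEmpty()) {
            return null;
        }
        return levels[1];
    }

    /** Decodes the raw payload bytes, assuming the publisher sent UTF-8 text. */
    public static String text(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Inside messageArrived the bytes would come from message.getPayload()
        byte[] payload = "21.5".getBytes(StandardCharsets.UTF_8);
        System.out.println(sensorId("sensor/room1/temperature") + " -> " + text(payload)); // prints room1 -> 21.5
    }
}
```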

Start the service and send a test message with an MQTT client tool.

The message then appears in the application console.

Source code: link
