基于org.eclipse.paho.client.mqttv3实现的一个简易的MQTT客户端
MQTT
全称为Message Queuing Telemetry Transport(消息队列遥测传输),是一种基于发布/订阅范式的“轻量级”消息协议,需要一个消息中间件协议主要有三种身份:发布者(Publisher)、代理服务器(Broker)、订阅者(Subscriber)。发布者发布消息到代理服务器,再由订阅者消费消息。
这里我们选择EMQX作为代理服务器
编写一个自定义的MQTT的客户端,我们需要实现一下几个方法:
以下代码是基于org.eclipse.paho.client.mqttv3
实现的一个简单的客户端
/************************* Client ****************************************/
package xenoscode.cn.mqttclient.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* @author Xenos
* @version V1.0
* @Package xenoscode.cn.mqttclient.mqtt
* @date 2020/9/1 16:46
*/
public class SimpleMqttClient {
/**
* MQTT异步客户端
*/
private MqttAsyncClient client = null;
/**
* 连接配置
*/
private MqttConnectOptions options = null;
/**
* 客户端的ID,唯一且不可重复
*/
private String clientid;
private String userName;
/**
* 服务器地址
*/
private String host;
private int timeOut;
private int aliveTime;
/**
* 订阅的主题列表
*/
private String[] listTopic;
/**
* 主题列表对应的Qos列表
*/
private int[] listQos;
/**
* 最大尝试连接次数
*/
private int maxConnectTimes;
public SimpleMqttClient(String clientid, String userName, String host, int timeOut, int aliveTime, String[] listTopic, int[] listQos, int maxConnectTimes) {
this.clientid = clientid;
this.userName = userName;
this.host = host;
this.timeOut = timeOut;
this.aliveTime = aliveTime;
this.listTopic = listTopic;
this.listQos = listQos;
this.maxConnectTimes = maxConnectTimes;
}
/**
* 连接MQTT服务器
*/
public synchronized void connect() {
if (options == null) {
setOptions();
}
if (client == null) {
creatClient();
}
int connectTimes = 0;
while (connectTimes < maxConnectTimes && !client.isConnected()) {
try {
IMqttToken token = client.connect(options);
token.waitForCompletion();
connectTimes++;
} catch (Exception e) {
e.printStackTrace();
System.out.println(clientid + " 连接时发生错误: " + e.toString());
}
}
}
/**
* 断开与MQTT服务器的连接
*/
public synchronized void disconnect() {
if (client != null && client.isConnected()) {
try {
IMqttToken token = client.disconnect();
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
System.out.println(clientid + " 断开连接时发生错误: " + e.toString());
}
}
client = null;
}
/**
* 刷新MQTT的连接
*/
public synchronized void refresh() {
disconnect();
setOptions();
creatClient();
connect();
}
/**
* 消息订阅
*/
public void subscribe() {
if (client != null && client.isConnected()) {
try {
IMqttToken token = client.subscribe(listTopic, listQos);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
System.out.println(clientid + "订阅主题时发生错误: " + e.toString());
}
}
}
/**
* 消息推送
*
* @param topic 消息的主题名
* @param message 消息报文
*/
public void publish(String topic, MqttMessage message) {
if (client != null && client.isConnected()) {
try {
IMqttDeliveryToken token = client.publish(topic, message);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
System.out.println(clientid + "推送消息时发生错误: " + e.toString());
}
}
}
/**
* @return 是否处于连接状态
*/
public boolean isConnected() {
return client != null && client.isConnected();
}
public String getClientid() {
return clientid;
}
public int getMaxConnectTimes() {
return maxConnectTimes;
}
/**
* 设置连接属性
*/
private void setOptions() {
if (options != null) {
options = null;
}
options = new MqttConnectOptions();
//将CleanSession设置为true时,一旦客户端断开连接,就会清除相关Session
options.setCleanSession(true);
options.setConnectionTimeout(timeOut);
options.setKeepAliveInterval(aliveTime);
options.setUserName(userName);
//org.eclipse.paho.client.mqttv3提供的自动重连,默认为false,也可以在回调中进行重连
// options.setAutomaticReconnect(true);
}
/**
* 创建客户端
*/
private void creatClient() {
if (client == null) {
try {
// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttAsyncClient(host, clientid, new MemoryPersistence());
// 设置回调函数
client.setCallback(new SimpleMqttClientCallback(SimpleMqttClient.this));
} catch (MqttException e) {
e.printStackTrace();
System.out.println("创建连接客户端实例: [" + clientid + "] 时发生错误:" + e.toString());
}
}
}
}
/************************* 回调类 SimpleMqttClientCallback ****************************************/
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class SimpleMqttClientCallback implements MqttCallbackExtended {
private SimpleMqttClient client;
private int connectTimes = 0;
public SimpleMqttClientCallback(SimpleMqttClient client) {
this.client = client;
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("————" + client.getClientid() + " 连接成功!————");
//连接成功后,自动订阅主题
client.subscribe();
connectTimes = 0;
}
@Override
public void connectionLost(Throwable throwable) {
System.out.println("————" + client.getClientid() + " 连接丢失!————");
//可以在此处做重连处理
if (connectTimes < client.getMaxConnectTimes()) {
client.refresh();
connectTimes++;
} else {
client.disconnect();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
LocalDateTime startTime = LocalDateTime.now();
System.out.println("[MQTT]" + client.getClientid() + " ----成功接收消息!---- 时间: " + startTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
String content = new String(mqttMessage.getPayload());
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + mqttMessage.getQos());
System.out.println("接收消息内容 : " + content);
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
System.out.println("[MQTT]" + client.getClientid() + " ----成功发送消息!---- 时间: " + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
}
}
将客户端连接需要的一些属性写在配置文件中
mqtt:
# emq的默认端口为1883
host: tcp://127.0.0.1:1883
client:
clientid: testClient
time-out: 10
alive-time: 20
max-connect-times: 5
topics: ["HELLOWORLD"]
qos: [2]
编写配置类
/******************** SimpleMqttClientProperties ***********************************/
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@Setter
@Getter
@ConfigurationProperties("mqtt.client")
public class SimpleMqttClientProperties {
private String clientid;
private String userName;
private int timeOut;
private int aliveTime;
private int maxConnectTimes;
private String[] topics;
private int[] qos;
}
/******************** SimpleMqttClientProperties ***********************************/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient;
@Configuration
public class MqttClientPoolConfiguration {
@Value("${mqtt.host}")
private String host;
@Autowired
private SimpleMqttClientProperties simpleMqttClientProperties;
@Bean
SimpleMqttClient mqttClient() {
SimpleMqttClient mqttClient = new SimpleMqttClient(simpleMqttClientProperties.getClientid(),
simpleMqttClientProperties.getUserName(),
host,
simpleMqttClientProperties.getTimeOut(),
simpleMqttClientProperties.getAliveTime(),
simpleMqttClientProperties.getTopics(),
simpleMqttClientProperties.getQos(),
simpleMqttClientProperties.getMaxConnectTimes());
return mqttClient;
}
}
初始化连接
/************************* 启动类 ********************************/
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SimpleMqttPoolApplication {
public static void main(String[] args) {
SpringApplication.run(SimpleMqttPoolApplication.class, args);
}
}
/************************* Initialize ********************************/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient;
@Component
public class Initialize implements CommandLineRunner {
@Autowired
private SimpleMqttClient simpleMqttClient;
@Override
public void run(String... args) throws Exception {
simpleMqttClient.connect();
}
}
实现
CommandLineRunner
接口,并重写run
方法,可以让其中的方法在Spring Boot项目启动后执行
看一下运行结果
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.3.RELEASE)
2020-09-04 23:18:45.647 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 5580 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool)
2020-09-04 23:18:45.649 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default
2020-09-04 23:18:46.088 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 0.747 seconds (JVM running for 1.031)
————testClient 连接成功!————
可以看到日志中输出连接成功的字样,为了确定是否真的连接成功,我们可以登录EMQ的Dashboard查看(dashboard的默认端口为18083,用户名为admin,密码为public):
可以看到客户端与服务器确实已经连接,在看一下是否有主题的订阅
点击Tools,在WebSocket中,可以进行订阅和推送的测试
连接测试用的客户端
推送测试报文
查看日志输出是否接收到报文
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.3.RELEASE)
2020-09-04 23:18:45.647 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 5580 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool)
2020-09-04 23:18:45.649 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default
2020-09-04 23:18:46.088 INFO 5580 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 0.747 seconds (JVM running for 1.031)
————testClient 连接成功!————
[MQTT]testClient ----成功接收消息!---- 时间: 2020-09-04 23:44:51.753
接收消息主题 : HELLOWORLD
接收消息Qos : 2
接收消息内容 : { "msg": "Hello, World!" }
可以看到我们的自定义的客户端成功接收到了来自另外一个客户端的消息。
编写一个测试用的Controller
,每次访问该地址都想服务端推送一条测试报文
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import xenoscode.cn.mqttclient.mqtt.SimpleMqttClient;
@RestController
public class TestController {
@Autowired
private SimpleMqttClient simpleMqttClient;
@GetMapping("/test/publish")
public void publishTest() {
byte[] payload = "publish test".getBytes();
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(2);
mqttMessage.setPayload(payload);
String topic = "PUBLISH_TEST";
simpleMqttClient.publish(topic, mqttMessage);
}
}
在dashboard中订阅该主题
访问接口,查看结果
后台日志
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.3.3.RELEASE)
2020-09-04 23:59:20.843 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Starting SimpleMqttPoolApplication on Xenos with PID 3016 (D:\study\workspace\SimpleMqttPool\target\classes started by 89314 in D:\study\workspace\SimpleMqttPool)
2020-09-04 23:59:20.845 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : No active profile set, falling back to default profiles: default
2020-09-04 23:59:21.507 INFO 3016 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port(s): 8080 (http)
2020-09-04 23:59:21.515 INFO 3016 --- [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2020-09-04 23:59:21.515 INFO 3016 --- [ main] org.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/9.0.37]
2020-09-04 23:59:21.581 INFO 3016 --- [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2020-09-04 23:59:21.581 INFO 3016 --- [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 698 ms
2020-09-04 23:59:21.747 INFO 3016 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'applicationTaskExecutor'
2020-09-04 23:59:21.863 INFO 3016 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2020-09-04 23:59:21.869 INFO 3016 --- [ main] x.cn.mqttpool.SimpleMqttPoolApplication : Started SimpleMqttPoolApplication in 1.321 seconds (JVM running for 1.621)
————testClient 连接成功!————
2020-09-04 23:59:24.962 INFO 3016 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2020-09-04 23:59:24.962 INFO 3016 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2020-09-04 23:59:24.965 INFO 3016 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
[MQTT]testClient ----成功发送消息!---- 时间: 2020-09-04 23:59:24.983
Dashboard结果
可以看到当前的客户端能够将数据上送到服务器中
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。