代码拉取完成,页面将自动刷新
springboot快速接入MQTT,一个方法注解就搞定消息处理
之前做一个硬件项目用到mqtt,处理设备上传的数据
部署产线环境是阿里云,测试用的EMQ都需要支持,就有了这个项目
此项目整合springboot部分和topic规则搬运了一个项目,刚接触这个,十分感谢前辈的经验https://gitee.com/yezhihao/mqtt-sample
关于共享订阅的高可用兼容,如果有方案还望各位不吝赐教
<dependency>
<groupId>com.stanwind</groupId>
<artifactId>spring-boot-windmq</artifactId>
<version>1.1.2-RELEASE</version>
</dependency>
https://gitee.com/sense7/windmq-demo.git
<!-- windmq dependency -->
<dependency>
<groupId>com.stanwind</groupId>
<artifactId>spring-boot-windmq</artifactId>
<version>1.1.0-RELEASE</version>
</dependency>
<!-- MQTT -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<exclusions>
<exclusion>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<groupId>org.eclipse.paho</groupId>
</exclusion>
</exclusions>
</dependency>
<!-- 1.2.0 版本有bug -->
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.1</version>
</dependency>
@EnableWindMQ
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
void addTopic(String... topic);
void addTopic(String topic, int qos);
void addTopics(String[] topic, int[] qos);
void removeTopic(String... topic);
/**
* 发送消息给device 需要payload Class上有@Topic
* @param deviceId
* @param payload
*/
void notify(String deviceId, Object payload);
/**
* 发送消息到topic
* @param topic
* @param payload
*/
void notifyToTopic(String topic, Object payload);
/**
* 同步确认发送消息给设备
* @param deviceId
* @param payload
* @return
*/
MqttResponse request(String deviceId, MqttRequest payload);
/**
* 同步确认发送消息给设备
* @param deviceId
* @param payload
* @param timeout 等待超时 ms
* @return
*/
MqttResponse request(String deviceId, MqttRequest payload, long timeout);
/**
* 响应同步消息
* @param message
* @return
*/
boolean response(Message<MqttResponse> message);
/**
* 客户端请求通用回复
* @param messageId
* @param deviceId
* @param result
*/
void sendCommonResponse(Long messageId, String deviceId, Integer result);
/**
* 客户端请求通用回复
* @param messageId
* @param deviceId
*/
void sendCommonResponse(Long messageId, String deviceId);
@TopicHandler(topic = "$SYS/brokers/{node}/clients/{deviceId}/connected")
public void connected(MQTTMsg msg) {
ClientReqVO clientReqVO = JSONObject.parseObject(msg.getPayload().toString(), ClientReqVO.class);
process(clientReqVO);
}
@Service
public class DemoHandler extends BaseTopicHandler {
@TopicHandler(topic = "IOT_SERVER/ping/{instanceId}/{taskId}/{param1}")
public void uploadPingData(MQTTMsg msg) {
String taskId = getParam("taskId");
String param1 = getParam("param1");
//或
MqttContext.getContext().getParams().getOrDefault("taskId", null);
}
}
例: Server 要求客户端往topic IOT_SERVER/log/{instanceId}/{taskId}发送日志数据
//拿到初始化的配置 内有生成的instanceId
@Resource
private MqttConfig mqttConfig;
//随机生成的任务id
String taskId = "abcd";
String topic = "IOT_SERVER/log/{instanceId}/{taskId}".replace("{taskId}", taskId).replace("{instanceId}", mqttConfig.getInstanceId());
//订阅监听客户端回写数据
wmHolder.addTopic(topic);
//发送消息告知客户端往哪里写日志数据
messageService.notify(cli, request);
@TopicHandler(topic = "IOT_SERVER/log/{instanceId}/{taskId}")
public void uploadLogData(MQTTMsg msg) {
if (!currentHandle()) {
log.debug("非当前实例任务: [{}]", msg);
return;
}
if (接收完毕) {
//取消订阅
wmHolder.removeTopic(msg.getTopic());
}
}
@Autowired
private MqttConfig mqttConfig;
@Autowired
private WMHolder holder;
public MqttConnVO generateMqttConnConfig(String sn) throws Exception {
String r = mqttConfig.getAclRead().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));
String w = mqttConfig.getAclWrite().replaceAll(DEVICE_ID, stcUtil.sn2cli(sn));
MQTTClientConnData connData = holder.getClientConnData(Utils.splitToList(r), Utils.splitToList(w));
MqttConnVO vo = new MqttConnVO();
//缺省外网地址则返回统一地址 否则返回外网地址
vo.setUris(ArrayUtils.isEmpty(mqttConfig.getPubServerURIs()) ? mqttConfig.getServerURIs() : mqttConfig.getPubServerURIs());
vo.setReadTopics(r);
vo.setWriteTopics(w);
vo.setEnc(mqttConfig.getEncTable());
vo.setEncSize(mqttConfig.getEncCount());
BeanUtils.copyProperties(connData, vo);
log.info("{} 获取mqtt: {}", sn, vo);
return vo;
}
@Data
public class MqttConnVO implements Serializable {
private static final long serialVersionUID = 1L;
private String[] uris;
private Long expire;
private String username;
private String password;
private String readTopics;
private String writeTopics;
private String enc;
private Integer encSize;
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。
1. 开源生态
2. 协作、人、软件
3. 评估模型