MQTT对接
基础通信Topic
基础通信Topic是物联网平台中用于设备与云端之间进行基本数据传输和控制命令交换的主题。这些Topic通常用于设备的初始化、配置更新、状态报告和基础命令执行。通过这些Topic,设备能够实现与云端的基本通信,包括:
- 设备初始化和配置:设备接入网络后,通过基础通信Topic向云端报告设备状态,获取初始配置和必要的参数设定。
- 状态报告:设备定期或在特定事件发生时,通过基础通信Topic向云端发送当前状态信息,如在线状态、运行参数和故障报警等。
- 基础控制命令:云端通过基础通信Topic向设备发送控制命令,如重启设备、更新配置、开启或关闭特定功能等。
基础通信Topic的设计目的是确保设备能够稳定可靠地与云端保持通信,实现基本的管理和控制功能。其主要特点是通信内容简单、频率较高且对实时性要求较高。
功能 | Topic 类 | 操作权限 | 描述信息 |
---|---|---|---|
配置更新 | /blade/sys/${productKey}/${deviceName}/thing/config/push | 订阅 | 云端主动下推配置信息 |
/blade/sys/${productKey}/${deviceName}/thing/config/get_reply | 云端响应配置信息 | ||
/blade/sys/${productKey}/${deviceName}/thing/config/get | 发布 | 设备端查询配置信息 | |
设备标签 | /blade/sys/${productKey}/${deviceName}/thing/deviceinfo/update_reply | 订阅 | 云端响应标签上报 |
/blade/sys/${productKey}/${deviceName}/thing/deviceinfo/delete | 设备删除标签信息 | ||
/blade/sys/${productKey}/${deviceName}/thing/deviceinfo/update | 发布 | 设备上报标签数据 | |
/blade/sys/${productKey}/${deviceName}/thing/deviceinfo/delete_reply | 云端响应标签删除 | ||
设备影子 | /blade/shadow/get/${productKey}/${deviceName} | 订阅 | 设备接收影子变更 |
/blade/shadow/update/${productKey}/${deviceName} | 发布 | 设备影子发布 | |
时钟同步 | /blade/ext/ntp/${productKey}/${deviceName}/response | 订阅 | NTP 时钟同步响应 |
/blade/ext/ntp/${productKey}/${deviceName}/request | 发布 | NTP 时钟同步请求 | |
广播 | /blade/broadcast/${productKey}/${identifier} | 订阅 | 广播 Topic,identifier 为用户自定义字符串 |
OTA 升级 | /blade/ota/device/upgrade/${productKey}/${deviceName} | 订阅 | 固件升级信息下行 |
/blade/ota/device/inform/${productKey}/${deviceName} | 发布 | 设备上报固件升级信息 | |
/blade/ota/device/progress/${productKey}/${deviceName} | 设备上报固件升级进度 | ||
/blade/sys/${productKey}/${deviceName}/thing/ota/firmware/get | 设备主动拉取固件升级信息 |
物模型通信Topic
物模型通信Topic是用于描述和交换物联网设备复杂数据结构的主题,涵盖设备的属性、命令和事件等信息。这些Topic基于物模型(Thing Model)的定义,使得不同类型和厂商的设备能够以统一的方式进行数据交互。物模型通信Topic的应用包括:
- 属性管理:通过物模型通信Topic,设备可以向云端报告其属性数据,如温度、湿度、电量等,云端也可以通过这些Topic下发属性设定指令。
- 命令执行:设备可以通过物模型通信Topic接收云端下发的操作命令,例如执行特定的功能或操作,完成后再通过Topic反馈执行结果。
- 事件上报:当设备发生特定事件时(如故障、警报触发),通过物模型通信Topic向云端上报事件信息,包括事件类型、发生时间和相关数据。
物模型通信Topic的设计旨在支持设备的复杂数据交换和控制需求,确保数据格式的一致性和通信的可靠性。其主要特点是数据结构复杂、内容丰富且对数据一致性要求较高。
功能 | Topic 类 | 操作权限 | 描述信息 |
---|---|---|---|
服务调用 | /blade/sys/${productKey}/${deviceName}/thing/service/${identifier} | 订阅 | 设备服务调用 |
/blade/sys/${productKey}/${deviceName}/thing/service/${identifier}_reply | 发布 | 设备端响应服务调用 | |
属性设置 | /blade/sys/${productKey}/${deviceName}/thing/service/property/set | 订阅 | 设备属性设置 |
属性上报 | /blade/sys/${productKey}/${deviceName}/thing/event/property/post_reply | 订阅 | 云端响应属性上报 |
/blade/sys/${productKey}/${deviceName}/thing/event/property/post | 发布 | 设备属性上报 | |
事件上报 | /blade/sys/${productKey}/${deviceName}/thing/event/${identifier}/post_reply | 订阅 | 云端响应事件上报 |
/blade/sys/${productKey}/${deviceName}/thing/event/${identifier}/post | 发布 | 设备事件上报 |
自定义Topic
自定义Topic是在物联网平台中为特定应用场景或业务需求而专门定义的通信主题。通过自定义Topic,设备和平台可以实现更灵活和精细化的数据传输和控制,满足特定的功能需求。自定义Topic通常用于以下场景:
- 特定业务逻辑:实现某些特定业务逻辑,如智能家居中的自定义控制命令或特定事件通知。
- 数据过滤和处理:根据特定条件过滤和处理数据,例如仅在特定时间段内发送数据或仅在特定事件发生时触发操作。
- 设备群组管理:管理和控制一组设备,通过自定义Topic实现群组消息的发布和订阅,便于批量操作。
- 增强安全性:通过使用自定义Topic,实现更细粒度的权限控制和数据隔离,增强系统的安全性。
自定义Topic举例
以下是自定义topic的示例,用于描述设备和应用程序之间的高效数据交互。通过这些自定义topic,设备可以实现更灵活的控制和管理,满足特定的应用需求。
功能 | 自定义Topic | 操作权限 | 描述信息 |
---|---|---|---|
灯光控制 | /blade/home/${productKey}/${deviceName}/livingroom/light/control | 发布 | 控制客厅灯光开关 |
/blade/home/${productKey}/${deviceName}/livingroom/light/status | 订阅 | 订阅客厅灯光状态 | |
温度监控 | /blade/home/${productKey}/${deviceName}/bedroom/temperature/set | 发布 | 设置卧室温度 |
/blade/home/${productKey}/${deviceName}/bedroom/temperature/reading | 订阅 | 订阅卧室温度数据 | |
安全报警 | /blade/home/${productKey}/${deviceName}/security/alarm/set | 发布 | 设置安全报警 |
/blade/home/${productKey}/${deviceName}/security/alarm/status | 订阅 | 订阅安全报警状态 |
一、后端编写模拟器接入
接入语言说明
目前先以java语言为例,编写mqtt接入的示例。未来会增加各种不同语言的sdk,方便大家快速接入。
- mqtt底层采用mica-mqtt实现,开源地址: https://gitee.com/596392912/mica-mqtt
- 我们以现成的
device-simulator
工程为例,进行mqtt接入的示例讲解 - 工程结构很简单,一共只有三个类
DeviceListener
: 服务启动完毕后执行的监听器DeviceSimulator
: 设备模拟器,模拟设备上报数据DeviceSimulatorApplication
: 服务启动类
DeviceListener
类中,我们通过@EventListener
注解监听ApplicationReadyEvent
事件,当服务启动完毕后,执行DeviceSimulator
类中的start
方法
@Component
public class DeviceListener {
@EventListener(ApplicationReadyEvent.class)
public void handleApplicationReady() {
// 产品key
String productKey = "JkerjK97oub";
// 设备名称
String deviceName = "27OY29BMmLn";
// 设备密钥
String deviceSecret = "1ae59684f3e8e40f";
// 初始化模拟器
DeviceSimulator simulator = new DeviceSimulator(productKey, deviceName, deviceSecret);
// 启动模拟器
simulator.start();
}
}
接入信息说明
其中的productKey
、deviceName
、deviceSecret
是在物联网平台中创建设备后,获取到的设备信息,需要替换成自己准备接入的设备信息
DeviceSimulator
类中,我们通过MqttClient
类实现mqtt的连接和数据上报- 我们先定义一个总体的
start
方法,用于启动模拟器
@Slf4j
@RequiredArgsConstructor
public class DeviceSimulator {
private final String productKey;
private final String deviceName;
private final String deviceSecret;
/**
* 启动模拟器
*/
public void start() {
// 初始化客户端认证信息
ClientSign clientSign = createClientSign();
String username = clientSign.getUsername();
String password = clientSign.getPassword();
String clientId = clientSign.getClientIdInfo().getMqttClientId();
log.info("username: {}, password: {}, clientId: {}", username, password, clientId);
// 初始化 MQTT 客户端
MqttClient client = MqttClient.create()
.username(username)
.password(password)
.clientId(clientId)
.connectSync();
// 订阅主题
subscribeTopics(client);
// 任务调度
scheduleTasks(client);
}
}
- 连接mqtt服务器需要通过认证,我们已经准备了
ClientIdInfo
和ClientSign
类,用于创建客户端认证信息,直接引用便可以生成符合认证协议的格式。
/**
* 创建客户端认证信息
*
* @return ClientSign
*/
private ClientSign createClientSign() {
ClientIdInfo clientIdInfo = new ClientIdInfo();
clientIdInfo.setClientId(deviceName);
ClientSign clientSign = new ClientSign();
clientSign.setClientIdInfo(clientIdInfo);
clientSign.setProductKey(productKey);
clientSign.setDeviceName(deviceName);
clientSign.setDeviceSecret(deviceSecret);
return clientSign;
}
- 将计算完毕的参数传入MQTT客户端,进行连接
// 初始化 MQTT 客户端
MqttClient client = MqttClient.create()
.username(username)
.password(password)
.clientId(clientId)
.connectSync();
- 订阅主题,我们在
subscribeTopics
方法中,订阅了两个主题,一个是设备上报数据的主题,一个是ntp时钟同步的主题
/**
* 订阅主题
*
* @param client MqttClient
*/
private void subscribeTopics(MqttClient client) {
client.subQos0("/blade/sys/" + productKey + '/' + deviceName + "/thing/#", (context, topic, message, payload) -> {
log.info("topic: {} payload: {}", topic, ByteBufferUtil.toString(payload));
});
client.subQos0("/blade/ext/ntp/" + productKey + '/' + deviceName + "/response", (context, topic, message, payload) -> {
log.info("topic: {} payload: {}", topic, ByteBufferUtil.toString(payload));
});
}
- 任务调度,我们在
scheduleTasks
方法中,模拟了设备上报属性、执行命令、上报事件和ntp时钟同步的任务
/**
* 任务调度
*
* @param client MqttClient
*/
private void scheduleTasks(MqttClient client) {
// 属性任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/event/property/post", createPropertyData());
// 命令任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/service/Reboot", createRebootData());
// 事件任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/event/LowBatteyEvent/post", createLowBatteryEventData());
// 时钟任务
publishTask(client, "/blade/ext/ntp/${productKey}/${deviceName}/request", createNtpReqData());
}
- 发布任务,我们在
publishTask
方法中,将任务发布到mqtt服务器
/**
* 发布任务
*
* @param client MqttClient
* @param endpoint endpoint
* @param data 数据
*/
private void publishTask(MqttClient client, String endpoint, Object data) {
String topic = TemplateUtil.safeProcess(endpoint, Kv.create().set("productKey", productKey).set("deviceName", deviceName));
client.schedule(() -> client.publish(topic, JsonUtil.toJsonBytes(data)), 3000);
}
- 创建属性数据,我们在
createPropertyData
方法中,创建了一个属性数据
/**
* 创建属性数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createPropertyData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
params.put("LightSwitch", "1");
params.put("BatteryLevel", 20);
req.setParams(params);
return req;
}
- 创建重启数据,我们在
createRebootData
方法中,创建了一个重启数据
/**
* 创建重启数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createRebootData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
Map<String, Object> input = new HashMap<>();
input.put("RebootStatus", "true");
Map<String, Object> output = new HashMap<>();
output.put("RebootTime", System.currentTimeMillis());
params.put("input", JsonUtil.toJsonString(input));
params.put("output", JsonUtil.toJsonString(output));
params.put("commandName", "重启服务");
req.setParams(params);
return req;
}
- 创建低电量事件数据,我们在
createLowBatteryEventData
方法中,创建了一个低电量事件数据
/**
* 创建低电量事件数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createLowBatteryEventData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
Map<String, Object> output = new HashMap<>();
output.put("BatteryLevel", 20);
params.put("output", JsonUtil.toJsonString(output));
params.put("eventName", "电量低事件");
params.put("eventType", "alert");
req.setParams(params);
return req;
}
- 创建ntp时钟同步请求数据,我们在
createNtpReqData
方法中,创建了一个ntp时钟同步请求数据
/**
* 创建ntp时钟同步请求数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createNtpReqData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
return req;
}
- 完整代码如下
@Slf4j
@RequiredArgsConstructor
public class DeviceSimulator {
private final String productKey;
private final String deviceName;
private final String deviceSecret;
/**
* 启动模拟器
*/
public void start() {
// 初始化客户端认证信息
ClientSign clientSign = createClientSign();
String username = clientSign.getUsername();
String password = clientSign.getPassword();
String clientId = clientSign.getClientIdInfo().getMqttClientId();
log.info("username: {}, password: {}, clientId: {}", username, password, clientId);
// 初始化 MQTT 客户端
MqttClient client = MqttClient.create()
.username(username)
.password(password)
.clientId(clientId)
.connectSync();
// 订阅主题
subscribeTopics(client);
// 任务调度
scheduleTasks(client);
}
/**
* 创建客户端认证信息
*
* @return ClientSign
*/
private ClientSign createClientSign() {
ClientIdInfo clientIdInfo = new ClientIdInfo();
clientIdInfo.setClientId(deviceName);
ClientSign clientSign = new ClientSign();
clientSign.setClientIdInfo(clientIdInfo);
clientSign.setProductKey(productKey);
clientSign.setDeviceName(deviceName);
clientSign.setDeviceSecret(deviceSecret);
return clientSign;
}
/**
* 订阅主题
*
* @param client MqttClient
*/
private void subscribeTopics(MqttClient client) {
client.subQos0("/blade/sys/" + productKey + '/' + deviceName + "/thing/#", (context, topic, message, payload) -> {
log.info("topic: {} payload: {}", topic, ByteBufferUtil.toString(payload));
});
client.subQos0("/blade/ext/ntp/" + productKey + '/' + deviceName + "/response", (context, topic, message, payload) -> {
log.info("topic: {} payload: {}", topic, ByteBufferUtil.toString(payload));
});
}
/**
* 任务调度
*
* @param client MqttClient
*/
private void scheduleTasks(MqttClient client) {
// 属性任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/event/property/post", createPropertyData());
// 命令任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/service/Reboot", createRebootData());
// 事件任务
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/event/LowBatteyEvent/post", createLowBatteryEventData());
// 时钟任务
publishTask(client, "/blade/ext/ntp/${productKey}/${deviceName}/request", createNtpReqData());
}
/**
* 发布任务
*
* @param client MqttClient
* @param endpoint endpoint
* @param data 数据
*/
private void publishTask(MqttClient client, String endpoint, Object data) {
String topic = TemplateUtil.safeProcess(endpoint, Kv.create().set("productKey", productKey).set("deviceName", deviceName));
client.schedule(() -> client.publish(topic, JsonUtil.toJsonBytes(data)), 3000);
}
/**
* 创建属性数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createPropertyData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
params.put("LightSwitch", "1");
params.put("BatteryLevel", 20);
req.setParams(params);
return req;
}
/**
* 创建重启数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createRebootData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
Map<String, Object> input = new HashMap<>();
input.put("RebootStatus", "true");
Map<String, Object> output = new HashMap<>();
output.put("RebootTime", System.currentTimeMillis());
params.put("input", JsonUtil.toJsonString(input));
params.put("output", JsonUtil.toJsonString(output));
params.put("commandName", "重启服务");
req.setParams(params);
return req;
}
/**
* 创建低电量事件数据
*
* @return DataReq
*/
private DataReq<Map<String, Object>> createLowBatteryEventData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
Map<String, Object> params = new HashMap<>();
Map<String, Object> output = new HashMap<>();
output.put("BatteryLevel", 20);
params.put("output", JsonUtil.toJsonString(output));
params.put("eventName", "电量低事件");
params.put("eventType", "alert");
req.setParams(params);
return req;
}
/**
* 创建时钟请求数据
*
* @return NtpReq
*/
private NtpReq createNtpReqData() {
NtpReq req = new NtpReq();
req.setDeviceSendTime(String.valueOf(System.currentTimeMillis()));
return req;
}
}
二、前端设备模拟器接入
操作说明
打开设备管理
,进入设备模拟器
,选择好对应的产品与设备后点击连接。
1. 属性上报
选择属性进行数据上报,便可以在对应设备看到数据上报的情况
2. 事件上报
选择事件进行数据上报,便可以在对应设备看到事件上报的情况
3. 属性同步
选择事件进行数据上报,便可以在下行指令中获取对应设备的实时属性
三、通用MQTT客户端接入
操作说明
打开设备管理
,选择MQTT连接参数
,进入mqtt调试
,一键导入连接参数后进行连接服务器。
订阅与发布
订阅
:订阅系统自带的Topic,也可以订阅自定义的Topic,进行数据上报,模拟客户端。发布
:发布系统自带的Topic,也可以发布自定义的Topic,进行指令下发,模拟服务端。注意
:图中也可以看到只有订阅过的Topic才可以接收到服务端推送的数据。