对接指南
大约 8 分钟deviceotaintegration
对接指南
本指南提供完整的OTA功能对接方案,包含Java示例代码和最佳实践。通过按照本指南进行集成,您可以快速实现设备端的OTA升级能力,包括MQTT通信、固件下载、升级执行、状态上报等全部核心功能。
Java模拟器
完整的Java OTA设备模拟器代码:https://center.javablade.com/blade/BladeX-Links/src/branch/master/blade-demo/device-simulator
一、设备认证
设备需要使用三元组信息完成MQTT连接认证:
// 设备三元组
String productKey = "PK_SENSOR_001";
String deviceName = "DEV_20240101_001";
String deviceSecret = "xxxxxxxxxxxxx";
// 创建客户端认证信息
ClientIdInfo clientIdInfo = new ClientIdInfo();
clientIdInfo.setClientId(deviceName + "_ota");
ClientSign clientSign = new ClientSign();
clientSign.setClientIdInfo(clientIdInfo);
clientSign.setProductKey(productKey);
clientSign.setDeviceName(deviceName);
clientSign.setDeviceSecret(deviceSecret);
二、Topic订阅配置
设备启动后必须订阅以下OTA相关Topic:
// OTA相关Topic模板
private static final String TOPIC_OTA_INFORM = "/blade/ota/device/inform/${productKey}/${deviceName}";
private static final String TOPIC_OTA_UPGRADE = "/blade/ota/device/upgrade/${productKey}/${deviceName}";
private static final String TOPIC_OTA_PROGRESS = "/blade/ota/device/progress/${productKey}/${deviceName}";
private static final String TOPIC_OTA_FIRMWARE_GET = "/blade/sys/${productKey}/${deviceName}/thing/ota/firmware/get";
private static final String TOPIC_OTA_FIRMWARE_GET_REPLY = "/blade/sys/${productKey}/${deviceName}/thing/ota/firmware/get_reply";
// 订阅OTA升级指令主题
String upgradeTopic = buildTopicPath(TOPIC_OTA_UPGRADE);
client.subQos0(upgradeTopic, (context, topic, message, payload) -> {
String payloadStr = ByteBufferUtil.toString(payload);
handleUpgradeCommand(payloadStr);
});
// 订阅OTA固件拉取回复主题
String firmwareGetReplyTopic = buildTopicPath(TOPIC_OTA_FIRMWARE_GET_REPLY);
client.subQos0(firmwareGetReplyTopic, (context, topic, message, payload) -> {
String payloadStr = ByteBufferUtil.toString(payload);
handleFirmwareGetReply(payloadStr);
});
三、快速开始
以下是一个最简单的OTA升级示例,帮助您快速理解核心流程。
1. 核心流程说明
- 连接平台:使用设备三元组建立MQTT连接
- 订阅消息:订阅升级指令Topic和固件拉取响应Topic
- 上报版本:定期上报当前固件版本
- 主动拉取:定期向平台查询是否有可用升级
- 执行升级:收到指令后下载并安装固件
- 上报进度:实时上报升级进度和结果
2. 启动流程
public void start() {
// 初始化客户端认证信息
ClientSign clientSign = createClientSign();
// 初始化 MQTT 客户端
client = MqttClient.create()
.username(clientSign.getUsername())
.password(clientSign.getPassword())
.clientId(clientSign.getClientIdInfo().getMqttClientId())
.connectSync();
// 初始化OTA处理器
otaHandler = new OtaHandler(this);
// 订阅主题
subscribeTopics();
// 开始任务调度
scheduleTasks();
}
四、功能实现
本章节提供完整的OTA功能实现代码。
1. 升级指令处理
处理平台下发的升级指令:
private void handleUpgradeCommand(String payload) {
try {
// 检查是否正在升级
if (isUpgrading.get()) {
log.warn("设备正在升级中,忽略新的升级指令");
return;
}
// 解析升级指令
DataReq<?> dataReq = JsonUtil.readValue(payload, DataReq.class);
if (dataReq.getParams() == null || !(dataReq.getParams() instanceof Map)) {
log.error("升级指令参数为空或格式错误");
return;
}
@SuppressWarnings("unchecked")
Map<String, Object> params = (Map<String, Object>) dataReq.getParams();
// 处理升级参数
processUpgradeParams(params, "升级指令");
} catch (Exception exception) {
log.error("处理升级指令失败", exception);
isUpgrading.set(false);
}
}
2. 固件主动拉取
设备可以主动向平台请求是否有可用的固件升级:
public void pullFirmware() {
// 检查是否正在升级
if (isUpgrading.get()) {
log.warn("设备正在升级中,无法主动拉取固件");
return;
}
String firmwareGetTopic = buildTopicPath(TOPIC_OTA_FIRMWARE_GET);
// 构建请求数据
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
req.setMethod("thing.ota.firmware.get");
req.setParams(Kv.create().set("version", currentVersion.get()));
// 设置需要响应ACK(关键修改)
SysBean sysBean = new SysBean(true); // true 表示需要 ACK
req.setSys(sysBean);
// 发送请求
client.publish(firmwareGetTopic, JsonUtil.toJsonBytes(req));
}
3. 固件拉取响应处理
private void handleFirmwareGetReply(String payload) {
try {
// 检查是否正在升级
if (isUpgrading.get()) {
log.warn("设备正在升级中,忽略固件拉取响应");
return;
}
// 解析响应数据
DataResp<?> dataResp = JsonUtil.readValue(payload, DataResp.class);
// 检查响应状态
if (dataResp.getCode() != 200) {
log.warn("固件拉取响应取消: code={}, message={}",
dataResp.getCode(), dataResp.getMessage());
return;
}
// 检查响应数据,如果data为空则认为无升级
if (dataResp.getData() == null || !(dataResp.getData() instanceof Map)) {
log.info("固件拉取响应: 无可用升级");
return;
}
@SuppressWarnings("unchecked")
Map<String, Object> data = (Map<String, Object>) dataResp.getData();
// 直接尝试解析升级参数,如果解析失败则认为无升级
OtaUpgradeParams otaParams = OtaUpgradeParams.fromMap(data);
if (otaParams == null) {
log.info("固件拉取响应: 无可用升级");
return;
}
// 验证升级参数中的关键字段,如果缺少则认为无升级
if (otaParams.getVersion() == null || otaParams.getUrl() == null) {
log.info("固件拉取响应: 无可用升级,关键参数缺失");
return;
}
// 处理升级参数
processUpgradeParams(data, "固件拉取响应");
} catch (Exception exception) {
log.error("处理固件拉取响应失败", exception);
isUpgrading.set(false);
}
}
4. 统一升级参数处理
private void processUpgradeParams(Map<String, Object> params, String source) {
// 使用统一的参数处理类解析升级参数
OtaUpgradeParams otaParams = OtaUpgradeParams.fromMap(params);
if (otaParams == null) {
log.error("解析升级参数失败: 来源={}", source);
return;
}
// 验证参数完整性
OtaUpgradeParams.Validation validation = otaParams.validate();
if (validation.hasErrors()) {
log.error("升级参数验证失败: 来源={}, 错误={}", source, validation.getErrorMessage());
return;
}
String targetVersion = otaParams.getVersion();
String firmwareUrl = otaParams.getUrl();
String sign = otaParams.getSign();
Long size = otaParams.getSize();
Long taskId = otaParams.getTaskId();
// 版本对比:如果本地版本大于等于目标版本,则不进行升级
String localVersion = currentVersion.get();
if (!VersionUtil.isLessThanSafe(localVersion, targetVersion)) {
log.info("本地版本[{}]已经大于等于目标版本[{}],无需升级", localVersion, targetVersion);
return;
}
// 设置升级状态
isUpgrading.set(true);
// 开始升级流程
otaHandler.startUpgrade(targetVersion, firmwareUrl, sign, size, taskId);
}
五、进度上报
1. 升级进度上报
public void reportProgress(OtaProgress progressData) {
String progressTopic = buildTopicPath(TOPIC_OTA_PROGRESS);
DataReq<OtaProgress> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
req.setMethod("thing.ota.upgrade.progress");
req.setParams(progressData);
client.publish(progressTopic, JsonUtil.toJsonBytes(req));
}
2. OtaProgress数据结构
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
public class OtaProgress implements Serializable {
/**
* 任务ID
*/
private Long taskId;
/**
* 升级步骤
* 1-100: 升级进度百分比
* -1: 升级失败
* -2: 下载失败
* -3: 校验失败
* -4: 烧录失败
*/
private String step;
/**
* 进度描述
* 示例:1%-100%表示进度百分比
* -1: 升级失败
* -2: 下载失败
* -3: 校验失败
* -4: 烧录失败
*/
private String desc;
/**
* 扩展参数(可选)
* 可包含错误信息、详细状态等
*/
private Object extData;
/**
* 创建下载进度
* 将下载进度映射到总进度的0-50%
*/
public static OtaProgress downloading(Long taskId, int percent) {
// 将0-100的下载进度映射到0-50的总进度
int mappedProgress = percent * 50 / 100;
return OtaProgress.builder()
.taskId(taskId)
.step(String.valueOf(mappedProgress))
.desc(mappedProgress + "%")
.build();
}
/**
* 创建烧录进度
* 将烧录进度映射到总进度的50-100%
*/
public static OtaProgress burning(Long taskId, int percent) {
// 将0-100的烧录进度映射到50-100的总进度
int mappedProgress = 50 + (percent * 50 / 100);
return OtaProgress.builder()
.taskId(taskId)
.step(String.valueOf(mappedProgress))
.desc(mappedProgress + "%")
.build();
}
/**
* 创建完成状态
*/
public static OtaProgress done(Long taskId) {
return OtaProgress.builder()
.taskId(taskId)
.step("100")
.desc("100%")
.build();
}
/**
* 创建失败状态
*/
public static OtaProgress failed(Long taskId, String errorCode, String errorMessage) {
return OtaProgress.builder()
.taskId(taskId)
.step(errorCode)
.desc(errorMessage)
.extData(errorMessage)
.build();
}
}
3. 版本上报
private void reportVersion(String topic) {
DataReq<Map<String, Object>> req = createVersionReportData();
client.publish(topic, JsonUtil.toJsonBytes(req));
log.info("上报设备版本: productKey={}, deviceName={}, version={}",
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), currentVersion.get());
}
private DataReq<Map<String, Object>> createVersionReportData() {
DataReq<Map<String, Object>> req = new DataReq<>();
req.setId(StringUtil.randomUUID());
req.setVersion("1.0");
req.setMethod("thing.ota.device.inform");
req.setParams(Kv.create().set("version", currentVersion.get()));
// 设置需要响应ACK
SysBean sysBean = new SysBean(true);
req.setSys(sysBean);
return req;
}
六、OTA处理器实现
1. 项目结构
blade-demo/device-simulator/
├── src/main/java/org/springblade/device/simulator/
│ ├── listener/
│ │ └── OtaListener.java
│ ├── simulator/
│ │ └── OtaSimulator.java
│ ├── support/
│ │ ├── DeviceInfo.java
│ │ ├── OtaHandler.java
│ │ └── OtaProgress.java
│ └── DeviceSimulatorApplication.java
└── pom.xml
2. OtaHandler升级处理器
@Slf4j
@RequiredArgsConstructor
public class OtaHandler {
private final OtaSimulator simulator;
/**
* 开始升级
*/
public void startUpgrade(String targetVersion, String firmwareUrl,
String sign, Long size, Long taskId) {
// 异步执行升级流程
CompletableFuture.runAsync(() -> {
try {
log.info("开始OTA升级: 目标版本={}, 任务ID={}", targetVersion, taskId);
// 1. 开始下载固件
boolean downloadSuccess = downloadFirmware(firmwareUrl, sign, size, taskId);
if (!downloadSuccess) {
handleUpgradeFailure(taskId, "-2", "固件下载失败");
return;
}
// 2. 模拟校验固件
boolean verifySuccess = verifyFirmware(taskId);
if (!verifySuccess) {
handleUpgradeFailure(taskId, "-3", "固件校验失败");
return;
}
// 3. 模拟烧录固件
boolean burnSuccess = burnFirmware(taskId);
if (!burnSuccess) {
handleUpgradeFailure(taskId, "-4", "固件烧录失败");
return;
}
// 4. 升级成功,更新版本
handleUpgradeSuccess(taskId, targetVersion);
} catch (Exception exception) {
log.error("OTA升级异常", exception);
handleUpgradeFailure(taskId, "-1", "升级失败: " + exception.getMessage());
}
});
}
/**
* 下载固件
*/
private boolean downloadFirmware(String firmwareUrl, String sign,
Long expectedSize, Long taskId) {
try {
log.info("开始下载固件: url={}, 期望大小={}", firmwareUrl, expectedSize);
// 模拟下载进度(映射到0-50%的总进度)
for (int i = 0; i <= 100; i += 5) {
// 上报下载进度(内部会映射到0-50%)
OtaProgress progress = OtaProgress.downloading(taskId, i);
simulator.reportProgress(progress);
// 模拟下载延时
TimeUnit.MILLISECONDS.sleep(500);
}
// 如果提供了真实的下载URL,可以进行实际下载
if (firmwareUrl != null && firmwareUrl.startsWith("http")) {
verifyUrlAccessible(firmwareUrl);
}
log.info("固件下载完成");
return true;
} catch (Exception exception) {
log.error("下载固件失败", exception);
return false;
}
}
/**
* 校验固件
*/
private boolean verifyFirmware(Long taskId) {
try {
log.info("开始校验固件");
TimeUnit.SECONDS.sleep(2);
log.info("固件校验成功");
return true;
} catch (Exception exception) {
log.error("校验固件失败", exception);
return false;
}
}
/**
* 烧录固件
*/
private boolean burnFirmware(Long taskId) {
try {
log.info("开始烧录固件");
// 模拟烧录进度(映射到50-100%的总进度)
for (int i = 0; i < 100; i += 10) {
// 上报烧录进度(内部会映射到50-100%)
OtaProgress progress = OtaProgress.burning(taskId, i);
simulator.reportProgress(progress);
// 模拟烧录延时
TimeUnit.SECONDS.sleep(1);
}
// 烧录完成,上报100%进度
OtaProgress finalProgress = OtaProgress.burning(taskId, 100);
simulator.reportProgress(finalProgress);
log.info("固件烧录完成");
return true;
} catch (Exception exception) {
log.error("烧录固件失败", exception);
return false;
}
}
/**
* 处理升级成功
*/
private void handleUpgradeSuccess(Long taskId, String targetVersion) {
log.info("OTA升级成功,准备重启设备");
// 上报升级完成
OtaProgress progress = OtaProgress.done(taskId);
simulator.reportProgress(progress);
// 模拟设备重启
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
}
// 更新设备版本
simulator.updateVersion(targetVersion);
log.info("设备重启完成,新版本: {}", targetVersion);
}
/**
* 处理升级失败
*/
private void handleUpgradeFailure(Long taskId, String errorCode, String errorMessage) {
log.error("OTA升级失败: {} - {}", errorCode, errorMessage);
// 上报失败状态
OtaProgress progress = OtaProgress.failed(taskId, errorCode, errorMessage);
simulator.reportProgress(progress);
// 重置升级状态
simulator.resetUpgradeStatus();
}
}
七、定时任务
1. 任务调度
private void scheduleTasks() {
// 定期主动拉取固件(每60秒)
pullFirmwareTask();
// 定期上报设备版本(每30秒)
reportVersionTask();
}
private void pullFirmwareTask() {
// 立即拉取一次固件
pullFirmware();
// 定期主动拉取固件(每60秒)
client.schedule(() -> {
if (!isUpgrading.get()) {
pullFirmware();
}
}, 60000);
}
private void reportVersionTask() {
String informTopic = buildTopicPath(TOPIC_OTA_INFORM);
// 立即上报一次版本
reportVersion(informTopic);
// 定期上报版本(每30秒)
client.schedule(() -> {
if (!isUpgrading.get()) {
reportVersion(informTopic);
}
}, 30000);
}
2. 工具方法
/**
* 构建主题路径
*/
private String buildTopicPath(String topicTemplate) {
return TemplateUtil.safeProcess(topicTemplate,
Kv.create()
.set("productKey", deviceInfo.getProductKey())
.set("deviceName", deviceInfo.getDeviceName()));
}
/**
* 更新设备版本
*/
public void updateVersion(String newVersion) {
String oldVersion = currentVersion.getAndSet(newVersion);
isUpgrading.set(false);
log.info("设备版本更新成功: productKey={}, deviceName={}, 版本变化: {} -> {}",
deviceInfo.getProductKey(), deviceInfo.getDeviceName(), oldVersion, newVersion);
// 立即上报新版本
String informTopic = buildTopicPath(TOPIC_OTA_INFORM);
reportVersion(informTopic);
}
/**
* 重置升级状态
*/
public void resetUpgradeStatus() {
isUpgrading.set(false);
}