自定义Topic
大约 3 分钟topicfeature
概念说明
在物联网平台中,自定义Topic是用户根据特定业务需求,自己定义的消息传输通道。它的重要性在于提供了更高的灵活性和可扩展性,让用户能够更精确地控制消息的流向和处理方式,满足复杂多变的业务需求。
自定义Topic的应用场景:
- 特定业务逻辑处理:比如智能农业中的灌溉系统,需要根据多种因素调整策略,这时就可以通过自定义Topic来实现特定数据的上报和接收。
- 设备分组管理:通过自定义Topic,可以将具有相同功能的设备划分到不同的组中,实现组内的统一管理和通信。
- 高级功能实现:如设备远程配置、固件升级等,可能需要通过自定义Topic来实现。
自定义Topic的优势:
- 灵活性:用户可以根据实际需求灵活定义消息传输通道。
- 可扩展性:自定义Topic可以轻松扩展以支持新的业务场景和数据类型。
- 高效性:通过精确控制消息的流向,自定义Topic可以提高通信效率。
一、平台创建自定义Topic
- 登录物联网平台,选择【设备管理】>【产品管理】,进入产品管理页面。
- 点击进入需要创建自定义Topic的产品详情页,选择【Topic类列表】>【自定义Topic】,进入自定义Topic管理页面。
- 点击【新增】,填写Topic类、功能、操作权限、描述等信息,点击【保存】即可创建自定义Topic。
二、服务端自定义Topic订阅
- 进入
blade-broker
工程,前往org.springblade.mqtt.broker.data.function
包下,创建custom
目录并创建CustomBatteryPostFunction
类。 - 在
CustomBatteryPostFunction
类上添加@DeviceFunc
注解,配置自定义的topic
和method
属性。 - 实现
IDeviceFunction
接口,重写getPayloadType
、getMqttSender
和execute
方法。 - 在
execute
方法中实现自定义Topic的业务逻辑处理,如数据入时序库等。 - 自定义类的初始逻辑与系统属性上报逻辑基本一致,系统通用处理可参考
org.springblade.mqtt.broker.data.function.event.EventPropertyPostFunction
。 - 若有更复杂的业务逻辑,后续请在
execute
方法中自行实现。
@Slf4j
@DeviceFunc(
topic = "/blade/custom/${productKey}/${deviceName}/battery/post",
method = FuncMethods.THING_EVENT_PROPERTY_POST
)
public class CustomBatteryPostFunction implements IDeviceFunction<DataReq<Map<String, Object>>> {
@Autowired
private IMqttSender mqttSender;
@Autowired
private ITsDBDeviceDataService deviceDataService;
@Override
public JavaType getPayloadType() {
return JsonUtil.getParametricType(DataReq.class, JsonUtil.getMapType(Object.class));
}
@Override
public IMqttSender getMqttSender() {
return mqttSender;
}
@Override
public void execute(ChannelContext context, String topic, Map<String, String> topicVariables, DataReq<Map<String, Object>> req) {
String replyTopic = topic + "_reply";
Map<String, Object> params = req.getParams();
// 上报数据为空
if (params == null || params.isEmpty()) {
log.error("req id:{} property post params is null or empty.", req.getId());
fail(topicVariables, replyTopic, req, ApiCode.REQUEST_PARAMETER_ERROR);
return;
}
// 入时序库
try {
log.info("自定义topic上报数据:{}", params);
deviceDataService.saveDeviceProperty(topicVariables, params);
success(topicVariables, replyTopic, req);
} catch (Throwable throwable) {
log.error("req id:{} property post error", req.getId(), throwable);
fail(topicVariables, replyTopic, req, ApiCode.REQUEST_ERROR);
}
}
}
三、设备端自定义Topic发布
- 修改
WatchSimulator
类,将原本的通用topic修改为自定义topic
//修改前
publishTask(client, "/blade/sys/${productKey}/${deviceName}/thing/event/property/post", createPropertyData());
//修改后
publishTask(client, "/blade/custom/${productKey}/${deviceName}/battery/post", createPropertyData());
- 重启blade-broker服务后,启动模拟器,可以看到控制台正确打印了我们自定义的日志信息,也可以看到属性列表数据已经成功入库。