mqtt_receive_message.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. package mqtt
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "rtzh_elec_temperature/datachannel"
  6. "rtzh_elec_temperature/global"
  7. "rtzh_elec_temperature/logger"
  8. "rtzh_elec_temperature/rtelec_app_public_lib/utils"
  9. )
  10. //mqtt消息处理控制器
  11. type ReceiveDataMessage struct {
  12. }
  13. func (c *ReceiveDataMessage) Receive(topic string, message string) error {
  14. logger.Logger.Debug(fmt.Sprintf("接收到mqtt消息:topic=%s\r\n%s\r\n", topic, message))
  15. bytes := []byte(message)
  16. var data map[string]interface{}
  17. err := json.Unmarshal(bytes, &data)
  18. if err == nil {
  19. var returnMsgId = ""
  20. if MsgId, ok := data["mid"]; ok {
  21. returnMsgId = MsgId.(string)
  22. } else {
  23. //非本应用的消息,不处理
  24. return nil
  25. }
  26. var localMsgId = utils.MessageIds.Get(returnMsgId)
  27. // if MsgId, k := MessageIds[returnMsgId]; ok {
  28. // localMsgId = MsgId
  29. // }
  30. if topic == global.Rtelec_Topics["register_reply"] && localMsgId == returnMsgId {
  31. datachannel.Service_AppRegID_Chanl <- data
  32. } else if topic == global.Rtelec_Topics["login_reply"] && localMsgId == returnMsgId { //订阅登录后的返回处理
  33. datachannel.Service_AppLogin_Chanl <- data
  34. } else if topic == global.Rtelec_Topics["model_reply"] && localMsgId == returnMsgId {
  35. datachannel.Service_Model_Chanl <- data
  36. } else if topic == global.Rtelec_Topics["attr_reply"] && localMsgId == returnMsgId {
  37. datachannel.Service_ModelAttr_Chanl <- data
  38. } else if topic == global.Rtelec_Topics["device_reply"] && localMsgId == returnMsgId { //添加设备记录后的回复
  39. datachannel.Service_DeviceAdd_Chanl <- data
  40. } else if topic == global.Rtelec_Topics["device_del_reply"] && localMsgId == returnMsgId { //删除设备记录后的回复
  41. datachannel.Service_DeviceDel_Chanl <- data
  42. } else if topic == global.Rtelec_Topics["sys_parameter"] {
  43. } else if topic == global.Rtelec_Topics["data_push"] { //获取中台推送的业务数据
  44. datachannel.Service_DataPush_Chanl <- data
  45. } else if topic == global.Rtelec_Topics["app_reset_reply"] && localMsgId == returnMsgId {
  46. } else if topic == global.Rtelec_Topics["command_reply"] && localMsgId == returnMsgId { //收到命令执行后的回复
  47. datachannel.Service_Command_Chanl <- data
  48. } else if topic == global.Rtelec_Topics["link_request"] {
  49. datachannel.Service_LinkEventRequest_Chanl <- data
  50. } else if topic == global.Rtelec_Topics["alarm_info"] {
  51. //fmt.Println("=========接收到的消息====", message, "topic==", topic)
  52. } else if topic == global.Rtelec_Topics["link_reply"] {
  53. datachannel.Service_LinkEventReply_Chanl <- data
  54. } else if topic == "iss/update/linkStrategy" {
  55. //联动策略更新通知:重新加载联动策略到本地缓存
  56. datachannel.Service_LinkInfoNotice_Chanl <- data
  57. }
  58. }
  59. return nil
  60. }