package mqtt import ( "encoding/json" "fmt" "rtzh_elec_temperature/datachannel" "rtzh_elec_temperature/global" "rtzh_elec_temperature/logger" "rtzh_elec_temperature/rtelec_app_public_lib/utils" ) //mqtt消息处理控制器 type ReceiveDataMessage struct { } func (c *ReceiveDataMessage) Receive(topic string, message string) error { logger.Logger.Debug(fmt.Sprintf("接收到mqtt消息:topic=%s\r\n%s\r\n", topic, message)) bytes := []byte(message) var data map[string]interface{} err := json.Unmarshal(bytes, &data) if err == nil { var returnMsgId = "" if MsgId, ok := data["mid"]; ok { returnMsgId = MsgId.(string) } else { //非本应用的消息,不处理 return nil } var localMsgId = utils.MessageIds.Get(returnMsgId) // if MsgId, k := MessageIds[returnMsgId]; ok { // localMsgId = MsgId // } if topic == global.Rtelec_Topics["register_reply"] && localMsgId == returnMsgId { datachannel.Service_AppRegID_Chanl <- data } else if topic == global.Rtelec_Topics["login_reply"] && localMsgId == returnMsgId { //订阅登录后的返回处理 datachannel.Service_AppLogin_Chanl <- data } else if topic == global.Rtelec_Topics["model_reply"] && localMsgId == returnMsgId { datachannel.Service_Model_Chanl <- data } else if topic == global.Rtelec_Topics["attr_reply"] && localMsgId == returnMsgId { datachannel.Service_ModelAttr_Chanl <- data } else if topic == global.Rtelec_Topics["device_reply"] && localMsgId == returnMsgId { //添加设备记录后的回复 datachannel.Service_DeviceAdd_Chanl <- data } else if topic == global.Rtelec_Topics["device_del_reply"] && localMsgId == returnMsgId { //删除设备记录后的回复 datachannel.Service_DeviceDel_Chanl <- data } else if topic == global.Rtelec_Topics["sys_parameter"] { } else if topic == global.Rtelec_Topics["data_push"] { //获取中台推送的业务数据 datachannel.Service_DataPush_Chanl <- data } else if topic == global.Rtelec_Topics["app_reset_reply"] && localMsgId == returnMsgId { } else if topic == global.Rtelec_Topics["command_reply"] && localMsgId == returnMsgId { //收到命令执行后的回复 datachannel.Service_Command_Chanl <- data } else if topic == global.Rtelec_Topics["link_request"] { datachannel.Service_LinkEventRequest_Chanl <- data } else if topic == global.Rtelec_Topics["alarm_info"] { //fmt.Println("=========接收到的消息====", message, "topic==", topic) } else if topic == global.Rtelec_Topics["link_reply"] { datachannel.Service_LinkEventReply_Chanl <- data } else if topic == "iss/update/linkStrategy" { //联动策略更新通知:重新加载联动策略到本地缓存 datachannel.Service_LinkInfoNotice_Chanl <- data } } return nil }