package service import ( "encoding/json" "fmt" "rtzh_elec_temperature/conf" "rtzh_elec_temperature/global" "rtzh_elec_temperature/logger" "rtzh_elec_temperature/mqtt" "rtzh_elec_temperature/tools" "time" ) var RtelecMqttService = new(rtelecMqttService) //管理框架平台的mqtt相关服务 type rtelecMqttService struct { BaseService reloadMqtt bool } func (t *rtelecMqttService) TopicInit() { global.Rtelec_Topics = map[string]string{ //"register": "iss/register/request", //注册App "register_reply": "iss/register/reply", //注册App回复 //"heart": "iss/heartbeat/request", //心跳检测 "sys_parameter": "iss/iadmin/updateparam", //更新系统参数 "alarm_info": "iss/event/alarm", //告警事件消息 "link_request": "iss/event/link/request", //联动事件消息 "link_reply": "iss/event/link/reply", //联动事件回复 "signal_info": "iss/event/signal", //信号量事件 "auxctrl_info": "iss/event/auxctrl", } ins := RtelecManageApp() if ins != nil { appid := ins.RegAppID logger.Logger.Debug(fmt.Sprintf("当前应用id:%s,重新生成订阅主题...", appid)) if appid != "" { version := conf.GlobalConfig["rtelec_iss_version_1.0"] appname := conf.GlobalConfig["appid"] tmpTopics := map[string]string{ "data_push": fmt.Sprintf("iss/%s/dataPush", appid), //数据推送 "login": fmt.Sprintf("iss/%s/Login/request", appname), //登录 "login_reply": fmt.Sprintf("iss/%s/Login/response", appname), //登录回复 "model_get": fmt.Sprintf("/iss_%s/%s/getModelAll/request", version, appname), //模型查询与回复 "model_reply": fmt.Sprintf("/iss_%s/%s/getModelAll/response", version, appname), "attr_get": fmt.Sprintf("/iss_%s/%s/getAttrAll/request", version, appname), //属性查询与回复 "attr_reply": fmt.Sprintf("/iss_%s/%s/getAttrAll/response", version, appname), "device_add": fmt.Sprintf("/iss_%s/%s/setDevice/request", version, appname), //添加设备 "device_reply": fmt.Sprintf("/iss_%s/%s/setDevice/response", version, appname), "device_get": fmt.Sprintf("/iss_%s/%s/getDevice/request", version, appname), "device_del": fmt.Sprintf("/iss_%s/%s/delDevice/request", version, appname), //删除设备 "device_del_reply": fmt.Sprintf("/iss_%s/%s/delDevice/response", version, appname), "app_reset": fmt.Sprintf("/iss_%s/%s/resetApp/request", version, appname), //设备重启 "app_reset_reply": fmt.Sprintf("/iss_%s/%s/resetApp/response", version, appname), "command": fmt.Sprintf("/iss_%s/%s/command/request", version, appname), //命令 "command_reply": fmt.Sprintf("/iss_%s/%s/command/response", version, appname), } for k, v := range tmpTopics { global.Rtelec_Topics[k] = v } } } for _, v := range global.Rtelec_Topics { if v[len(v)-7:] == "request" { //请求类的主题不进行订阅 continue } if token := mqtt.Mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil { logger.Logger.Println(v + " 主题订阅失败:" + token.Error().Error()) time.Sleep(100 * time.Millisecond) } else { logger.Logger.Println(fmt.Sprintf("主题%s订阅成功!", v)) } } } //重新订阅相关主题 func (t *rtelecMqttService) ResetSubscribe() { for _, v := range global.Rtelec_Topics { if token := mqtt.Mqttserverconn.Unsubscribe(v); token.Wait() && token.Error() != nil { logger.Logger.Error(token.Error()) } else { logger.Logger.Debug("取消主题" + v + "订阅成功") } } //根据当前appId重新订阅 t.TopicInit() } // Mqtt数据中心的重连(每五分钟检查一次) func (t *rtelecMqttService) MqttReConnection() { for { if t.reloadMqtt || mqtt.Mqttserverconn.IsConnected() { logger.Logger.Println("===正在进行中台MQTT重连====") mqtt.Mqttserverconn.Disconnect(0) mqtt.Start() } time.Sleep(time.Minute * 2) } } // 向中心发送心跳 func (t *rtelecMqttService) Heart() { for { appname := conf.GlobalConfig["appname"] var interval = 60 if RtelecManageApp().RegAppID != "" { parameter := make(map[string]interface{}) parameter["code"] = 200 parameter["message"] = "App运行正常" parameter["appname"] = appname parameter["mid"] = tools.GetUid() value, _ := json.Marshal(parameter) go mqtt.PublishMessage("iss/heartbeat/request", string(value)) } time.Sleep(time.Duration(interval) * time.Second) } }