| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package service
import (
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"rtzh_elec_temperature/conf"
	"rtzh_elec_temperature/global"
	"rtzh_elec_temperature/logger"
	"rtzh_elec_temperature/mqtt"
	"rtzh_elec_temperature/tools"
)
- var RtelecMqttService = new(rtelecMqttService)
- //管理框架平台的mqtt相关服务
- type rtelecMqttService struct {
- BaseService
- reloadMqtt bool
- }
- func (t *rtelecMqttService) TopicInit() {
- global.Rtelec_Topics = map[string]string{
- //"register": "iss/register/request", //注册App
- "register_reply": "iss/register/reply", //注册App回复
- //"heart": "iss/heartbeat/request", //心跳检测
- "sys_parameter": "iss/iadmin/updateparam", //更新系统参数
- "alarm_info": "iss/event/alarm", //告警事件消息
- "link_request": "iss/event/link/request", //联动事件消息
- "link_reply": "iss/event/link/reply", //联动事件回复
- "signal_info": "iss/event/signal", //信号量事件
- "auxctrl_info": "iss/event/auxctrl",
- }
- ins := RtelecManageApp()
- if ins != nil {
- appid := ins.RegAppID
- logger.Logger.Debug(fmt.Sprintf("当前应用id:%s,重新生成订阅主题...", appid))
- if appid != "" {
- version := conf.GlobalConfig["rtelec_iss_version_1.0"]
- appname := conf.GlobalConfig["appid"]
- tmpTopics := map[string]string{
- "data_push": fmt.Sprintf("iss/%s/dataPush", appid), //数据推送
- "login": fmt.Sprintf("iss/%s/Login/request", appname), //登录
- "login_reply": fmt.Sprintf("iss/%s/Login/response", appname), //登录回复
- "model_get": fmt.Sprintf("/iss_%s/%s/getModelAll/request", version, appname), //模型查询与回复
- "model_reply": fmt.Sprintf("/iss_%s/%s/getModelAll/response", version, appname),
- "attr_get": fmt.Sprintf("/iss_%s/%s/getAttrAll/request", version, appname), //属性查询与回复
- "attr_reply": fmt.Sprintf("/iss_%s/%s/getAttrAll/response", version, appname),
- "device_add": fmt.Sprintf("/iss_%s/%s/setDevice/request", version, appname), //添加设备
- "device_reply": fmt.Sprintf("/iss_%s/%s/setDevice/response", version, appname),
- "device_get": fmt.Sprintf("/iss_%s/%s/getDevice/request", version, appname),
- "device_del": fmt.Sprintf("/iss_%s/%s/delDevice/request", version, appname), //删除设备
- "device_del_reply": fmt.Sprintf("/iss_%s/%s/delDevice/response", version, appname),
- "app_reset": fmt.Sprintf("/iss_%s/%s/resetApp/request", version, appname), //设备重启
- "app_reset_reply": fmt.Sprintf("/iss_%s/%s/resetApp/response", version, appname),
- "command": fmt.Sprintf("/iss_%s/%s/command/request", version, appname), //命令
- "command_reply": fmt.Sprintf("/iss_%s/%s/command/response", version, appname),
- }
- for k, v := range tmpTopics {
- global.Rtelec_Topics[k] = v
- }
- }
- }
- for _, v := range global.Rtelec_Topics {
- if v[len(v)-7:] == "request" {
- //请求类的主题不进行订阅
- continue
- }
- if token := mqtt.Mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil {
- logger.Logger.Println(v + " 主题订阅失败:" + token.Error().Error())
- time.Sleep(100 * time.Millisecond)
- } else {
- logger.Logger.Println(fmt.Sprintf("主题%s订阅成功!", v))
- }
- }
- }
- //重新订阅相关主题
- func (t *rtelecMqttService) ResetSubscribe() {
- for _, v := range global.Rtelec_Topics {
- if token := mqtt.Mqttserverconn.Unsubscribe(v); token.Wait() && token.Error() != nil {
- logger.Logger.Error(token.Error())
- } else {
- logger.Logger.Debug("取消主题" + v + "订阅成功")
- }
- }
- //根据当前appId重新订阅
- t.TopicInit()
- }
- // Mqtt数据中心的重连(每五分钟检查一次)
- func (t *rtelecMqttService) MqttReConnection() {
- for {
- if t.reloadMqtt || mqtt.Mqttserverconn.IsConnected() {
- logger.Logger.Println("===正在进行中台MQTT重连====")
- mqtt.Mqttserverconn.Disconnect(0)
- mqtt.Start()
- }
- time.Sleep(time.Minute * 2)
- }
- }
- // 向中心发送心跳
- func (t *rtelecMqttService) Heart() {
- for {
- appname := conf.GlobalConfig["appname"]
- var interval = 60
- if RtelecManageApp().RegAppID != "" {
- parameter := make(map[string]interface{})
- parameter["code"] = 200
- parameter["message"] = "App运行正常"
- parameter["appname"] = appname
- parameter["mid"] = tools.GetUid()
- value, _ := json.Marshal(parameter)
- go mqtt.PublishMessage("iss/heartbeat/request", string(value))
- }
- time.Sleep(time.Duration(interval) * time.Second)
- }
- }
|