mqtt_service.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package service
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "rtzh_elec_temperature/conf"
  6. "rtzh_elec_temperature/global"
  7. "rtzh_elec_temperature/logger"
  8. "rtzh_elec_temperature/mqtt"
  9. "rtzh_elec_temperature/tools"
  10. "time"
  11. )
  12. var RtelecMqttService = new(rtelecMqttService)
  13. //管理框架平台的mqtt相关服务
  14. type rtelecMqttService struct {
  15. BaseService
  16. reloadMqtt bool
  17. }
  18. func (t *rtelecMqttService) TopicInit() {
  19. global.Rtelec_Topics = map[string]string{
  20. //"register": "iss/register/request", //注册App
  21. "register_reply": "iss/register/reply", //注册App回复
  22. //"heart": "iss/heartbeat/request", //心跳检测
  23. "sys_parameter": "iss/iadmin/updateparam", //更新系统参数
  24. "alarm_info": "iss/event/alarm", //告警事件消息
  25. "link_request": "iss/event/link/request", //联动事件消息
  26. "link_reply": "iss/event/link/reply", //联动事件回复
  27. "signal_info": "iss/event/signal", //信号量事件
  28. "auxctrl_info": "iss/event/auxctrl",
  29. }
  30. ins := RtelecManageApp()
  31. if ins != nil {
  32. appid := ins.RegAppID
  33. logger.Logger.Debug(fmt.Sprintf("当前应用id:%s,重新生成订阅主题...", appid))
  34. if appid != "" {
  35. version := conf.GlobalConfig["rtelec_iss_version_1.0"]
  36. appname := conf.GlobalConfig["appid"]
  37. tmpTopics := map[string]string{
  38. "data_push": fmt.Sprintf("iss/%s/dataPush", appid), //数据推送
  39. "login": fmt.Sprintf("iss/%s/Login/request", appname), //登录
  40. "login_reply": fmt.Sprintf("iss/%s/Login/response", appname), //登录回复
  41. "model_get": fmt.Sprintf("/iss_%s/%s/getModelAll/request", version, appname), //模型查询与回复
  42. "model_reply": fmt.Sprintf("/iss_%s/%s/getModelAll/response", version, appname),
  43. "attr_get": fmt.Sprintf("/iss_%s/%s/getAttrAll/request", version, appname), //属性查询与回复
  44. "attr_reply": fmt.Sprintf("/iss_%s/%s/getAttrAll/response", version, appname),
  45. "device_add": fmt.Sprintf("/iss_%s/%s/setDevice/request", version, appname), //添加设备
  46. "device_reply": fmt.Sprintf("/iss_%s/%s/setDevice/response", version, appname),
  47. "device_get": fmt.Sprintf("/iss_%s/%s/getDevice/request", version, appname),
  48. "device_del": fmt.Sprintf("/iss_%s/%s/delDevice/request", version, appname), //删除设备
  49. "device_del_reply": fmt.Sprintf("/iss_%s/%s/delDevice/response", version, appname),
  50. "app_reset": fmt.Sprintf("/iss_%s/%s/resetApp/request", version, appname), //设备重启
  51. "app_reset_reply": fmt.Sprintf("/iss_%s/%s/resetApp/response", version, appname),
  52. "command": fmt.Sprintf("/iss_%s/%s/command/request", version, appname), //命令
  53. "command_reply": fmt.Sprintf("/iss_%s/%s/command/response", version, appname),
  54. }
  55. for k, v := range tmpTopics {
  56. global.Rtelec_Topics[k] = v
  57. }
  58. }
  59. }
  60. for _, v := range global.Rtelec_Topics {
  61. if v[len(v)-7:] == "request" {
  62. //请求类的主题不进行订阅
  63. continue
  64. }
  65. if token := mqtt.Mqttserverconn.Subscribe(v, 0, nil); token.Wait() && token.Error() != nil {
  66. logger.Logger.Println(v + " 主题订阅失败:" + token.Error().Error())
  67. time.Sleep(100 * time.Millisecond)
  68. } else {
  69. logger.Logger.Println(fmt.Sprintf("主题%s订阅成功!", v))
  70. }
  71. }
  72. }
  73. //重新订阅相关主题
  74. func (t *rtelecMqttService) ResetSubscribe() {
  75. for _, v := range global.Rtelec_Topics {
  76. if token := mqtt.Mqttserverconn.Unsubscribe(v); token.Wait() && token.Error() != nil {
  77. logger.Logger.Error(token.Error())
  78. } else {
  79. logger.Logger.Debug("取消主题" + v + "订阅成功")
  80. }
  81. }
  82. //根据当前appId重新订阅
  83. t.TopicInit()
  84. }
  85. // Mqtt数据中心的重连(每五分钟检查一次)
  86. func (t *rtelecMqttService) MqttReConnection() {
  87. for {
  88. if t.reloadMqtt || mqtt.Mqttserverconn.IsConnected() {
  89. logger.Logger.Println("===正在进行中台MQTT重连====")
  90. mqtt.Mqttserverconn.Disconnect(0)
  91. mqtt.Start()
  92. }
  93. time.Sleep(time.Minute * 2)
  94. }
  95. }
  96. // 向中心发送心跳
  97. func (t *rtelecMqttService) Heart() {
  98. for {
  99. appname := conf.GlobalConfig["appname"]
  100. var interval = 60
  101. if RtelecManageApp().RegAppID != "" {
  102. parameter := make(map[string]interface{})
  103. parameter["code"] = 200
  104. parameter["message"] = "App运行正常"
  105. parameter["appname"] = appname
  106. parameter["mid"] = tools.GetUid()
  107. value, _ := json.Marshal(parameter)
  108. go mqtt.PublishMessage("iss/heartbeat/request", string(value))
  109. }
  110. time.Sleep(time.Duration(interval) * time.Second)
  111. }
  112. }