data_service.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package service
  2. import (
  3. "encoding/base64"
  4. "encoding/json"
  5. "fmt"
  6. "reflect"
  7. "rtzh_elec_temperature/datachannel"
  8. "rtzh_elec_temperature/logger"
  9. "rtzh_elec_temperature/mqtt"
  10. "rtzh_elec_temperature/rtelec_app_public_lib/utils"
  11. "rtzh_elec_temperature/tools"
  12. "github.com/spf13/cast"
  13. )
  14. //公共平台数据采集服务,提供采集数据接收以及处理等
  15. type dataService struct {
  16. BaseService
  17. }
  18. var rtelecDataIns *dataService
  19. //初始化
  20. func init() {
  21. logger.Logger.Println("rtelec数据服务初始化")
  22. go func() {
  23. for {
  24. data, ok := <-datachannel.Service_DataPush_Chanl
  25. logger.Logger.Debug(fmt.Sprintf("=======接收到中台采集数据:%+v", data))
  26. if ok {
  27. utils.MessageIds.Remove(tools.IsEmpty(data["mid"]))
  28. ins := new(dataService)
  29. go ins.HandleDataReceive(data)
  30. }
  31. }
  32. }()
  33. }
  34. //接收到dataPush数据
  35. func (t *dataService) HandleDataReceive(data map[string]interface{}) {
  36. datacode := tools.IsEmpty(data["code"])
  37. if datacode == "getEvent" {
  38. go t.ReveiceEventQueryData(data)
  39. return
  40. }
  41. deviceService := new(DeviceService)
  42. deviceInfoAttr := deviceService.LoadDefaultDeviceAttr(false)
  43. var err error
  44. for key, val := range data {
  45. if key == "message" {
  46. var record map[string]interface{}
  47. if reflect.TypeOf(val).Kind() == reflect.String {
  48. bytes := []byte(val.(string))
  49. err = json.Unmarshal(bytes, &record)
  50. } else {
  51. record = val.(map[string]interface{})
  52. }
  53. if err == nil {
  54. var historyServiceObj = new(HistoryService)
  55. var deviceName = record["device_name"].(string)
  56. historyServiceObj.Model.Devicename = deviceName
  57. if Device_Id, ok := deviceInfoAttr.Load(deviceName); ok && tools.IsEmpty(Device_Id) != "" {
  58. historyServiceObj.Model.Deviceid = cast.ToInt32(Device_Id)
  59. } else {
  60. logger.Logger.Error(fmt.Sprintf("未发现设备%s定义,该设备可能还未创建或已删除!", deviceName))
  61. return
  62. }
  63. //将数据发送到by组件
  64. go t.SendToByCompnent(record)
  65. //接收数据后设置设备为在线状态
  66. deviceService.SetDeviceState(int(historyServiceObj.Model.Deviceid), 2)
  67. creattime := record["create_time"]
  68. historyServiceObj.Model.Date = tools.Unix2TimeString(int64(creattime.(float64)))
  69. var AttrInfo map[string]interface{}
  70. err = json.Unmarshal([]byte(record["value"].(string)), &AttrInfo)
  71. if err == nil {
  72. historyServiceObj.AttrInfo = AttrInfo
  73. //log.Println(fmt.Println("===接收到采集数据:%+v", Field))
  74. //处理告警事件表。注:告警逻辑已经迁移到历史数据入库后处理
  75. //go new(AlarmService).HandleAlarmEvent(historyServiceObj)
  76. //添加进历史数据表
  77. go historyServiceObj.InsertHistory()
  78. //处理联动事件表
  79. go new(LinkEventService).HandleLinkEvent(historyServiceObj)
  80. }
  81. }
  82. }
  83. }
  84. }
  85. //将数据发送到BY服务
  86. func (t *dataService) SendToByCompnent(data map[string]interface{}) {
  87. //map[message:{"model_name":"jwl灯光模型","device_name":"灯光1","create_time":1666503765,"timeout":0,"value":"{\"ON1\":0,\"ON2\":0,\"ON3\":0,\"ON4\":0}"} mid:d0eba2de-1954-a91d-9092-20ae0a746054]
  88. model_name := tools.IsEmpty(data["model_name"])
  89. if model_name == "" {
  90. logger.Logger.Error(fmt.Sprintf("接收到数据中未发现模型名称数据"))
  91. logger.Logger.Println(fmt.Sprintf("%v", data))
  92. return
  93. }
  94. modelServiceObj := new(ModelService)
  95. modeid := modelServiceObj.GetModelId(model_name)
  96. if modeid == 0 {
  97. logger.Logger.Error(fmt.Sprintf("未找到模型%s名称的模型定义!", model_name))
  98. return
  99. }
  100. mpserviceobj := new(MpinfoService)
  101. mplist := mpserviceobj.MpList(modeid)
  102. if len(mplist) == 0 {
  103. logger.Logger.Error(fmt.Sprintf("未找到模型%s名的测点定义!", model_name))
  104. return
  105. }
  106. mpids := map[string]string{}
  107. for _, r := range mplist {
  108. key := tools.IsEmpty(r["attrname"])
  109. mpids[key] = tools.IsEmpty(r["mpid"])
  110. }
  111. senddata := map[string]string{}
  112. dataValue := map[string]interface{}{}
  113. json.Unmarshal([]byte(tools.IsEmpty(data["value"])), &dataValue)
  114. for attrname, val := range dataValue {
  115. b := tools.IsEmpty(val)
  116. if b == "" {
  117. continue
  118. }
  119. vid := mpids[attrname]
  120. if vid == "" {
  121. continue
  122. }
  123. senddata[vid] = b
  124. }
  125. if len(senddata) == 0 {
  126. return
  127. }
  128. senddataStr, _ := json.Marshal(senddata)
  129. mqtt.PublishMessage("/rtelec/byzt/device/data", string(senddataStr))
  130. }
  131. //事件查询返回数据处理。如门禁记录查询
  132. func (t *dataService) ReveiceEventQueryData(data map[string]interface{}) {
  133. logger.Logger.Debug(fmt.Sprintf("======getEvent Result:", data))
  134. var message map[string]interface{}
  135. val := data["message"]
  136. var err error
  137. if reflect.TypeOf(val).Kind() == reflect.String {
  138. bytes := []byte(val.(string))
  139. err = json.Unmarshal(bytes, &message)
  140. } else {
  141. message = val.(map[string]interface{})
  142. }
  143. msgValue := tools.IsEmpty(message["value"])
  144. if msgValue == "" {
  145. return
  146. }
  147. msgValueObjStr, err := base64.StdEncoding.DecodeString(msgValue)
  148. if err != nil {
  149. logger.Logger.Error(err)
  150. logger.Logger.Println(data)
  151. return
  152. }
  153. msgValueObj := map[string]interface{}{}
  154. err = json.Unmarshal(msgValueObjStr, &msgValueObj)
  155. if err == nil {
  156. MessageId := tools.IsEmpty(data["mid"])
  157. // if msgValueObj["code"] == 200.00 {
  158. msgObj := new(utils.MsgStateManage)
  159. msgObj.SetMessageStateObj(MessageId, utils.MsgState{Success: true, State: true, Message: "操作成功", Value: msgValueObj})
  160. // } else {
  161. // log.Println(fmt.Sprintf("事件查询%s执行返回失败:%v", MessageId, data))
  162. // tools.SetMessageStateObj(MessageId, tools.MsgState{Success: false, State: true, Message: tools.IsEmpty(msgValueObj["msg"]), Value: -1})
  163. // }
  164. } else {
  165. logger.Logger.Debug("接收到数据不是有效的JSON格式:")
  166. logger.Logger.Debug(data)
  167. }
  168. }