| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- package service
- import (
- "encoding/base64"
- "encoding/json"
- "fmt"
- "reflect"
- "rtzh_elec_temperature/datachannel"
- "rtzh_elec_temperature/logger"
- "rtzh_elec_temperature/mqtt"
- "rtzh_elec_temperature/rtelec_app_public_lib/utils"
- "rtzh_elec_temperature/tools"
- "github.com/spf13/cast"
- )
- //公共平台数据采集服务,提供采集数据接收以及处理等
- type dataService struct {
- BaseService
- }
- var rtelecDataIns *dataService
- //初始化
- func init() {
- logger.Logger.Println("rtelec数据服务初始化")
- go func() {
- for {
- data, ok := <-datachannel.Service_DataPush_Chanl
- logger.Logger.Debug(fmt.Sprintf("=======接收到中台采集数据:%+v", data))
- if ok {
- utils.MessageIds.Remove(tools.IsEmpty(data["mid"]))
- ins := new(dataService)
- go ins.HandleDataReceive(data)
- }
- }
- }()
- }
- //接收到dataPush数据
- func (t *dataService) HandleDataReceive(data map[string]interface{}) {
- datacode := tools.IsEmpty(data["code"])
- if datacode == "getEvent" {
- go t.ReveiceEventQueryData(data)
- return
- }
- deviceService := new(DeviceService)
- deviceInfoAttr := deviceService.LoadDefaultDeviceAttr(false)
- var err error
- for key, val := range data {
- if key == "message" {
- var record map[string]interface{}
- if reflect.TypeOf(val).Kind() == reflect.String {
- bytes := []byte(val.(string))
- err = json.Unmarshal(bytes, &record)
- } else {
- record = val.(map[string]interface{})
- }
- if err == nil {
- var historyServiceObj = new(HistoryService)
- var deviceName = record["device_name"].(string)
- historyServiceObj.Model.Devicename = deviceName
- if Device_Id, ok := deviceInfoAttr.Load(deviceName); ok && tools.IsEmpty(Device_Id) != "" {
- historyServiceObj.Model.Deviceid = cast.ToInt32(Device_Id)
- } else {
- logger.Logger.Error(fmt.Sprintf("未发现设备%s定义,该设备可能还未创建或已删除!", deviceName))
- return
- }
- //将数据发送到by组件
- go t.SendToByCompnent(record)
- //接收数据后设置设备为在线状态
- deviceService.SetDeviceState(int(historyServiceObj.Model.Deviceid), 2)
- creattime := record["create_time"]
- historyServiceObj.Model.Date = tools.Unix2TimeString(int64(creattime.(float64)))
- var AttrInfo map[string]interface{}
- err = json.Unmarshal([]byte(record["value"].(string)), &AttrInfo)
- if err == nil {
- historyServiceObj.AttrInfo = AttrInfo
- //log.Println(fmt.Println("===接收到采集数据:%+v", Field))
- //处理告警事件表。注:告警逻辑已经迁移到历史数据入库后处理
- //go new(AlarmService).HandleAlarmEvent(historyServiceObj)
- //添加进历史数据表
- go historyServiceObj.InsertHistory()
- //处理联动事件表
- go new(LinkEventService).HandleLinkEvent(historyServiceObj)
- }
- }
- }
- }
- }
- //将数据发送到BY服务
- func (t *dataService) SendToByCompnent(data map[string]interface{}) {
- //map[message:{"model_name":"jwl灯光模型","device_name":"灯光1","create_time":1666503765,"timeout":0,"value":"{\"ON1\":0,\"ON2\":0,\"ON3\":0,\"ON4\":0}"} mid:d0eba2de-1954-a91d-9092-20ae0a746054]
- model_name := tools.IsEmpty(data["model_name"])
- if model_name == "" {
- logger.Logger.Error(fmt.Sprintf("接收到数据中未发现模型名称数据"))
- logger.Logger.Println(fmt.Sprintf("%v", data))
- return
- }
- modelServiceObj := new(ModelService)
- modeid := modelServiceObj.GetModelId(model_name)
- if modeid == 0 {
- logger.Logger.Error(fmt.Sprintf("未找到模型%s名称的模型定义!", model_name))
- return
- }
- mpserviceobj := new(MpinfoService)
- mplist := mpserviceobj.MpList(modeid)
- if len(mplist) == 0 {
- logger.Logger.Error(fmt.Sprintf("未找到模型%s名的测点定义!", model_name))
- return
- }
- mpids := map[string]string{}
- for _, r := range mplist {
- key := tools.IsEmpty(r["attrname"])
- mpids[key] = tools.IsEmpty(r["mpid"])
- }
- senddata := map[string]string{}
- dataValue := map[string]interface{}{}
- json.Unmarshal([]byte(tools.IsEmpty(data["value"])), &dataValue)
- for attrname, val := range dataValue {
- b := tools.IsEmpty(val)
- if b == "" {
- continue
- }
- vid := mpids[attrname]
- if vid == "" {
- continue
- }
- senddata[vid] = b
- }
- if len(senddata) == 0 {
- return
- }
- senddataStr, _ := json.Marshal(senddata)
- mqtt.PublishMessage("/rtelec/byzt/device/data", string(senddataStr))
- }
- //事件查询返回数据处理。如门禁记录查询
- func (t *dataService) ReveiceEventQueryData(data map[string]interface{}) {
- logger.Logger.Debug(fmt.Sprintf("======getEvent Result:", data))
- var message map[string]interface{}
- val := data["message"]
- var err error
- if reflect.TypeOf(val).Kind() == reflect.String {
- bytes := []byte(val.(string))
- err = json.Unmarshal(bytes, &message)
- } else {
- message = val.(map[string]interface{})
- }
- msgValue := tools.IsEmpty(message["value"])
- if msgValue == "" {
- return
- }
- msgValueObjStr, err := base64.StdEncoding.DecodeString(msgValue)
- if err != nil {
- logger.Logger.Error(err)
- logger.Logger.Println(data)
- return
- }
- msgValueObj := map[string]interface{}{}
- err = json.Unmarshal(msgValueObjStr, &msgValueObj)
- if err == nil {
- MessageId := tools.IsEmpty(data["mid"])
- // if msgValueObj["code"] == 200.00 {
- msgObj := new(utils.MsgStateManage)
- msgObj.SetMessageStateObj(MessageId, utils.MsgState{Success: true, State: true, Message: "操作成功", Value: msgValueObj})
- // } else {
- // log.Println(fmt.Sprintf("事件查询%s执行返回失败:%v", MessageId, data))
- // tools.SetMessageStateObj(MessageId, tools.MsgState{Success: false, State: true, Message: tools.IsEmpty(msgValueObj["msg"]), Value: -1})
- // }
- } else {
- logger.Logger.Debug("接收到数据不是有效的JSON格式:")
- logger.Logger.Debug(data)
- }
- }
|