package service

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"reflect"

	"rtzh_elec_temperature/datachannel"
	"rtzh_elec_temperature/logger"
	"rtzh_elec_temperature/mqtt"
	"rtzh_elec_temperature/rtelec_app_public_lib/utils"
	"rtzh_elec_temperature/tools"

	"github.com/spf13/cast"
)

// dataService is the public-platform data collection service. It receives
// collected data pushed by the platform and dispatches it for processing.
type dataService struct {
	BaseService
}

var rtelecDataIns *dataService

// init starts a background goroutine that consumes
// datachannel.Service_DataPush_Chanl. For every received message it removes
// the message id from utils.MessageIds and handles the payload in its own
// goroutine.
func init() {
	logger.Logger.Println("rtelec数据服务初始化")
	go func() {
		// Ranging over the channel ends the goroutine when the channel is
		// closed, instead of spinning on zero-value receives forever.
		for data := range datachannel.Service_DataPush_Chanl {
			logger.Logger.Debug(fmt.Sprintf("=======接收到中台采集数据:%+v", data))
			utils.MessageIds.Remove(tools.IsEmpty(data["mid"]))
			ins := new(dataService)
			go ins.HandleDataReceive(data)
		}
	}()
}

// decodeMessage normalizes the "message" field of a dataPush payload.
// The field is accepted either as an already decoded map or as a JSON string
// (including named string types). Any other value, including nil, yields an
// error instead of a panic.
func decodeMessage(val interface{}) (map[string]interface{}, error) {
	if m, ok := val.(map[string]interface{}); ok {
		return m, nil
	}
	// reflect.ValueOf(nil).Kind() is reflect.Invalid, so a missing message is
	// reported as an unsupported type rather than dereferencing a nil Type.
	rv := reflect.ValueOf(val)
	if rv.Kind() != reflect.String {
		return nil, fmt.Errorf("unsupported message type %T", val)
	}
	var m map[string]interface{}
	if err := json.Unmarshal([]byte(rv.String()), &m); err != nil {
		return nil, fmt.Errorf("decoding message JSON: %v", err)
	}
	return m, nil
}

// HandleDataReceive processes one dataPush payload.
//
// Payloads whose "code" is "getEvent" are forwarded to ReveiceEventQueryData.
// For all other payloads the "message" field is decoded, the device is looked
// up by "device_name", the record is forwarded to the BY component, the
// device state is set to 2, and the decoded "value" attributes are handed to
// the history and link-event services. Malformed payloads are logged and
// dropped.
func (t *dataService) HandleDataReceive(data map[string]interface{}) {
	if tools.IsEmpty(data["code"]) == "getEvent" {
		go t.ReveiceEventQueryData(data)
		return
	}

	deviceService := new(DeviceService)
	deviceInfoAttr := deviceService.LoadDefaultDeviceAttr(false)

	val, ok := data["message"]
	if !ok {
		return
	}
	record, err := decodeMessage(val)
	if err != nil {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: %v", err))
		return
	}

	deviceName, ok := record["device_name"].(string)
	if !ok {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: device_name is missing or not a string: %v", record))
		return
	}

	history := new(HistoryService)
	history.Model.Devicename = deviceName
	deviceID, found := deviceInfoAttr.Load(deviceName)
	if !found || tools.IsEmpty(deviceID) == "" {
		logger.Logger.Error(fmt.Sprintf("未发现设备%s定义,该设备可能还未创建或已删除!", deviceName))
		return
	}
	history.Model.Deviceid = cast.ToInt32(deviceID)

	// Forward the record to the BY component.
	go t.SendToByCompnent(record)
	// Receiving data means the device is reachable: mark it with state 2
	// (the original comment describes this as "online").
	deviceService.SetDeviceState(int(history.Model.Deviceid), 2)

	rawCreateTime := record["create_time"]
	if rawCreateTime == nil {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: create_time is missing for device %s", deviceName))
		return
	}
	// JSON numbers decode as float64, but a pre-decoded map may carry another
	// numeric type, so convert leniently instead of asserting float64.
	createTime, err := cast.ToInt64E(rawCreateTime)
	if err != nil {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: invalid create_time %v: %v", rawCreateTime, err))
		return
	}
	history.Model.Date = tools.Unix2TimeString(createTime)

	valueStr, ok := record["value"].(string)
	if !ok {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: value is missing or not a string for device %s", deviceName))
		return
	}
	var attrInfo map[string]interface{}
	if err := json.Unmarshal([]byte(valueStr), &attrInfo); err != nil {
		logger.Logger.Error(fmt.Sprintf("dataPush message ignored: value is not valid JSON for device %s: %v", deviceName, err))
		return
	}
	history.AttrInfo = attrInfo

	// Alarm events are no longer handled here; per the original note, that
	// logic was moved to run after the history record is stored.
	// Store the record in the history table.
	go history.InsertHistory()
	// Process link (linkage) events.
	go new(LinkEventService).HandleLinkEvent(history)
}

// SendToByCompnent publishes the attribute values of a decoded message to the
// BY service over MQTT, keyed by measurement-point id.
//
// Example incoming payload (data is the decoded "message" part):
//
//	map[message:{"model_name":"jwl灯光模型","device_name":"灯光1","create_time":1666503765,"timeout":0,"value":"{\"ON1\":0,\"ON2\":0,\"ON3\":0,\"ON4\":0}"} mid:d0eba2de-1954-a91d-9092-20ae0a746054]
//
// Attributes with an empty value or without a matching measurement point are
// skipped; nothing is published when no attribute remains.
func (t *dataService) SendToByCompnent(data map[string]interface{}) {
	modelName := tools.IsEmpty(data["model_name"])
	if modelName == "" {
		logger.Logger.Error("接收到数据中未发现模型名称数据")
		logger.Logger.Println(fmt.Sprintf("%v", data))
		return
	}

	modelID := new(ModelService).GetModelId(modelName)
	if modelID == 0 {
		logger.Logger.Error(fmt.Sprintf("未找到模型%s名称的模型定义!", modelName))
		return
	}

	mpList := new(MpinfoService).MpList(modelID)
	if len(mpList) == 0 {
		logger.Logger.Error(fmt.Sprintf("未找到模型%s名的测点定义!", modelName))
		return
	}

	// Map attribute name -> measurement-point id.
	mpIDs := make(map[string]string, len(mpList))
	for _, r := range mpList {
		mpIDs[tools.IsEmpty(r["attrname"])] = tools.IsEmpty(r["mpid"])
	}

	dataValue := map[string]interface{}{}
	if err := json.Unmarshal([]byte(tools.IsEmpty(data["value"])), &dataValue); err != nil {
		logger.Logger.Error(fmt.Sprintf("BY forward skipped: value is not valid JSON for model %s: %v", modelName, err))
		return
	}

	sendData := make(map[string]string, len(dataValue))
	for attrName, val := range dataValue {
		strVal := tools.IsEmpty(val)
		if strVal == "" {
			continue
		}
		mpID := mpIDs[attrName]
		if mpID == "" {
			continue
		}
		sendData[mpID] = strVal
	}
	if len(sendData) == 0 {
		return
	}

	payload, err := json.Marshal(sendData)
	if err != nil {
		logger.Logger.Error(fmt.Sprintf("BY forward skipped: encoding payload: %v", err))
		return
	}
	mqtt.PublishMessage("/rtelec/byzt/device/data", string(payload))
}

// ReveiceEventQueryData handles the result of an event query (for example an
// access-control record query). The message "value" is expected to be a
// base64-encoded JSON object; once decoded it is stored as the successful
// state of the message identified by data["mid"].
func (t *dataService) ReveiceEventQueryData(data map[string]interface{}) {
	logger.Logger.Debug(fmt.Sprintf("======getEvent Result:%+v", data))

	message, err := decodeMessage(data["message"])
	if err != nil {
		logger.Logger.Error(fmt.Sprintf("getEvent result ignored: %v", err))
		return
	}

	msgValue := tools.IsEmpty(message["value"])
	if msgValue == "" {
		return
	}

	decoded, err := base64.StdEncoding.DecodeString(msgValue)
	if err != nil {
		logger.Logger.Error(err)
		logger.Logger.Println(data)
		return
	}

	msgValueObj := map[string]interface{}{}
	if err := json.Unmarshal(decoded, &msgValueObj); err != nil {
		logger.Logger.Debug("接收到数据不是有效的JSON格式:")
		logger.Logger.Debug(data)
		return
	}

	// The result is recorded as successful regardless of the "code" field in
	// the decoded payload; a code-based failure branch existed previously but
	// was disabled.
	messageID := tools.IsEmpty(data["mid"])
	msgObj := new(utils.MsgStateManage)
	msgObj.SetMessageStateObj(messageID, utils.MsgState{Success: true, State: true, Message: "操作成功", Value: msgValueObj})
}