123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704 |
- package service
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io/fs"
- "os"
- "rtzh_elec_temperature/datachannel"
- "rtzh_elec_temperature/enum"
- "rtzh_elec_temperature/global"
- "rtzh_elec_temperature/logger"
- "rtzh_elec_temperature/models/bo"
- "sort"
- "sync"
- //"rtzh_elec_temperature/mqtt"
- "rtzh_elec_temperature/tools"
- "strconv"
- "strings"
- "time"
- "github.com/spf13/cast"
- "github.com/astaxie/beego/orm"
- )
- //本地历史数据管理
- type HistoryService struct {
- BaseService
- Model dev_history
- PageIndex int `json:"page_index"`
- PageSize int `json:"page_size"`
- StartDate string `json:"start_date"`
- EndDate string `json:"end_date"`
- Mpnames []string
- MpAttrnames []string
- AttrInfo map[string]interface{}
- }
- //历史数据模型
- type dev_history struct {
- Id int `orm:"pk"`
- Appid int
- Deviceid int32
- Devicename string
- Attrname string
- Mpid string
- Mpname string
- Date string
- Val int
- }
- func init() {
- orm.RegisterModel(new(dev_history))
- //加载本地缓存文件中的数据到缓存
- go func() {
- time.Sleep(1 * time.Second)
- new(HistoryService).GetMaxDataInfo()
- new(HistoryService).GetLastData()
- }()
- }
- // 添加历史数据
- func (field *HistoryService) InsertHistory() error {
- //appid := RtelecManageApp().RegAppID
- deviceServiceIns := new(DeviceService)
- //获取设备的模型信息
- deviceModelInfo := deviceServiceIns.GetComboxListByName(field.Model.Devicename)
- if deviceModelInfo == nil {
- logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
- return errors.New("设备" + field.Model.Devicename + "未创建")
- }
- modelid := deviceModelInfo.Modelid
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- return err
- }
- //获取设备的测点
- mplist := deviceServiceIns.DeviceMpInfo(field.Model.Deviceid)
- if len(mplist) == 0 {
- logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
- return nil
- }
- lastdatacache := map[string]interface{}{}
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- db := orm.NewOrm()
- sqlvalues := []interface{}{field.Model.Devicename, field.Model.Deviceid, field.Model.Date}
- sqlChar := []string{"?", "?", "?"}
- sqlCols := []string{"device_name", "device_id", "acquisition_time"}
- //组装缓存数据
- lastdatacache["device_name"] = field.Model.Devicename
- lastdatacache["device_id"] = field.Model.Deviceid
- lastdatacache["acquisition_time"] = field.Model.Date
- lastdatacache_value := map[string]interface{}{} //采集数据
- dataoptimizeService := new(DataoptimizeService)
- for keyName, _ := range mplist {
- keyName2 := strings.ToLower(keyName)
- sqlCols = append(sqlCols, keyName2)
- sqlChar = append(sqlChar, "?")
- optValue := dataoptimizeService.OptimizeValue(modelid, field.Model.Deviceid, keyName, field.AttrInfo)
- sqlvalues = append(sqlvalues, optValue)
- if tools.IsEmpty(optValue) == global.ReplaceNumber {
- lastdatacache_value[keyName], _ = bo.GetSysParamValue("invalid_show_rule", "")
- } else {
- lastdatacache_value[keyName] = optValue
- }
- }
- lastdatacache["value"] = lastdatacache_value
- if len(lastdatacache_value) > 0 {
- sqlCommandText := fmt.Sprintf("insert into %s(%s)values(%s)", localTableName, strings.Join(sqlCols, ","), strings.Join(sqlChar, ","))
- _, err = db.Raw(sqlCommandText, sqlvalues).Exec()
- if err != nil {
- sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlvalues)
- logger.Logger.Error(err, sqllog)
- new(bo.SystemLog).Fail(
- enum.AuditType_datapush,
- enum.LogType_Insert,
- enum.OptEventType_System,
- enum.OptEventLevel_Hight,
- sqllog,
- global.SystemLogDefaultAccount,
- )
- return err
- }
- //处理告警策略
- //field.AttrInfo = lastdatacache_value //经过优化后的测点数据
- go new(AlarmService).HandleAlarmEvent(field)
- //计算当前采集数据中每类测点的最高值
- go field.setMaxDataInfo(lastdatacache)
- //推送最后采集到前端
- go func(did int32, lastdatacache map[string]interface{}) {
- //将数据发送到MQTT主题/rtelec/runtime/device/data。一般该主题为前端订阅
- //msg, _ := json.Marshal(lastdatacache)
- // 2023-03-27 取消采用mqtt的方式向前端发送数据。改为websockets的方式。甲方mqtt只提供tcp协议
- // ======================================
- publishMes := map[string]interface{}{}
- publishMes["topic"] = global.Topic_RuntimeDataPublish
- publishMes["data"] = lastdatacache
- datachannel.SendDataQueue <- publishMes
- // ======================================
- //mqtt.PublishMessage(global.Topic_RuntimeDataPublish, string(msg))
- //缓存最后一次数据。
- ccc := cast.ToString(did)
- lastdata := map[string]interface{}{}
- v_lastdata, _ := global.GoCahce.Get("DeviceLastData")
- if v_lastdata == nil {
- lastdata = map[string]interface{}{ccc: lastdatacache}
- } else {
- lastdata = v_lastdata.(map[string]interface{})
- lastdata[ccc] = lastdatacache
- }
- global.GoCahce.Set("DeviceLastData", lastdata, -1)
- //缓存数据持久化到文件中,否则重启后一段时间内将获取不到最后一次的采集数据,从数据库查可能会很慢
- lastdataStr, _ := json.Marshal(lastdata)
- cachePath := "lastdata_cache.json"
- os.WriteFile(cachePath, lastdataStr, fs.ModePerm)
- }(field.Model.Deviceid, lastdatacache)
- }
- return nil
- }
- // 查询历史数据
- func (field *HistoryService) SearchHistory() (map[string]interface{}, int, error) {
- deviceSvr := new(DeviceService)
- //获取设备的模型信息
- deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
- if deviceModelInfo == nil {
- logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
- return nil, 0, errors.New("设备" + field.Model.Devicename + "未创建")
- }
- field.Model.Devicename = deviceModelInfo.Name
- modelid := deviceModelInfo.Modelid
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- return nil, 0, err
- }
- //获取设备的测点
- deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
- if len(deviceMpList) == 0 {
- logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
- return nil, 0, errors.New(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
- }
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- //返回格式对象定义.包括表头列名,表体数据行及数据子行
- result := map[string]interface{}{"theader": []interface{}{}, "tbody": []orm.Params{}}
- total := 0
- o := orm.NewOrm()
- colnames := []string{}
- temCols := []string{} //温度列
- humCols := []string{} //湿度列
- volCols := []string{} //电压列
- headerNames := map[string]interface{}{
- "name": "acquisition_time",
- "content": "时刻",
- }
- for _, mp := range deviceMpList {
- attrname := strings.ToLower(fmt.Sprintf("%s", mp["attrname"]))
- if attrname[0:3] == "tem" {
- temCols = append(temCols, attrname)
- } else if attrname[0:3] == "hum" {
- humCols = append(humCols, attrname)
- } else if attrname[0:3] == "vol" {
- volCols = append(volCols, attrname)
- }
- headerNames[attrname] = tools.IsEmpty(mp["mpname"])
- }
- sort.Strings(temCols)
- sort.Strings(humCols)
- sort.Strings(volCols)
- colnames = append(colnames, temCols...)
- colnames = append(colnames, humCols...)
- colnames = append(colnames, volCols...)
- for _, tv := range colnames {
- result["theader"] = append(result["theader"].([]interface{}), map[string]interface{}{"name": tv, "content": headerNames[tv]})
- }
- //数据查询SQL
- sql := fmt.Sprintf(" select acquisition_time, %s from %s where acquisition_time BETWEEN ? and ? and device_id=? ", strings.Join(colnames, ","), localTableName)
- //总数查询SQL
- totalSql := fmt.Sprintf(" select count(1) cnt from %s where acquisition_time BETWEEN ? and ? and device_id=? ", localTableName)
- sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59", field.Model.Deviceid}
- //分页条件
- limit := fmt.Sprintf(" order by acquisition_time desc limit %d,%d", (field.PageIndex-1)*field.PageSize, field.PageSize)
- rowset := []orm.Params{}
- tbody := [][]orm.Params{}
- //查询数据
- var wg = sync.WaitGroup{}
- wg.Add(2)
- go func() {
- _, err := o.Raw(sql+limit, sqlParas).Values(&rowset)
- if err != nil {
- wg.Done()
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql+limit, sqlParas))
- return
- } else {
- //将查询结果转换成父子行
- tmpRows := [][]orm.Params{}
- for _, row := range rowset {
- objTem := orm.Params{
- "acquisition_time": row["acquisition_time"],
- "type": "温度",
- }
- objHum := orm.Params{
- "acquisition_time": row["acquisition_time"],
- "type": "湿度",
- }
- objVol := orm.Params{
- "acquisition_time": row["acquisition_time"],
- "type": "电压",
- }
- hasvol := false //是否有电压测点
- hashum := false //是否有湿度测点
- for colKey, v := range row {
- if colKey == "acquisition_time" {
- continue
- }
- v1 := tools.IsEmpty(v)
- if v1 == global.NullNumber {
- v1 = ""
- } else if v1 == global.ReplaceNumber {
- //将值显示为指定字符
- v1, _ = bo.GetSysParamValue("invalid_show_rule", "")
- }
- if colKey[0:3] == "tem" {
- objTem[colKey] = v1
- } else {
- objTem[colKey] = ""
- }
- if colKey[0:3] == "hum" {
- objHum[colKey] = v1
- hashum = true
- } else {
- objHum[colKey] = ""
- }
- if colKey[0:3] == "vol" {
- objVol[colKey] = v1
- hasvol = true
- } else {
- objVol[colKey] = ""
- }
- }
- if !hashum {
- objHum["type"] = ""
- }
- if !hasvol {
- objVol["type"] = ""
- }
- tmpRows = append(tmpRows, []orm.Params{objTem, objHum, objVol})
- }
- tbody = tmpRows
- }
- wg.Done()
- }()
- totalRowset := []orm.Params{}
- go func() {
- _, err = o.Raw(totalSql, sqlParas).Values(&totalRowset)
- wg.Done()
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", totalSql, sqlParas))
- return
- }
- }()
- wg.Wait()
- if len(totalRowset) > 0 {
- total, _ = strconv.Atoi(tools.IsEmpty(totalRowset[0]["cnt"]))
- }
- result["tbody"] = tbody
- result["total"] = total
- return result, total, err
- }
- //生成历史数据echarts line数据。仅支持多个测点
- func (field *HistoryService) GetHistoryDataEchartsLine() (map[string]interface{}, error) {
- deviceSvr := new(DeviceService)
- //获取设备的模型信息
- deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
- if deviceModelInfo == nil {
- logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
- return nil, errors.New("设备" + field.Model.Devicename + "未创建")
- }
- field.Model.Devicename = deviceModelInfo.Name
- modelid := deviceModelInfo.Modelid
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- return nil, err
- }
- //获取设备的测点
- deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
- if len(deviceMpList) == 0 {
- logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
- return nil, errors.New(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
- }
- if len(field.MpAttrnames) == 0 && len(field.Mpnames) == 0 {
- return nil, errors.New("未指定获取的测点名称或模型属性名")
- }
- if len(field.MpAttrnames) == 0 && len(field.Mpnames) > 0 {
- for _, n1 := range field.Mpnames {
- for attrname, n2 := range deviceMpList {
- if tools.IsEmpty(n2["mpname"]) == n1 {
- field.MpAttrnames = append(field.MpAttrnames, attrname)
- break
- }
- }
- }
- }
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- //返回格式对象定义.
- result := map[string]interface{}{"legend": map[string]interface{}{}, "xAxis": map[string]interface{}{}, "series": []interface{}{}}
- o := orm.NewOrm()
- colnames := []string{}
- legendNames := []string{}
- deviceMpName := map[string]string{}
- for _, mp := range field.MpAttrnames {
- mpinfo := deviceMpList[mp]
- attrname := strings.ToLower(fmt.Sprintf("%s", mpinfo["attrname"]))
- mpname := tools.IsEmpty(mpinfo["mpname"])
- legendNames = append(legendNames, mpname)
- colnames = append(colnames, attrname)
- deviceMpName[attrname] = mpname
- }
- result["legend"] = map[string]interface{}{"data": legendNames}
- //数据查询SQL
- sql := fmt.Sprintf(" select acquisition_time, %s from %s where acquisition_time BETWEEN ? and ? order by id ", strings.Join(colnames, ","), localTableName)
- sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59"}
- rowset := []orm.Params{}
- _, err = o.Raw(sql, sqlParas).Values(&rowset)
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, sqlParas))
- return nil, err
- }
- //生成X轴上的时间点
- xAxisData := []string{}
- sameDateHour := map[string]int{}
- cDate := ""
- for _, row := range rowset {
- //acquisition_time和day不会同时存在于同一条数据中
- xPoint := tools.IsEmpty(row["acquisition_time"])
- sameDateHour[xPoint[:len(xPoint)-3]] = 1
- xPointParts := strings.Split(xPoint, " ")
- if cDate != xPointParts[0] {
- xPoint = xPoint[:len(xPoint)-3] //去除最后的秒
- } else {
- xPoint = xPointParts[1][:len(xPointParts[1])-3] //去除日期和最后的秒
- }
- cDate = xPointParts[0]
- xAxisData = append(xAxisData, xPoint)
- }
- result["xAxis"] = map[string]interface{}{"data": xAxisData}
- //生成该设备每个测点的序列数据
- for _, mp := range colnames {
- series := map[string]interface{}{}
- series["name"] = deviceMpName[mp] //序列名称
- datas := []string{}
- for _, row := range rowset {
- dataTime := tools.IsEmpty(row["acquisition_time"])
- if len(dataTime) > 10 {
- dataTime = dataTime[:len(dataTime)-3]
- }
- if sameDateHour[dataTime] == 1 {
- v := tools.IsEmpty(row[mp])
- if v == global.NullNumber || v == global.ReplaceNumber {
- v = ""
- }
- datas = append(datas, v)
- } else {
- datas = append(datas, "")
- }
- }
- series["data"] = datas
- result["series"] = append(result["series"].([]interface{}), series)
- }
- return result, err
- }
- //获取所有或者指定设备的最新数据
- func (t *HistoryService) GetRuntimeData() ([]map[string]interface{}, error) {
- db := orm.NewOrm()
- modelSvr := new(ModelService)
- allmode := modelSvr.GetModelListObj()
- if len(allmode) == 0 {
- return nil, errors.New("未配置应用的物模型!")
- }
- devSvr := new(DeviceService)
- devall, _ := devSvr.GetComboxList()
- result := []map[string]interface{}{}
- for _, modelitem := range allmode {
- modelitem2 := modelitem.(map[string]interface{})
- modelid, _ := strconv.Atoi(string(modelitem2["id"].(json.Number)))
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- continue
- }
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- //获取模型下的设备列表
- for _, devitem := range devall {
- if modelid == devitem.Modelid {
- //获取设备测点
- devmpist := devSvr.DeviceMpInfo(int32(devitem.Deviceid))
- if len(devmpist) == 0 {
- continue
- }
- cols := []string{}
- theader := []string{}
- tbody := orm.Params{}
- for attrname, item := range devmpist {
- cols = append(cols, strings.ToLower("a."+attrname))
- theader = append(theader, tools.IsEmpty(item["mpname"]))
- }
- sql := "SELECT a.device_id,a.device_name, a.acquisition_time," + strings.Join(cols, ",") + " from " + localTableName + " a,(select max(id) maxid from " + localTableName + " where device_id=?) b where a.id=b.maxid"
- tmpRows := []orm.Params{}
- _, err := db.Raw(sql, devitem.Deviceid).Values(&tmpRows)
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, devitem.Deviceid))
- continue
- }
- if len(tmpRows) > 0 {
- for atr, atv := range tmpRows[0] {
- vv := tools.IsEmpty(atv)
- if vv == global.NullNumber {
- tmpRows[0][atr] = ""
- } else if vv == global.ReplaceNumber {
- tmpRows[0][atr], _ = bo.GetSysParamValue("invalid_show_rule", "")
- }
- }
- tbody = tmpRows[0]
- }
- result = append(result, map[string]interface{}{
- "device_name": devitem.Name,
- "device_id": devitem.Deviceid,
- "thead": theader,
- "tbody": tbody,
- })
- }
- }
- }
- return result, nil
- }
- //获取设备最后一次报的历史数据
- //设备最后一次报的数据存储在缓存DeviceLastData中
- func (t *HistoryService) GetLastData() (map[string]interface{}, error) {
- v_devceid := ""
- if t.Model.Deviceid > 0 {
- v_devceid = tools.IsEmpty(t.Model.Deviceid)
- }
- if lastdata, h := global.GoCahce.Get("DeviceLastData"); h {
- lastdata2 := lastdata.(map[string]interface{})
- if v_devceid == "" {
- return lastdata2, nil
- }
- if lastdata2[v_devceid] == nil {
- return nil, nil
- }
- return lastdata2[v_devceid].(map[string]interface{}), nil
- } else {
- cachePath := "lastdata_cache.json"
- d1, ferr := os.ReadFile(cachePath)
- if ferr == nil && len(d1) > 0 {
- lastdata := map[string]interface{}{}
- err := json.Unmarshal(d1, &lastdata)
- if err != nil {
- logger.Logger.Error(err)
- } else {
- global.GoCahce.Set("DeviceLastData", lastdata, -1)
- }
- return lastdata, err
- } else {
- logger.Logger.Error(ferr)
- }
- }
- return nil, nil
- }
- //删除本地库历史数据
- func (t *HistoryService) DeleteHistory(deviceId int32, MpId int64) {
- deviceServiceIns := new(DeviceService)
- //获取设备的模型信息
- deviceModelInfo := deviceServiceIns.GetComboxListById(deviceId)
- if deviceModelInfo == nil {
- logger.Logger.Error(fmt.Sprintf("设备%d未创建", deviceId))
- return
- }
- modelid := deviceModelInfo.Modelid
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- return
- }
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- o := orm.NewOrm()
- var sqlCommandText = ""
- var sqlParameter []interface{}
- if MpId > 0 {
- mpinfo, err := new(MpinfoService).GetMpinfo(MpId)
- if err != nil {
- return
- }
- colname := tools.IsEmpty(mpinfo["attrname"])
- if colname == "" {
- return
- }
- //设置指定列为空数据,以9999表示数据为空
- sqlCommandText = "update " + localTableName + " set " + colname + "=? where mpid=? "
- sqlParameter = append(sqlParameter, global.NullNumber)
- sqlParameter = append(sqlParameter, MpId)
- } else if deviceId > 0 {
- sqlCommandText = "delete from " + localTableName + " where device_id=? "
- sqlParameter = append(sqlParameter, deviceId)
- }
- if len(sqlParameter) > 0 {
- _, err = o.Raw(sqlCommandText, sqlParameter).Exec()
- sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlParameter)
- if err != nil {
- logger.Logger.Error(err, sqllog)
- new(bo.SystemLog).Fail(
- enum.AuditType_datapush,
- enum.LogType_Delete,
- enum.OptEventType_System,
- enum.OptEventLevel_Hight,
- sqllog,
- global.SystemLogDefaultAccount,
- )
- }
- }
- }
- //获取当日最高温度、湿度、电压的设备和时间数据
- //将当日最高温度、湿度、电压的设备和时间数据存储在缓存MaxDataInfo中
- func (t *HistoryService) GetMaxDataInfo() (map[string]interface{}, error) {
- if lastdata, h := global.GoCahce.Get("CurrentMaxDataInfo"); h {
- lastdata2 := lastdata.(map[string]interface{})
- return lastdata2, nil
- } else {
- cachePath := "current_maxdata_cache.json"
- d1, ferr := os.ReadFile(cachePath)
- if ferr == nil && len(d1) > 0 {
- lastdata := map[string]interface{}{}
- err := json.Unmarshal(d1, &lastdata)
- if err != nil {
- logger.Logger.Error(err)
- } else {
- global.GoCahce.Set("CurrentMaxDataInfo", lastdata, -1)
- }
- return lastdata, err
- } else {
- logger.Logger.Error(ferr)
- }
- }
- return nil, nil
- }
- //保存当日最高数据信息
- func (t *HistoryService) setMaxDataInfo(info map[string]interface{}) {
- maxdata := map[string]interface{}{}
- maxTem := map[string]interface{}{}
- maxVol := map[string]interface{}{}
- maxHum := map[string]interface{}{}
- value := info["value"].(map[string]interface{})
- v_lastdata, _ := global.GoCahce.Get("CurrentMaxDataInfo")
- v_tem := float64(-10000)
- v_vol := float64(-10000)
- v_hum := float64(-10000)
- for k, v := range value {
- v1 := cast.ToFloat64(v)
- if v1 > 99990 {
- //无效数据99999,采集float类型,所以比较值比实际值小一些,这样不用转换精度
- continue
- }
- attrnameType := strings.ToLower(k[0:3])
- if attrnameType == "tem" && v1 > v_tem {
- v_tem = v1
- maxTem["attr_name"] = k
- } else if attrnameType == "vol" && v1 > v_vol {
- v_vol = v1
- maxVol["attr_name"] = k
- } else if attrnameType == "hum" && v1 > v_hum {
- v_hum = v1
- maxHum["attr_name"] = k
- }
- }
- maxTem["device_name"] = info["device_name"]
- maxVol["device_name"] = info["device_name"]
- maxHum["device_name"] = info["device_name"]
- maxTem["device_id"] = info["device_id"]
- maxVol["device_id"] = info["device_id"]
- maxHum["device_id"] = info["device_id"]
- maxTem["acquisition_time"] = info["acquisition_time"]
- maxVol["acquisition_time"] = info["acquisition_time"]
- maxHum["acquisition_time"] = info["acquisition_time"]
- if v_tem > -9999 {
- maxTem["max"] = v_tem
- }
- if v_hum > -9999 {
- maxHum["max"] = v_hum
- }
- if v_vol > -9999 {
- maxVol["max"] = v_vol
- }
- if v_lastdata == nil {
- maxdata = map[string]interface{}{"tem": maxTem, "vol": maxVol, "hum": maxHum}
- } else {
- maxdata = v_lastdata.(map[string]interface{})
- if tmpv, h := maxdata["tem"]; h {
- oldTem := tmpv.(map[string]interface{})
- if oldTem["max"] != nil {
- oldTemMax := cast.ToFloat64(oldTem["max"])
- if v_tem > oldTemMax {
- maxdata["tem"] = maxTem
- }
- } else {
- maxdata["tem"] = maxTem
- }
- } else if v_tem > -9999 {
- maxdata["tem"] = maxTem
- }
- if tmpv, h := maxdata["vol"]; h {
- oldTem := tmpv.(map[string]interface{})
- if oldTem["max"] != nil {
- oldTemMax := cast.ToFloat64(oldTem["max"])
- if v_tem > oldTemMax {
- maxdata["vol"] = maxVol
- }
- } else {
- maxdata["vol"] = maxVol
- }
- } else if v_tem > -9999 {
- maxdata["vol"] = maxVol
- }
- if tmpv, h := maxdata["hum"]; h {
- oldTem := tmpv.(map[string]interface{})
- if oldTem["max"] != nil {
- oldTemMax := cast.ToFloat64(oldTem["max"])
- if v_tem > oldTemMax {
- maxdata["hum"] = maxHum
- }
- } else {
- maxdata["hum"] = maxHum
- }
- } else if v_tem > -9999 {
- maxdata["hum"] = maxHum
- }
- }
- logger.Logger.Debug(fmt.Sprintf("======当前最高值数据统计结果:%+v", maxdata))
- global.GoCahce.Set("CurrentMaxDataInfo", maxdata, -1)
- //缓存数据持久化到文件中,否则重启后一段时间内将获取不到最后一次的采集数据,从数据库查可能会很慢
- lastdataStr, _ := json.Marshal(maxdata)
- cachePath := "current_maxdata_cache.json"
- os.WriteFile(cachePath, lastdataStr, fs.ModePerm)
- //MQTT方式推送方式主机不支持
- //msg, _ := json.Marshal(maxdata)
- //mqtt.PublishMessage(global.Topic_TodayMaxDataPublish, string(msg))
- //采用ws方式
- publishMes := map[string]interface{}{}
- publishMes["topic"] = global.Topic_TodayMaxDataPublish
- publishMes["data"] = maxdata
- datachannel.SendDataQueue <- publishMes
- }
|