package service

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"rtzh_elec_temperature/datachannel"
	"rtzh_elec_temperature/enum"
	"rtzh_elec_temperature/global"
	"rtzh_elec_temperature/logger"
	"rtzh_elec_temperature/models/bo"
	"sort"
	"sync"

	//"rtzh_elec_temperature/mqtt"
	"rtzh_elec_temperature/tools"
	"strconv"
	"strings"
	"time"

	"github.com/spf13/cast"

	"github.com/astaxie/beego/orm"
)

// Cache keys in global.GoCahce and the files the cached maps are persisted to.
const (
	historyLastDataCacheKey = "DeviceLastData"
	historyLastDataFile     = "lastdata_cache.json"
	historyMaxDataCacheKey  = "CurrentMaxDataInfo"
	historyMaxDataFile      = "current_maxdata_cache.json"
)

var (
	// historyLastDataMu serializes read-modify-write cycles on the
	// DeviceLastData cache entry and its backing file.
	historyLastDataMu sync.Mutex
	// historyMaxDataMu does the same for the CurrentMaxDataInfo cache entry.
	historyMaxDataMu sync.Mutex
	// historyMaxKinds lists the attribute-name prefixes tracked by setMaxDataInfo.
	historyMaxKinds = []string{"tem", "vol", "hum"}
)

// HistoryService manages the history data stored in the local database.
type HistoryService struct {
	BaseService
	Model       dev_history
	PageIndex   int    `json:"page_index"`
	PageSize    int    `json:"page_size"`
	StartDate   string `json:"start_date"`
	EndDate     string `json:"end_date"`
	Mpnames     []string
	MpAttrnames []string
	AttrInfo    map[string]interface{}
}

// dev_history is the history data model registered with the ORM.
type dev_history struct {
	Id         int `orm:"pk"`
	Appid      int
	Deviceid   int32
	Devicename string
	Attrname   string
	Mpid       string
	Mpname     string
	Date       string
	Val        int
}

func init() {
	orm.RegisterModel(new(dev_history))
	// Warm the in-memory cache from the local cache files shortly after start-up.
	go func() {
		time.Sleep(1 * time.Second)
		new(HistoryService).GetMaxDataInfo()
		new(HistoryService).GetLastData()
	}()
}

// historyTableName resolves the local data table ("t_data_<name>") mapped to
// the given model id through the "model_table_mapping" code table. It logs and
// returns a non-nil error when the lookup fails or the mapping has no name.
func historyTableName(modelID interface{}) (string, error) {
	ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelID))
	if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
		msg := fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelID)
		logger.Logger.Error(msg)
		if err == nil {
			// Previously a missing mapping was reported to callers as success.
			err = errors.New(msg)
		}
		return "", err
	}
	return "t_data_" + tools.IsEmpty(ormrow["name"]), nil
}

// historyAttrKind classifies an attribute/column name by its prefix and
// returns "tem", "hum", "vol", or "" when none matches. The comparison is
// case-insensitive and safe for names shorter than three bytes.
func historyAttrKind(attrname string) string {
	lower := strings.ToLower(attrname)
	for _, kind := range historyMaxKinds {
		if strings.HasPrefix(lower, kind) {
			return kind
		}
	}
	return ""
}

// historyDropSeconds removes the last three bytes (the ":ss" part of a
// "yyyy-mm-dd hh:mm:ss" or "hh:mm:ss" value). Shorter strings are returned
// unchanged instead of panicking.
func historyDropSeconds(ts string) string {
	if len(ts) > 3 {
		return ts[:len(ts)-3]
	}
	return ts
}

// historyCachedMap fetches key from global.GoCahce and reports whether it
// holds a map[string]interface{}.
func historyCachedMap(key string) (map[string]interface{}, bool) {
	cached, ok := global.GoCahce.Get(key)
	if !ok || cached == nil {
		return nil, false
	}
	m, isMap := cached.(map[string]interface{})
	return m, isMap
}

// historyLoadCacheFile loads the JSON object stored in cachePath into the
// cache under cacheKey and returns it. A missing or empty file yields
// (nil, nil); invalid JSON yields (nil, err). The cache is re-checked under mu
// so a concurrent writer's fresher value is never replaced by the file content.
func historyLoadCacheFile(mu *sync.Mutex, cacheKey, cachePath string) (map[string]interface{}, error) {
	mu.Lock()
	defer mu.Unlock()

	if cached, ok := historyCachedMap(cacheKey); ok {
		return cached, nil
	}
	raw, err := os.ReadFile(cachePath)
	if err != nil {
		logger.Logger.Error(err)
		return nil, nil
	}
	if len(raw) == 0 {
		return nil, nil
	}
	data := map[string]interface{}{}
	if err := json.Unmarshal(raw, &data); err != nil {
		logger.Logger.Error(err)
		return nil, err
	}
	global.GoCahce.Set(cacheKey, data, -1)
	return data, nil
}

// historyPersistCache writes data as JSON to cachePath so the cached values
// survive a restart. Failures are logged; the file is left untouched when the
// data cannot be marshalled.
func historyPersistCache(cachePath string, data map[string]interface{}) {
	raw, err := json.Marshal(data)
	if err != nil {
		logger.Logger.Error(err)
		return
	}
	if err := os.WriteFile(cachePath, raw, fs.ModePerm); err != nil {
		logger.Logger.Error(err)
	}
}

// historyPublishLastData pushes the latest record of a device to the front
// end through datachannel.SendDataQueue (websocket; MQTT was dropped on
// 2023-03-27 because the customer's broker only offers TCP) and then stores
// it in the DeviceLastData cache and its backing file.
func historyPublishLastData(deviceID int32, lastdatacache map[string]interface{}) {
	publishMes := map[string]interface{}{
		"topic": global.Topic_RuntimeDataPublish,
		"data":  lastdatacache,
	}
	datachannel.SendDataQueue <- publishMes

	historyLastDataMu.Lock()
	defer historyLastDataMu.Unlock()

	// Copy-on-write: maps already handed out to readers are never mutated, so
	// concurrent inserts cannot trigger a concurrent map read/write.
	previous, _ := historyCachedMap(historyLastDataCacheKey)
	merged := make(map[string]interface{}, len(previous)+1)
	for k, v := range previous {
		merged[k] = v
	}
	merged[cast.ToString(deviceID)] = lastdatacache
	global.GoCahce.Set(historyLastDataCacheKey, merged, -1)

	// Persist the cache; otherwise the latest data would be unavailable for a
	// while after a restart and querying the database may be slow.
	historyPersistCache(historyLastDataFile, merged)
}

// InsertHistory stores one acquisition record (field.Model plus the optimized
// values derived from field.AttrInfo) in the device model's data table.
//
// After a successful insert it asynchronously triggers alarm handling,
// updates the maximum-value statistics and publishes/caches the record as the
// device's latest data. It returns nil without inserting when the device has
// no measure points, and an error when the device or its table mapping is
// unknown or the insert fails.
func (field *HistoryService) InsertHistory() error {
	deviceServiceIns := new(DeviceService)
	// Look up the device's model information.
	deviceModelInfo := deviceServiceIns.GetComboxListByName(field.Model.Devicename)
	if deviceModelInfo == nil {
		logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
		return errors.New("设备" + field.Model.Devicename + "未创建")
	}
	modelid := deviceModelInfo.Modelid
	localTableName, err := historyTableName(modelid)
	if err != nil {
		return err
	}
	// Look up the device's measure points.
	mplist := deviceServiceIns.DeviceMpInfo(field.Model.Deviceid)
	if len(mplist) == 0 {
		logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
		return nil
	}

	sqlCols := make([]string, 0, len(mplist)+3)
	sqlCols = append(sqlCols, "device_name", "device_id", "acquisition_time")
	sqlChar := make([]string, 0, len(mplist)+3)
	sqlChar = append(sqlChar, "?", "?", "?")
	sqlvalues := make([]interface{}, 0, len(mplist)+3)
	sqlvalues = append(sqlvalues, field.Model.Devicename, field.Model.Deviceid, field.Model.Date)

	// Collect the optimized value of every measure point, both as an SQL
	// parameter and as the display value kept in the cache.
	lastdatacacheValue := make(map[string]interface{}, len(mplist))
	dataoptimizeService := new(DataoptimizeService)
	for keyName := range mplist {
		sqlCols = append(sqlCols, strings.ToLower(keyName))
		sqlChar = append(sqlChar, "?")
		optValue := dataoptimizeService.OptimizeValue(modelid, field.Model.Deviceid, keyName, field.AttrInfo)
		sqlvalues = append(sqlvalues, optValue)
		if tools.IsEmpty(optValue) == global.ReplaceNumber {
			// Invalid readings are shown using the configured placeholder.
			lastdatacacheValue[keyName], _ = bo.GetSysParamValue("invalid_show_rule", "")
		} else {
			lastdatacacheValue[keyName] = optValue
		}
	}
	lastdatacache := map[string]interface{}{
		"device_name":      field.Model.Devicename,
		"device_id":        field.Model.Deviceid,
		"acquisition_time": field.Model.Date,
		"value":            lastdatacacheValue,
	}

	sqlCommandText := fmt.Sprintf("insert into %s(%s)values(%s)", localTableName, strings.Join(sqlCols, ","), strings.Join(sqlChar, ","))
	if _, err = orm.NewOrm().Raw(sqlCommandText, sqlvalues).Exec(); err != nil {
		sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlvalues)
		logger.Logger.Error(err, sqllog)
		new(bo.SystemLog).Fail(
			enum.AuditType_datapush,
			enum.LogType_Insert,
			enum.OptEventType_System,
			enum.OptEventLevel_Hight,
			sqllog,
			global.SystemLogDefaultAccount,
		)
		return err
	}

	// Evaluate the alarm strategies.
	go new(AlarmService).HandleAlarmEvent(field)
	// Track the highest value per measure-point kind.
	go field.setMaxDataInfo(lastdatacache)
	// Push the latest record to the front end and cache it.
	go historyPublishLastData(field.Model.Deviceid, lastdatacache)
	return nil
}

// historySplitRow converts one queried row into three display rows
// (temperature, humidity, voltage). Every row carries all data columns; a
// column holds its value only in the row of its own kind and "" elsewhere.
// The humidity/voltage rows get an empty "type" when the source row has no
// column of that kind.
func historySplitRow(row orm.Params) []orm.Params {
	objTem := orm.Params{"acquisition_time": row["acquisition_time"], "type": "温度"}
	objHum := orm.Params{"acquisition_time": row["acquisition_time"], "type": "湿度"}
	objVol := orm.Params{"acquisition_time": row["acquisition_time"], "type": "电压"}
	hashum := false // whether the row has a humidity measure point
	hasvol := false // whether the row has a voltage measure point
	for colKey, v := range row {
		if colKey == "acquisition_time" {
			continue
		}
		v1 := tools.IsEmpty(v)
		if v1 == global.NullNumber {
			v1 = ""
		} else if v1 == global.ReplaceNumber {
			// Show the configured placeholder instead of the invalid marker.
			v1, _ = bo.GetSysParamValue("invalid_show_rule", "")
		}
		objTem[colKey] = ""
		objHum[colKey] = ""
		objVol[colKey] = ""
		switch historyAttrKind(colKey) {
		case "tem":
			objTem[colKey] = v1
		case "hum":
			objHum[colKey] = v1
			hashum = true
		case "vol":
			objVol[colKey] = v1
			hasvol = true
		}
	}
	if !hashum {
		objHum["type"] = ""
	}
	if !hasvol {
		objVol["type"] = ""
	}
	return []orm.Params{objTem, objHum, objVol}
}

// SearchHistory returns one page of history data for field.Model.Deviceid
// between field.StartDate and field.EndDate (inclusive days), newest first.
//
// The result map holds "theader" (column descriptors ordered temperature,
// humidity, voltage, each sorted by name), "tbody" (one temperature/humidity/
// voltage row triple per record) and "total" (the unpaged record count, also
// returned as the second value). An error from either the data query or the
// count query is returned with a nil result.
func (field *HistoryService) SearchHistory() (map[string]interface{}, int, error) {
	deviceSvr := new(DeviceService)
	// Look up the device's model information.
	deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
	if deviceModelInfo == nil {
		logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
		return nil, 0, errors.New("设备" + field.Model.Devicename + "未创建")
	}
	field.Model.Devicename = deviceModelInfo.Name
	localTableName, err := historyTableName(deviceModelInfo.Modelid)
	if err != nil {
		return nil, 0, err
	}
	// Look up the device's measure points.
	deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
	if len(deviceMpList) == 0 {
		msg := fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename)
		logger.Logger.Debug(msg)
		return nil, 0, errors.New(msg)
	}

	// Group the columns by kind and remember each column's display name.
	groups := map[string][]string{}
	headerNames := map[string]interface{}{}
	for _, mp := range deviceMpList {
		attrname := strings.ToLower(fmt.Sprintf("%s", mp["attrname"]))
		if kind := historyAttrKind(attrname); kind != "" {
			groups[kind] = append(groups[kind], attrname)
		}
		headerNames[attrname] = tools.IsEmpty(mp["mpname"])
	}
	colnames := []string{}
	for _, kind := range []string{"tem", "hum", "vol"} {
		sort.Strings(groups[kind])
		colnames = append(colnames, groups[kind]...)
	}
	theader := make([]interface{}, 0, len(colnames))
	for _, tv := range colnames {
		theader = append(theader, map[string]interface{}{"name": tv, "content": headerNames[tv]})
	}

	// Building the select list from a slice keeps the SQL valid even when the
	// device has no temperature/humidity/voltage column.
	selectCols := strings.Join(append([]string{"acquisition_time"}, colnames...), ",")
	sql := fmt.Sprintf(" select %s from %s where acquisition_time BETWEEN ? and ? and device_id=? ", selectCols, localTableName)
	totalSql := fmt.Sprintf(" select count(1) cnt from %s where acquisition_time BETWEEN ? and ? and device_id=? ", localTableName)
	sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59", field.Model.Deviceid}

	// Paging clause; out-of-range paging values are clamped so the generated
	// limit never contains a negative number.
	pageIndex, pageSize := field.PageIndex, field.PageSize
	if pageIndex < 1 {
		pageIndex = 1
	}
	if pageSize < 0 {
		pageSize = 0
	}
	limit := fmt.Sprintf(" order by acquisition_time desc limit %d,%d", (pageIndex-1)*pageSize, pageSize)

	// Run the data query and the count query concurrently.
	var (
		wg          sync.WaitGroup
		rowset      []orm.Params
		totalRowset []orm.Params
		dataErr     error
		countErr    error
	)
	o := orm.NewOrm()
	wg.Add(2)
	go func() {
		defer wg.Done()
		if _, dataErr = o.Raw(sql+limit, sqlParas).Values(&rowset); dataErr != nil {
			logger.Logger.Error(dataErr, fmt.Sprintf("SQL:%s 参数:%+v", sql+limit, sqlParas))
		}
	}()
	go func() {
		defer wg.Done()
		if _, countErr = o.Raw(totalSql, sqlParas).Values(&totalRowset); countErr != nil {
			logger.Logger.Error(countErr, fmt.Sprintf("SQL:%s 参数:%+v", totalSql, sqlParas))
		}
	}()
	wg.Wait()
	if dataErr != nil {
		return nil, 0, dataErr
	}
	if countErr != nil {
		return nil, 0, countErr
	}

	// Convert every record into its parent/child display rows.
	tbody := make([][]orm.Params, 0, len(rowset))
	for _, row := range rowset {
		tbody = append(tbody, historySplitRow(row))
	}
	total := 0
	if len(totalRowset) > 0 {
		total, _ = strconv.Atoi(tools.IsEmpty(totalRowset[0]["cnt"]))
	}
	result := map[string]interface{}{
		"theader": theader,
		"tbody":   tbody,
		"total":   total,
	}
	return result, total, nil
}

// GetHistoryDataEchartsLine builds ECharts line-chart data ("legend", "xAxis",
// "series") for the measure points of field.Model.Deviceid selected through
// field.MpAttrnames or, when that is empty, field.Mpnames (which are resolved
// to attribute names and stored in field.MpAttrnames).
//
// Each series holds one value per record between field.StartDate and
// field.EndDate, aligned with the x axis; null/invalid markers become "".
func (field *HistoryService) GetHistoryDataEchartsLine() (map[string]interface{}, error) {
	deviceSvr := new(DeviceService)
	// Look up the device's model information.
	deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
	if deviceModelInfo == nil {
		logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
		return nil, errors.New("设备" + field.Model.Devicename + "未创建")
	}
	field.Model.Devicename = deviceModelInfo.Name
	localTableName, err := historyTableName(deviceModelInfo.Modelid)
	if err != nil {
		return nil, err
	}
	// Look up the device's measure points.
	deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
	if len(deviceMpList) == 0 {
		msg := fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename)
		logger.Logger.Debug(msg)
		return nil, errors.New(msg)
	}
	if len(field.MpAttrnames) == 0 && len(field.Mpnames) == 0 {
		return nil, errors.New("未指定获取的测点名称或模型属性名")
	}
	if len(field.MpAttrnames) == 0 {
		// Resolve measure-point display names to attribute names.
		for _, n1 := range field.Mpnames {
			for attrname, n2 := range deviceMpList {
				if tools.IsEmpty(n2["mpname"]) == n1 {
					field.MpAttrnames = append(field.MpAttrnames, attrname)
					break
				}
			}
		}
	}

	colnames := make([]string, 0, len(field.MpAttrnames))
	legendNames := make([]string, 0, len(field.MpAttrnames))
	deviceMpName := make(map[string]string, len(field.MpAttrnames))
	for _, mp := range field.MpAttrnames {
		mpinfo, ok := deviceMpList[mp]
		if !ok {
			// Unknown attribute names must not reach the SQL column list.
			continue
		}
		attrname := strings.ToLower(fmt.Sprintf("%s", mpinfo["attrname"]))
		mpname := tools.IsEmpty(mpinfo["mpname"])
		legendNames = append(legendNames, mpname)
		colnames = append(colnames, attrname)
		deviceMpName[attrname] = mpname
	}
	if len(colnames) == 0 {
		return nil, errors.New("未找到指定测点对应的模型属性")
	}

	// The data table is shared by every device of the model (see the
	// device_id filters in SearchHistory/GetRuntimeData), so the query is
	// restricted to the requested device.
	sql := fmt.Sprintf(" select acquisition_time, %s from %s where acquisition_time BETWEEN ? and ? and device_id=? order by id ", strings.Join(colnames, ","), localTableName)
	sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59", field.Model.Deviceid}
	rowset := []orm.Params{}
	if _, err = orm.NewOrm().Raw(sql, sqlParas).Values(&rowset); err != nil {
		logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, sqlParas))
		return nil, err
	}

	// X axis labels: "date hh:mm" for the first record of a day, "hh:mm" for
	// the following records of the same day.
	xAxisData := make([]string, 0, len(rowset))
	cDate := ""
	for _, row := range rowset {
		xPoint := tools.IsEmpty(row["acquisition_time"])
		parts := strings.SplitN(xPoint, " ", 2)
		label := historyDropSeconds(xPoint)
		if len(parts) == 2 && parts[0] == cDate {
			label = historyDropSeconds(parts[1])
		}
		cDate = parts[0]
		xAxisData = append(xAxisData, label)
	}

	// One series per selected measure point, aligned with the x axis.
	seriesList := make([]interface{}, 0, len(colnames))
	for _, mp := range colnames {
		datas := make([]string, 0, len(rowset))
		for _, row := range rowset {
			v := tools.IsEmpty(row[mp])
			if v == global.NullNumber || v == global.ReplaceNumber {
				v = ""
			}
			datas = append(datas, v)
		}
		seriesList = append(seriesList, map[string]interface{}{
			"name": deviceMpName[mp], // series name
			"data": datas,
		})
	}

	result := map[string]interface{}{
		"legend": map[string]interface{}{"data": legendNames},
		"xAxis":  map[string]interface{}{"data": xAxisData},
		"series": seriesList,
	}
	return result, nil
}

// GetRuntimeData returns, for every device of every configured model, the
// most recently inserted record (the row with the highest id for the device).
// Each element carries "device_name", "device_id", "thead" (measure-point
// names) and "tbody" (the record, with null/invalid markers replaced for
// display). Models without a table mapping and devices without measure points
// are skipped.
func (t *HistoryService) GetRuntimeData() ([]map[string]interface{}, error) {
	db := orm.NewOrm()
	allmode := new(ModelService).GetModelListObj()
	if len(allmode) == 0 {
		return nil, errors.New("未配置应用的物模型!")
	}
	devSvr := new(DeviceService)
	devall, _ := devSvr.GetComboxList()
	result := []map[string]interface{}{}
	for _, modelitem := range allmode {
		modelitem2, ok := modelitem.(map[string]interface{})
		if !ok {
			continue
		}
		var modelid int
		if num, isNumber := modelitem2["id"].(json.Number); isNumber {
			modelid, _ = strconv.Atoi(string(num))
		} else {
			// Tolerate ids that were not decoded as json.Number.
			modelid = cast.ToInt(modelitem2["id"])
		}
		localTableName, err := historyTableName(modelid)
		if err != nil {
			continue
		}
		// Walk the devices that belong to this model.
		for _, devitem := range devall {
			if modelid != devitem.Modelid {
				continue
			}
			// Look up the device's measure points.
			devmpist := devSvr.DeviceMpInfo(int32(devitem.Deviceid))
			if len(devmpist) == 0 {
				continue
			}
			cols := make([]string, 0, len(devmpist))
			theader := make([]string, 0, len(devmpist))
			for attrname, item := range devmpist {
				cols = append(cols, strings.ToLower("a."+attrname))
				theader = append(theader, tools.IsEmpty(item["mpname"]))
			}
			sql := "SELECT a.device_id,a.device_name, a.acquisition_time," + strings.Join(cols, ",") + " from " + localTableName + " a,(select max(id) maxid from " + localTableName + " where device_id=?) b where a.id=b.maxid"
			tmpRows := []orm.Params{}
			if _, err := db.Raw(sql, devitem.Deviceid).Values(&tmpRows); err != nil {
				logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, devitem.Deviceid))
				continue
			}
			tbody := orm.Params{}
			if len(tmpRows) > 0 {
				tbody = tmpRows[0]
				for atr, atv := range tbody {
					switch tools.IsEmpty(atv) {
					case global.NullNumber:
						tbody[atr] = ""
					case global.ReplaceNumber:
						tbody[atr], _ = bo.GetSysParamValue("invalid_show_rule", "")
					}
				}
			}
			result = append(result, map[string]interface{}{
				"device_name": devitem.Name,
				"device_id":   devitem.Deviceid,
				"thead":       theader,
				"tbody":       tbody,
			})
		}
	}
	return result, nil
}

// GetLastData returns the latest reported data kept in the DeviceLastData
// cache, loading it from lastdata_cache.json when the cache is empty.
//
// With t.Model.Deviceid > 0 only that device's record is returned (nil when
// there is none); otherwise the whole map keyed by device id is returned.
// A missing or empty cache file yields (nil, nil).
func (t *HistoryService) GetLastData() (map[string]interface{}, error) {
	deviceKey := ""
	if t.Model.Deviceid > 0 {
		deviceKey = tools.IsEmpty(t.Model.Deviceid)
	}
	all, ok := historyCachedMap(historyLastDataCacheKey)
	if !ok {
		var err error
		all, err = historyLoadCacheFile(&historyLastDataMu, historyLastDataCacheKey, historyLastDataFile)
		if err != nil || all == nil {
			return nil, err
		}
	}
	if deviceKey == "" {
		return all, nil
	}
	// The device filter applies no matter where the data was loaded from.
	deviceData, _ := all[deviceKey].(map[string]interface{})
	return deviceData, nil
}

// DeleteHistory removes history data from the local database. With MpId > 0
// the measure point's column is overwritten with global.NullNumber (the
// "no data" marker); otherwise, with deviceId > 0, all rows of the device are
// deleted. Failures are logged and recorded in the system log.
func (t *HistoryService) DeleteHistory(deviceId int32, MpId int64) {
	// Look up the device's model information.
	deviceModelInfo := new(DeviceService).GetComboxListById(deviceId)
	if deviceModelInfo == nil {
		logger.Logger.Error(fmt.Sprintf("设备%d未创建", deviceId))
		return
	}
	localTableName, err := historyTableName(deviceModelInfo.Modelid)
	if err != nil {
		return
	}

	var sqlCommandText string
	var sqlParameter []interface{}
	switch {
	case MpId > 0:
		mpinfo, err := new(MpinfoService).GetMpinfo(MpId)
		if err != nil {
			logger.Logger.Error(err)
			return
		}
		colname := tools.IsEmpty(mpinfo["attrname"])
		if colname == "" {
			return
		}
		// NOTE(review): the insert statement in InsertHistory writes no "mpid"
		// column; confirm the data tables really have one, otherwise this
		// filter presumably should be device_id.
		sqlCommandText = "update " + localTableName + " set " + colname + "=? where mpid=? "
		sqlParameter = append(sqlParameter, global.NullNumber, MpId)
	case deviceId > 0:
		sqlCommandText = "delete from " + localTableName + " where device_id=? "
		sqlParameter = append(sqlParameter, deviceId)
	default:
		return
	}

	if _, err = orm.NewOrm().Raw(sqlCommandText, sqlParameter).Exec(); err != nil {
		sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlParameter)
		logger.Logger.Error(err, sqllog)
		new(bo.SystemLog).Fail(
			enum.AuditType_datapush,
			enum.LogType_Delete,
			enum.OptEventType_System,
			enum.OptEventLevel_Hight,
			sqllog,
			global.SystemLogDefaultAccount,
		)
	}
}

// GetMaxDataInfo returns the highest temperature/humidity/voltage statistics
// (device, time and value per kind) kept in the CurrentMaxDataInfo cache,
// loading them from current_maxdata_cache.json when the cache is empty.
// A missing or empty cache file yields (nil, nil).
func (t *HistoryService) GetMaxDataInfo() (map[string]interface{}, error) {
	if cached, ok := historyCachedMap(historyMaxDataCacheKey); ok {
		return cached, nil
	}
	return historyLoadCacheFile(&historyMaxDataMu, historyMaxDataCacheKey, historyMaxDataFile)
}

// historyMergeMaxData merges the per-kind candidates of one record into the
// cached statistics, stores the result in the cache and its backing file, and
// returns it. best holds the candidate value of every kind that had a usable
// reading; entries holds the candidate entry of every kind.
//
// A cached entry is replaced when it has no "max" or when the candidate is
// strictly greater; a kind missing from the cache is added only when the
// record has a reading for it.
func historyMergeMaxData(entries map[string]map[string]interface{}, best map[string]float64) map[string]interface{} {
	historyMaxDataMu.Lock()
	defer historyMaxDataMu.Unlock()

	cached, hasCache := historyCachedMap(historyMaxDataCacheKey)
	// Copy-on-write: maps already handed out to readers are never mutated.
	merged := make(map[string]interface{}, len(historyMaxKinds))
	for k, v := range cached {
		merged[k] = v
	}
	for _, kind := range historyMaxKinds {
		candidate, found := best[kind]
		old, exists := merged[kind]
		switch {
		case !hasCache:
			merged[kind] = entries[kind]
		case !exists:
			if found {
				merged[kind] = entries[kind]
			}
		default:
			oldEntry, _ := old.(map[string]interface{})
			if oldEntry == nil || oldEntry["max"] == nil {
				merged[kind] = entries[kind]
			} else if found && candidate > cast.ToFloat64(oldEntry["max"]) {
				// Each kind is compared with its own candidate (voltage and
				// humidity used to be compared with the temperature value).
				merged[kind] = entries[kind]
			}
		}
	}
	global.GoCahce.Set(historyMaxDataCacheKey, merged, -1)
	// Persist the statistics so they survive a restart.
	historyPersistCache(historyMaxDataFile, merged)
	return merged
}

// setMaxDataInfo updates the highest-value statistics with one record.
// info is the record built by InsertHistory ("device_name", "device_id",
// "acquisition_time" and "value", a map of attribute name to reading).
// The merged statistics are cached, persisted and published to the front end
// through datachannel.SendDataQueue.
func (t *HistoryService) setMaxDataInfo(info map[string]interface{}) {
	value, _ := info["value"].(map[string]interface{})

	entries := make(map[string]map[string]interface{}, len(historyMaxKinds))
	for _, kind := range historyMaxKinds {
		entries[kind] = map[string]interface{}{
			"device_name":      info["device_name"],
			"device_id":        info["device_id"],
			"acquisition_time": info["acquisition_time"],
		}
	}

	// Pick the highest reading of every kind within this record.
	best := make(map[string]float64, len(historyMaxKinds))
	for k, v := range value {
		v1, err := cast.ToFloat64E(v)
		if err != nil {
			// Non-numeric readings (e.g. the invalid-value placeholder) must
			// not take part in the comparison as 0.
			continue
		}
		if v1 > 99990 {
			// 99999 marks invalid data; readings are floats, so the threshold
			// is slightly lower to avoid precision conversions.
			continue
		}
		entry, tracked := entries[historyAttrKind(k)]
		if !tracked {
			continue
		}
		kind := historyAttrKind(k)
		if current, seen := best[kind]; !seen || v1 > current {
			best[kind] = v1
			entry["attr_name"] = k
			entry["max"] = v1
		}
	}

	maxdata := historyMergeMaxData(entries, best)
	logger.Logger.Debug(fmt.Sprintf("======当前最高值数据统计结果:%+v", maxdata))

	// Publish over websocket; the host does not support the MQTT push.
	publishMes := map[string]interface{}{
		"topic": global.Topic_TodayMaxDataPublish,
		"data":  maxdata,
	}
	datachannel.SendDataQueue <- publishMes
}