history_service.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. package service
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/fs"
  7. "os"
  8. "rtzh_elec_temperature/datachannel"
  9. "rtzh_elec_temperature/enum"
  10. "rtzh_elec_temperature/global"
  11. "rtzh_elec_temperature/logger"
  12. "rtzh_elec_temperature/models/bo"
  13. "sort"
  14. "sync"
  15. //"rtzh_elec_temperature/mqtt"
  16. "rtzh_elec_temperature/tools"
  17. "strconv"
  18. "strings"
  19. "time"
  20. "github.com/spf13/cast"
  21. "github.com/astaxie/beego/orm"
  22. )
  23. //本地历史数据管理
  24. type HistoryService struct {
  25. BaseService
  26. Model dev_history
  27. PageIndex int `json:"page_index"`
  28. PageSize int `json:"page_size"`
  29. StartDate string `json:"start_date"`
  30. EndDate string `json:"end_date"`
  31. Mpnames []string
  32. MpAttrnames []string
  33. AttrInfo map[string]interface{}
  34. }
  35. //历史数据模型
  36. type dev_history struct {
  37. Id int `orm:"pk"`
  38. Appid int
  39. Deviceid int32
  40. Devicename string
  41. Attrname string
  42. Mpid string
  43. Mpname string
  44. Date string
  45. Val int
  46. }
  47. func init() {
  48. orm.RegisterModel(new(dev_history))
  49. //加载本地缓存文件中的数据到缓存
  50. go func() {
  51. time.Sleep(1 * time.Second)
  52. new(HistoryService).GetMaxDataInfo()
  53. new(HistoryService).GetLastData()
  54. }()
  55. }
  56. // 添加历史数据
  57. func (field *HistoryService) InsertHistory() error {
  58. //appid := RtelecManageApp().RegAppID
  59. deviceServiceIns := new(DeviceService)
  60. //获取设备的模型信息
  61. deviceModelInfo := deviceServiceIns.GetComboxListByName(field.Model.Devicename)
  62. if deviceModelInfo == nil {
  63. logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
  64. return errors.New("设备" + field.Model.Devicename + "未创建")
  65. }
  66. modelid := deviceModelInfo.Modelid
  67. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  68. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  69. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  70. return err
  71. }
  72. //获取设备的测点
  73. mplist := deviceServiceIns.DeviceMpInfo(field.Model.Deviceid)
  74. if len(mplist) == 0 {
  75. logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
  76. return nil
  77. }
  78. lastdatacache := map[string]interface{}{}
  79. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  80. db := orm.NewOrm()
  81. sqlvalues := []interface{}{field.Model.Devicename, field.Model.Deviceid, field.Model.Date}
  82. sqlChar := []string{"?", "?", "?"}
  83. sqlCols := []string{"device_name", "device_id", "acquisition_time"}
  84. //组装缓存数据
  85. lastdatacache["device_name"] = field.Model.Devicename
  86. lastdatacache["device_id"] = field.Model.Deviceid
  87. lastdatacache["acquisition_time"] = field.Model.Date
  88. lastdatacache_value := map[string]interface{}{} //采集数据
  89. dataoptimizeService := new(DataoptimizeService)
  90. for keyName, _ := range mplist {
  91. keyName2 := strings.ToLower(keyName)
  92. sqlCols = append(sqlCols, keyName2)
  93. sqlChar = append(sqlChar, "?")
  94. optValue := dataoptimizeService.OptimizeValue(modelid, field.Model.Deviceid, keyName, field.AttrInfo)
  95. sqlvalues = append(sqlvalues, optValue)
  96. if tools.IsEmpty(optValue) == global.ReplaceNumber {
  97. lastdatacache_value[keyName], _ = bo.GetSysParamValue("invalid_show_rule", "")
  98. } else {
  99. lastdatacache_value[keyName] = optValue
  100. }
  101. }
  102. lastdatacache["value"] = lastdatacache_value
  103. if len(lastdatacache_value) > 0 {
  104. sqlCommandText := fmt.Sprintf("insert into %s(%s)values(%s)", localTableName, strings.Join(sqlCols, ","), strings.Join(sqlChar, ","))
  105. _, err = db.Raw(sqlCommandText, sqlvalues).Exec()
  106. if err != nil {
  107. sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlvalues)
  108. logger.Logger.Error(err, sqllog)
  109. new(bo.SystemLog).Fail(
  110. enum.AuditType_datapush,
  111. enum.LogType_Insert,
  112. enum.OptEventType_System,
  113. enum.OptEventLevel_Hight,
  114. sqllog,
  115. global.SystemLogDefaultAccount,
  116. )
  117. return err
  118. }
  119. //处理告警策略
  120. //field.AttrInfo = lastdatacache_value //经过优化后的测点数据
  121. go new(AlarmService).HandleAlarmEvent(field)
  122. //计算当前采集数据中每类测点的最高值
  123. go field.setMaxDataInfo(lastdatacache)
  124. //推送最后采集到前端
  125. go func(did int32, lastdatacache map[string]interface{}) {
  126. //将数据发送到MQTT主题/rtelec/runtime/device/data。一般该主题为前端订阅
  127. //msg, _ := json.Marshal(lastdatacache)
  128. // 2023-03-27 取消采用mqtt的方式向前端发送数据。改为websockets的方式。甲方mqtt只提供tcp协议
  129. // ======================================
  130. publishMes := map[string]interface{}{}
  131. publishMes["topic"] = global.Topic_RuntimeDataPublish
  132. publishMes["data"] = lastdatacache
  133. datachannel.SendDataQueue <- publishMes
  134. // ======================================
  135. //mqtt.PublishMessage(global.Topic_RuntimeDataPublish, string(msg))
  136. //缓存最后一次数据。
  137. ccc := cast.ToString(did)
  138. lastdata := map[string]interface{}{}
  139. v_lastdata, _ := global.GoCahce.Get("DeviceLastData")
  140. if v_lastdata == nil {
  141. lastdata = map[string]interface{}{ccc: lastdatacache}
  142. } else {
  143. lastdata = v_lastdata.(map[string]interface{})
  144. lastdata[ccc] = lastdatacache
  145. }
  146. global.GoCahce.Set("DeviceLastData", lastdata, -1)
  147. //缓存数据持久化到文件中,否则重启后一段时间内将获取不到最后一次的采集数据,从数据库查可能会很慢
  148. lastdataStr, _ := json.Marshal(lastdata)
  149. cachePath := "lastdata_cache.json"
  150. os.WriteFile(cachePath, lastdataStr, fs.ModePerm)
  151. }(field.Model.Deviceid, lastdatacache)
  152. }
  153. return nil
  154. }
  155. // 查询历史数据
  156. func (field *HistoryService) SearchHistory() (map[string]interface{}, int, error) {
  157. deviceSvr := new(DeviceService)
  158. //获取设备的模型信息
  159. deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
  160. if deviceModelInfo == nil {
  161. logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
  162. return nil, 0, errors.New("设备" + field.Model.Devicename + "未创建")
  163. }
  164. field.Model.Devicename = deviceModelInfo.Name
  165. modelid := deviceModelInfo.Modelid
  166. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  167. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  168. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  169. return nil, 0, err
  170. }
  171. //获取设备的测点
  172. deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
  173. if len(deviceMpList) == 0 {
  174. logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
  175. return nil, 0, errors.New(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
  176. }
  177. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  178. //返回格式对象定义.包括表头列名,表体数据行及数据子行
  179. result := map[string]interface{}{"theader": []interface{}{}, "tbody": []orm.Params{}}
  180. total := 0
  181. o := orm.NewOrm()
  182. colnames := []string{}
  183. temCols := []string{} //温度列
  184. humCols := []string{} //湿度列
  185. volCols := []string{} //电压列
  186. headerNames := map[string]interface{}{
  187. "name": "acquisition_time",
  188. "content": "时刻",
  189. }
  190. for _, mp := range deviceMpList {
  191. attrname := strings.ToLower(fmt.Sprintf("%s", mp["attrname"]))
  192. if attrname[0:3] == "tem" {
  193. temCols = append(temCols, attrname)
  194. } else if attrname[0:3] == "hum" {
  195. humCols = append(humCols, attrname)
  196. } else if attrname[0:3] == "vol" {
  197. volCols = append(volCols, attrname)
  198. }
  199. headerNames[attrname] = tools.IsEmpty(mp["mpname"])
  200. }
  201. sort.Strings(temCols)
  202. sort.Strings(humCols)
  203. sort.Strings(volCols)
  204. colnames = append(colnames, temCols...)
  205. colnames = append(colnames, humCols...)
  206. colnames = append(colnames, volCols...)
  207. for _, tv := range colnames {
  208. result["theader"] = append(result["theader"].([]interface{}), map[string]interface{}{"name": tv, "content": headerNames[tv]})
  209. }
  210. //数据查询SQL
  211. sql := fmt.Sprintf(" select acquisition_time, %s from %s where acquisition_time BETWEEN ? and ? and device_id=? ", strings.Join(colnames, ","), localTableName)
  212. //总数查询SQL
  213. totalSql := fmt.Sprintf(" select count(1) cnt from %s where acquisition_time BETWEEN ? and ? and device_id=? ", localTableName)
  214. sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59", field.Model.Deviceid}
  215. //分页条件
  216. limit := fmt.Sprintf(" order by acquisition_time desc limit %d,%d", (field.PageIndex-1)*field.PageSize, field.PageSize)
  217. rowset := []orm.Params{}
  218. tbody := [][]orm.Params{}
  219. //查询数据
  220. var wg = sync.WaitGroup{}
  221. wg.Add(2)
  222. go func() {
  223. _, err := o.Raw(sql+limit, sqlParas).Values(&rowset)
  224. if err != nil {
  225. wg.Done()
  226. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql+limit, sqlParas))
  227. return
  228. } else {
  229. //将查询结果转换成父子行
  230. tmpRows := [][]orm.Params{}
  231. for _, row := range rowset {
  232. objTem := orm.Params{
  233. "acquisition_time": row["acquisition_time"],
  234. "type": "温度",
  235. }
  236. objHum := orm.Params{
  237. "acquisition_time": row["acquisition_time"],
  238. "type": "湿度",
  239. }
  240. objVol := orm.Params{
  241. "acquisition_time": row["acquisition_time"],
  242. "type": "电压",
  243. }
  244. hasvol := false //是否有电压测点
  245. hashum := false //是否有湿度测点
  246. for colKey, v := range row {
  247. if colKey == "acquisition_time" {
  248. continue
  249. }
  250. v1 := tools.IsEmpty(v)
  251. if v1 == global.NullNumber {
  252. v1 = ""
  253. } else if v1 == global.ReplaceNumber {
  254. //将值显示为指定字符
  255. v1, _ = bo.GetSysParamValue("invalid_show_rule", "")
  256. }
  257. if colKey[0:3] == "tem" {
  258. objTem[colKey] = v1
  259. } else {
  260. objTem[colKey] = ""
  261. }
  262. if colKey[0:3] == "hum" {
  263. objHum[colKey] = v1
  264. hashum = true
  265. } else {
  266. objHum[colKey] = ""
  267. }
  268. if colKey[0:3] == "vol" {
  269. objVol[colKey] = v1
  270. hasvol = true
  271. } else {
  272. objVol[colKey] = ""
  273. }
  274. }
  275. if !hashum {
  276. objHum["type"] = ""
  277. }
  278. if !hasvol {
  279. objVol["type"] = ""
  280. }
  281. tmpRows = append(tmpRows, []orm.Params{objTem, objHum, objVol})
  282. }
  283. tbody = tmpRows
  284. }
  285. wg.Done()
  286. }()
  287. totalRowset := []orm.Params{}
  288. go func() {
  289. _, err = o.Raw(totalSql, sqlParas).Values(&totalRowset)
  290. wg.Done()
  291. if err != nil {
  292. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", totalSql, sqlParas))
  293. return
  294. }
  295. }()
  296. wg.Wait()
  297. if len(totalRowset) > 0 {
  298. total, _ = strconv.Atoi(tools.IsEmpty(totalRowset[0]["cnt"]))
  299. }
  300. result["tbody"] = tbody
  301. result["total"] = total
  302. return result, total, err
  303. }
  304. //生成历史数据echarts line数据。仅支持多个测点
  305. func (field *HistoryService) GetHistoryDataEchartsLine() (map[string]interface{}, error) {
  306. deviceSvr := new(DeviceService)
  307. //获取设备的模型信息
  308. deviceModelInfo := deviceSvr.GetComboxListById(field.Model.Deviceid)
  309. if deviceModelInfo == nil {
  310. logger.Logger.Error(fmt.Sprintf("设备%s未创建", field.Model.Devicename))
  311. return nil, errors.New("设备" + field.Model.Devicename + "未创建")
  312. }
  313. field.Model.Devicename = deviceModelInfo.Name
  314. modelid := deviceModelInfo.Modelid
  315. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  316. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  317. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  318. return nil, err
  319. }
  320. //获取设备的测点
  321. deviceMpList := deviceSvr.DeviceMpInfo(field.Model.Deviceid)
  322. if len(deviceMpList) == 0 {
  323. logger.Logger.Debug(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
  324. return nil, errors.New(fmt.Sprintf("未获取到设备%s的测点数据", field.Model.Devicename))
  325. }
  326. if len(field.MpAttrnames) == 0 && len(field.Mpnames) == 0 {
  327. return nil, errors.New("未指定获取的测点名称或模型属性名")
  328. }
  329. if len(field.MpAttrnames) == 0 && len(field.Mpnames) > 0 {
  330. for _, n1 := range field.Mpnames {
  331. for attrname, n2 := range deviceMpList {
  332. if tools.IsEmpty(n2["mpname"]) == n1 {
  333. field.MpAttrnames = append(field.MpAttrnames, attrname)
  334. break
  335. }
  336. }
  337. }
  338. }
  339. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  340. //返回格式对象定义.
  341. result := map[string]interface{}{"legend": map[string]interface{}{}, "xAxis": map[string]interface{}{}, "series": []interface{}{}}
  342. o := orm.NewOrm()
  343. colnames := []string{}
  344. legendNames := []string{}
  345. deviceMpName := map[string]string{}
  346. for _, mp := range field.MpAttrnames {
  347. mpinfo := deviceMpList[mp]
  348. attrname := strings.ToLower(fmt.Sprintf("%s", mpinfo["attrname"]))
  349. mpname := tools.IsEmpty(mpinfo["mpname"])
  350. legendNames = append(legendNames, mpname)
  351. colnames = append(colnames, attrname)
  352. deviceMpName[attrname] = mpname
  353. }
  354. result["legend"] = map[string]interface{}{"data": legendNames}
  355. //数据查询SQL
  356. sql := fmt.Sprintf(" select acquisition_time, %s from %s where acquisition_time BETWEEN ? and ? order by id ", strings.Join(colnames, ","), localTableName)
  357. sqlParas := []interface{}{field.StartDate + " 00:00:00", field.EndDate + " 23:59:59"}
  358. rowset := []orm.Params{}
  359. _, err = o.Raw(sql, sqlParas).Values(&rowset)
  360. if err != nil {
  361. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, sqlParas))
  362. return nil, err
  363. }
  364. //生成X轴上的时间点
  365. xAxisData := []string{}
  366. sameDateHour := map[string]int{}
  367. cDate := ""
  368. for _, row := range rowset {
  369. //acquisition_time和day不会同时存在于同一条数据中
  370. xPoint := tools.IsEmpty(row["acquisition_time"])
  371. sameDateHour[xPoint[:len(xPoint)-3]] = 1
  372. xPointParts := strings.Split(xPoint, " ")
  373. if cDate != xPointParts[0] {
  374. xPoint = xPoint[:len(xPoint)-3] //去除最后的秒
  375. } else {
  376. xPoint = xPointParts[1][:len(xPointParts[1])-3] //去除日期和最后的秒
  377. }
  378. cDate = xPointParts[0]
  379. xAxisData = append(xAxisData, xPoint)
  380. }
  381. result["xAxis"] = map[string]interface{}{"data": xAxisData}
  382. //生成该设备每个测点的序列数据
  383. for _, mp := range colnames {
  384. series := map[string]interface{}{}
  385. series["name"] = deviceMpName[mp] //序列名称
  386. datas := []string{}
  387. for _, row := range rowset {
  388. dataTime := tools.IsEmpty(row["acquisition_time"])
  389. if len(dataTime) > 10 {
  390. dataTime = dataTime[:len(dataTime)-3]
  391. }
  392. if sameDateHour[dataTime] == 1 {
  393. v := tools.IsEmpty(row[mp])
  394. if v == global.NullNumber || v == global.ReplaceNumber {
  395. v = ""
  396. }
  397. datas = append(datas, v)
  398. } else {
  399. datas = append(datas, "")
  400. }
  401. }
  402. series["data"] = datas
  403. result["series"] = append(result["series"].([]interface{}), series)
  404. }
  405. return result, err
  406. }
  407. //获取所有或者指定设备的最新数据
  408. func (t *HistoryService) GetRuntimeData() ([]map[string]interface{}, error) {
  409. db := orm.NewOrm()
  410. modelSvr := new(ModelService)
  411. allmode := modelSvr.GetModelListObj()
  412. if len(allmode) == 0 {
  413. return nil, errors.New("未配置应用的物模型!")
  414. }
  415. devSvr := new(DeviceService)
  416. devall, _ := devSvr.GetComboxList()
  417. result := []map[string]interface{}{}
  418. for _, modelitem := range allmode {
  419. modelitem2 := modelitem.(map[string]interface{})
  420. modelid, _ := strconv.Atoi(string(modelitem2["id"].(json.Number)))
  421. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  422. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  423. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  424. continue
  425. }
  426. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  427. //获取模型下的设备列表
  428. for _, devitem := range devall {
  429. if modelid == devitem.Modelid {
  430. //获取设备测点
  431. devmpist := devSvr.DeviceMpInfo(int32(devitem.Deviceid))
  432. if len(devmpist) == 0 {
  433. continue
  434. }
  435. cols := []string{}
  436. theader := []string{}
  437. tbody := orm.Params{}
  438. for attrname, item := range devmpist {
  439. cols = append(cols, strings.ToLower("a."+attrname))
  440. theader = append(theader, tools.IsEmpty(item["mpname"]))
  441. }
  442. sql := "SELECT a.device_id,a.device_name, a.acquisition_time," + strings.Join(cols, ",") + " from " + localTableName + " a,(select max(id) maxid from " + localTableName + " where device_id=?) b where a.id=b.maxid"
  443. tmpRows := []orm.Params{}
  444. _, err := db.Raw(sql, devitem.Deviceid).Values(&tmpRows)
  445. if err != nil {
  446. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", sql, devitem.Deviceid))
  447. continue
  448. }
  449. if len(tmpRows) > 0 {
  450. for atr, atv := range tmpRows[0] {
  451. vv := tools.IsEmpty(atv)
  452. if vv == global.NullNumber {
  453. tmpRows[0][atr] = ""
  454. } else if vv == global.ReplaceNumber {
  455. tmpRows[0][atr], _ = bo.GetSysParamValue("invalid_show_rule", "")
  456. }
  457. }
  458. tbody = tmpRows[0]
  459. }
  460. result = append(result, map[string]interface{}{
  461. "device_name": devitem.Name,
  462. "device_id": devitem.Deviceid,
  463. "thead": theader,
  464. "tbody": tbody,
  465. })
  466. }
  467. }
  468. }
  469. return result, nil
  470. }
  471. //获取设备最后一次报的历史数据
  472. //设备最后一次报的数据存储在缓存DeviceLastData中
  473. func (t *HistoryService) GetLastData() (map[string]interface{}, error) {
  474. v_devceid := ""
  475. if t.Model.Deviceid > 0 {
  476. v_devceid = tools.IsEmpty(t.Model.Deviceid)
  477. }
  478. if lastdata, h := global.GoCahce.Get("DeviceLastData"); h {
  479. lastdata2 := lastdata.(map[string]interface{})
  480. if v_devceid == "" {
  481. return lastdata2, nil
  482. }
  483. if lastdata2[v_devceid] == nil {
  484. return nil, nil
  485. }
  486. return lastdata2[v_devceid].(map[string]interface{}), nil
  487. } else {
  488. cachePath := "lastdata_cache.json"
  489. d1, ferr := os.ReadFile(cachePath)
  490. if ferr == nil && len(d1) > 0 {
  491. lastdata := map[string]interface{}{}
  492. err := json.Unmarshal(d1, &lastdata)
  493. if err != nil {
  494. logger.Logger.Error(err)
  495. } else {
  496. global.GoCahce.Set("DeviceLastData", lastdata, -1)
  497. }
  498. return lastdata, err
  499. } else {
  500. logger.Logger.Error(ferr)
  501. }
  502. }
  503. return nil, nil
  504. }
  505. //删除本地库历史数据
  506. func (t *HistoryService) DeleteHistory(deviceId int32, MpId int64) {
  507. deviceServiceIns := new(DeviceService)
  508. //获取设备的模型信息
  509. deviceModelInfo := deviceServiceIns.GetComboxListById(deviceId)
  510. if deviceModelInfo == nil {
  511. logger.Logger.Error(fmt.Sprintf("设备%d未创建", deviceId))
  512. return
  513. }
  514. modelid := deviceModelInfo.Modelid
  515. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  516. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  517. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  518. return
  519. }
  520. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  521. o := orm.NewOrm()
  522. var sqlCommandText = ""
  523. var sqlParameter []interface{}
  524. if MpId > 0 {
  525. mpinfo, err := new(MpinfoService).GetMpinfo(MpId)
  526. if err != nil {
  527. return
  528. }
  529. colname := tools.IsEmpty(mpinfo["attrname"])
  530. if colname == "" {
  531. return
  532. }
  533. //设置指定列为空数据,以9999表示数据为空
  534. sqlCommandText = "update " + localTableName + " set " + colname + "=? where mpid=? "
  535. sqlParameter = append(sqlParameter, global.NullNumber)
  536. sqlParameter = append(sqlParameter, MpId)
  537. } else if deviceId > 0 {
  538. sqlCommandText = "delete from " + localTableName + " where device_id=? "
  539. sqlParameter = append(sqlParameter, deviceId)
  540. }
  541. if len(sqlParameter) > 0 {
  542. _, err = o.Raw(sqlCommandText, sqlParameter).Exec()
  543. sqllog := fmt.Sprintf("SQL:%s 参数:%+v", sqlCommandText, sqlParameter)
  544. if err != nil {
  545. logger.Logger.Error(err, sqllog)
  546. new(bo.SystemLog).Fail(
  547. enum.AuditType_datapush,
  548. enum.LogType_Delete,
  549. enum.OptEventType_System,
  550. enum.OptEventLevel_Hight,
  551. sqllog,
  552. global.SystemLogDefaultAccount,
  553. )
  554. }
  555. }
  556. }
  557. //获取当日最高温度、湿度、电压的设备和时间数据
  558. //将当日最高温度、湿度、电压的设备和时间数据存储在缓存MaxDataInfo中
  559. func (t *HistoryService) GetMaxDataInfo() (map[string]interface{}, error) {
  560. if lastdata, h := global.GoCahce.Get("CurrentMaxDataInfo"); h {
  561. lastdata2 := lastdata.(map[string]interface{})
  562. return lastdata2, nil
  563. } else {
  564. cachePath := "current_maxdata_cache.json"
  565. d1, ferr := os.ReadFile(cachePath)
  566. if ferr == nil && len(d1) > 0 {
  567. lastdata := map[string]interface{}{}
  568. err := json.Unmarshal(d1, &lastdata)
  569. if err != nil {
  570. logger.Logger.Error(err)
  571. } else {
  572. global.GoCahce.Set("CurrentMaxDataInfo", lastdata, -1)
  573. }
  574. return lastdata, err
  575. } else {
  576. logger.Logger.Error(ferr)
  577. }
  578. }
  579. return nil, nil
  580. }
  581. //保存当日最高数据信息
  582. func (t *HistoryService) setMaxDataInfo(info map[string]interface{}) {
  583. maxdata := map[string]interface{}{}
  584. maxTem := map[string]interface{}{}
  585. maxVol := map[string]interface{}{}
  586. maxHum := map[string]interface{}{}
  587. value := info["value"].(map[string]interface{})
  588. v_lastdata, _ := global.GoCahce.Get("CurrentMaxDataInfo")
  589. v_tem := float64(-10000)
  590. v_vol := float64(-10000)
  591. v_hum := float64(-10000)
  592. for k, v := range value {
  593. v1 := cast.ToFloat64(v)
  594. if v1 > 99990 {
  595. //无效数据99999,采集float类型,所以比较值比实际值小一些,这样不用转换精度
  596. continue
  597. }
  598. attrnameType := strings.ToLower(k[0:3])
  599. if attrnameType == "tem" && v1 > v_tem {
  600. v_tem = v1
  601. maxTem["attr_name"] = k
  602. } else if attrnameType == "vol" && v1 > v_vol {
  603. v_vol = v1
  604. maxVol["attr_name"] = k
  605. } else if attrnameType == "hum" && v1 > v_hum {
  606. v_hum = v1
  607. maxHum["attr_name"] = k
  608. }
  609. }
  610. maxTem["device_name"] = info["device_name"]
  611. maxVol["device_name"] = info["device_name"]
  612. maxHum["device_name"] = info["device_name"]
  613. maxTem["device_id"] = info["device_id"]
  614. maxVol["device_id"] = info["device_id"]
  615. maxHum["device_id"] = info["device_id"]
  616. maxTem["acquisition_time"] = info["acquisition_time"]
  617. maxVol["acquisition_time"] = info["acquisition_time"]
  618. maxHum["acquisition_time"] = info["acquisition_time"]
  619. if v_tem > -9999 {
  620. maxTem["max"] = v_tem
  621. }
  622. if v_hum > -9999 {
  623. maxHum["max"] = v_hum
  624. }
  625. if v_vol > -9999 {
  626. maxVol["max"] = v_vol
  627. }
  628. if v_lastdata == nil {
  629. maxdata = map[string]interface{}{"tem": maxTem, "vol": maxVol, "hum": maxHum}
  630. } else {
  631. maxdata = v_lastdata.(map[string]interface{})
  632. if tmpv, h := maxdata["tem"]; h {
  633. oldTem := tmpv.(map[string]interface{})
  634. if oldTem["max"] != nil {
  635. oldTemMax := cast.ToFloat64(oldTem["max"])
  636. if v_tem > oldTemMax {
  637. maxdata["tem"] = maxTem
  638. }
  639. } else {
  640. maxdata["tem"] = maxTem
  641. }
  642. } else if v_tem > -9999 {
  643. maxdata["tem"] = maxTem
  644. }
  645. if tmpv, h := maxdata["vol"]; h {
  646. oldTem := tmpv.(map[string]interface{})
  647. if oldTem["max"] != nil {
  648. oldTemMax := cast.ToFloat64(oldTem["max"])
  649. if v_tem > oldTemMax {
  650. maxdata["vol"] = maxVol
  651. }
  652. } else {
  653. maxdata["vol"] = maxVol
  654. }
  655. } else if v_tem > -9999 {
  656. maxdata["vol"] = maxVol
  657. }
  658. if tmpv, h := maxdata["hum"]; h {
  659. oldTem := tmpv.(map[string]interface{})
  660. if oldTem["max"] != nil {
  661. oldTemMax := cast.ToFloat64(oldTem["max"])
  662. if v_tem > oldTemMax {
  663. maxdata["hum"] = maxHum
  664. }
  665. } else {
  666. maxdata["hum"] = maxHum
  667. }
  668. } else if v_tem > -9999 {
  669. maxdata["hum"] = maxHum
  670. }
  671. }
  672. logger.Logger.Debug(fmt.Sprintf("======当前最高值数据统计结果:%+v", maxdata))
  673. global.GoCahce.Set("CurrentMaxDataInfo", maxdata, -1)
  674. //缓存数据持久化到文件中,否则重启后一段时间内将获取不到最后一次的采集数据,从数据库查可能会很慢
  675. lastdataStr, _ := json.Marshal(maxdata)
  676. cachePath := "current_maxdata_cache.json"
  677. os.WriteFile(cachePath, lastdataStr, fs.ModePerm)
  678. //MQTT方式推送方式主机不支持
  679. //msg, _ := json.Marshal(maxdata)
  680. //mqtt.PublishMessage(global.Topic_TodayMaxDataPublish, string(msg))
  681. //采用ws方式
  682. publishMes := map[string]interface{}{}
  683. publishMes["topic"] = global.Topic_TodayMaxDataPublish
  684. publishMes["data"] = maxdata
  685. datachannel.SendDataQueue <- publishMes
  686. }