dataMonitor.go 7.4 KB


  1. package monitor
  2. import (
  3. "fmt"
  4. "rtzh_elec_temperature/logger"
  5. "rtzh_elec_temperature/models/bo"
  6. "rtzh_elec_temperature/rtelec_app_public_lib/service"
  7. "rtzh_elec_temperature/tools"
  8. "strconv"
  9. "strings"
  10. "time"
  11. "github.com/spf13/cast"
  12. "github.com/astaxie/beego/orm"
  13. )
  14. type DataMonitor struct {
  15. }
  16. func StartDataMonitor() {
  17. go dataTimoutMonitorStart()
  18. go tableSizeMonitorStart()
  19. go logSizeMonitorStart()
  20. go autoMakeReport()
  21. }
  22. //数据采集超时处理进程启动
  23. func dataTimoutMonitorStart() {
  24. for {
  25. time.Sleep(time.Minute * 3)
  26. }
  27. }
  28. //业务数据表容量监测.每天监测一次
  29. func tableSizeMonitorStart() {
  30. logger.Logger.Println("启动过期数据自动清理进程")
  31. m := new(DataMonitor)
  32. m.dataAutoClear()
  33. for {
  34. nowHour := time.Now().Format("15")
  35. //每天早上0点清理一次
  36. dataTimoutMonitorHour, _ := bo.GetSysParamValue("dataClearHour", "01")
  37. if nowHour != dataTimoutMonitorHour {
  38. time.Sleep(time.Minute * 50)
  39. continue
  40. }
  41. m.dataAutoClear()
  42. time.Sleep(time.Minute * 50)
  43. }
  44. }
  45. //附件自动清理
  46. func (t *DataMonitor) dataAutoClear() {
  47. db := orm.NewOrm()
  48. sql := "select * from t_data_clear_data where skeepday>0 order by id"
  49. list := []orm.Params{}
  50. _, err := db.Raw(sql).Values(&list)
  51. if err != nil {
  52. logger.Logger.Error(err)
  53. return
  54. }
  55. if len(list) == 0 {
  56. return
  57. }
  58. for _, row := range list {
  59. skeepday := tools.IsEmpty(row["skeepday"])
  60. colname := tools.IsEmpty(row["colname"])
  61. tname := tools.IsEmpty(row["tablename"])
  62. if colname == "" {
  63. //时间字段列名,未指定时采用默认列名
  64. colname = "create_time"
  65. }
  66. logger.Logger.Debug(fmt.Sprintf("正在清理数据表%s %s天之前的过期数据", tname, skeepday))
  67. sql1 := "delete from " + tname + " where " + colname + "<DATE_ADD(now(),INTERVAL -" + skeepday + " DAY)"
  68. where := tools.IsEmpty(row["filterwhere"])
  69. if where != "" {
  70. sql1 = sql1 + " and " + where
  71. }
  72. _, err := db.Raw(sql1).Exec()
  73. if err != nil {
  74. logger.Logger.Error(err)
  75. logger.Logger.Println(sql1)
  76. } else {
  77. logger.Logger.Debug(fmt.Sprintf("数据表%s %s天之前的过期数据清理完成!", tname, skeepday))
  78. }
  79. }
  80. }
  81. //日志表容量监测.每天监测一次
  82. func logSizeMonitorStart() {
  83. }
  84. //自动生成统计报表
  85. func autoMakeReport() {
  86. logger.Logger.Println("启动自动生成统计报表进程,每天23点自动生成")
  87. time.Sleep(1 * time.Minute)
  88. m := new(DataMonitor)
  89. for {
  90. nowHour := time.Now().Format("15")
  91. //每天早上0点清理一次
  92. dataTimoutMonitorHour, _ := bo.GetSysParamValue("dataStatHour", "00")
  93. //logger.Logger.Debug(fmt.Sprintf("监听数据统计自动生成,当前时:%s 生成时:%s", nowHour, dataTimoutMonitorHour))
  94. if nowHour != dataTimoutMonitorHour {
  95. time.Sleep(time.Minute * 59)
  96. continue
  97. }
  98. logger.Logger.Debug(fmt.Sprintf("开始生成统计数据(包括每台设备前一天的平均值、最大值、最小值统计数据)"))
  99. m.autoGenReport()
  100. time.Sleep(time.Minute * 59)
  101. }
  102. }
  103. func (t *DataMonitor) autoGenReport() {
  104. modelServiceIns := new(service.ModelService)
  105. //获取设备的模型信息
  106. modelInfo := modelServiceIns.GetModelListObj()
  107. if modelInfo == nil {
  108. logger.Logger.Error("未发现模型配置信息")
  109. return
  110. }
  111. modelMgr := new(service.ModelService)
  112. for _, item := range modelInfo {
  113. item2 := item.(map[string]interface{})
  114. modelid, _ := strconv.ParseInt(tools.IsEmpty(item2["id"]), 10, 64)
  115. ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
  116. if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
  117. logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
  118. continue
  119. }
  120. nowday := time.Now().Add(-2 * time.Hour).Format("2006-01-02") //统计上一天的
  121. s1 := nowday + " 00:00:00"
  122. s2 := nowday + " 23:59:59"
  123. localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
  124. //统计今日每个测点的最大采集值
  125. cols := modelMgr.GetModelAttrList(modelid)
  126. if len(cols) == 0 {
  127. continue
  128. }
  129. //logger.Logger.Debug(cols)
  130. vmp_min := map[string]map[string]interface{}{}
  131. vmp_max := map[string]map[string]interface{}{}
  132. vmp_avg := map[string]map[string]interface{}{}
  133. dbo := orm.NewOrm()
  134. for _, col := range cols {
  135. colname := strings.ToLower(cast.ToString(col.(map[string]interface{})["attr_name"]))
  136. statSql := " select device_id,round(Min(" + colname + "),1) min,round(MAX(" + colname + "),1) max,round(avg(" + colname + "),1) avg from " + localTableName + " where acquisition_time BETWEEN ? and ? and " + colname + "<99999 GROUP BY device_id"
  137. rowset := []orm.Params{}
  138. _, err := dbo.Raw(statSql, s1, s2).Values(&rowset)
  139. if err != nil {
  140. logger.Logger.Error(err)
  141. } else if len(rowset) > 0 {
  142. did := cast.ToString(rowset[0]["device_id"])
  143. if obj, h := vmp_min[did]; h {
  144. obj[colname] = cast.ToString(rowset[0]["min"])
  145. } else {
  146. vmp_min[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["min"])}
  147. }
  148. if obj, h := vmp_max[did]; h {
  149. obj[colname] = cast.ToString(rowset[0]["max"])
  150. } else {
  151. vmp_max[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["max"])}
  152. }
  153. if obj, h := vmp_avg[did]; h {
  154. obj[colname] = cast.ToString(rowset[0]["avg"])
  155. } else {
  156. vmp_avg[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["avg"])}
  157. }
  158. }
  159. }
  160. maxtable := localTableName + "_max_day"
  161. mintable := localTableName + "_min_day"
  162. avgtable := localTableName + "_avg_day"
  163. mSqlCols := []string{"day", "device_id"}
  164. mSqlValues := []string{nowday}
  165. mChars := []string{"?", "?"}
  166. for did, vs := range vmp_max {
  167. mSqlValues = append(mSqlValues, did)
  168. for k, v := range vs {
  169. mSqlValues = append(mSqlValues, cast.ToString(v))
  170. mSqlCols = append(mSqlCols, k)
  171. mChars = append(mChars, "?")
  172. }
  173. mSql := fmt.Sprintf("insert into %s(%s)values(%s)", maxtable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
  174. _, err := dbo.Raw(mSql, mSqlValues).Exec()
  175. if err != nil {
  176. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  177. }
  178. logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  179. }
  180. mSqlCols = []string{"day", "device_id"}
  181. mSqlValues = []string{nowday}
  182. mChars = []string{"?", "?"}
  183. for did, vs := range vmp_min {
  184. mSqlValues = append(mSqlValues, did)
  185. for k, v := range vs {
  186. mSqlValues = append(mSqlValues, cast.ToString(v))
  187. mSqlCols = append(mSqlCols, k)
  188. mChars = append(mChars, "?")
  189. }
  190. mSql := fmt.Sprintf("insert into %s(%s)values(%s)", mintable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
  191. _, err := dbo.Raw(mSql, mSqlValues).Exec()
  192. if err != nil {
  193. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  194. }
  195. logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  196. }
  197. mSqlCols = []string{"day", "device_id"}
  198. mSqlValues = []string{nowday}
  199. mChars = []string{"?", "?"}
  200. for did, vs := range vmp_avg {
  201. mSqlValues = append(mSqlValues, did)
  202. for k, v := range vs {
  203. mSqlValues = append(mSqlValues, cast.ToString(v))
  204. mSqlCols = append(mSqlCols, k)
  205. mChars = append(mChars, "?")
  206. }
  207. mSql := fmt.Sprintf("insert into %s(%s)values(%s)", avgtable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
  208. _, err := dbo.Raw(mSql, mSqlValues).Exec()
  209. if err != nil {
  210. logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  211. }
  212. logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
  213. }
  214. }
  215. time.Sleep(1 * time.Minute)
  216. }