| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- package monitor
- import (
- "fmt"
- "rtzh_elec_temperature/logger"
- "rtzh_elec_temperature/models/bo"
- "rtzh_elec_temperature/rtelec_app_public_lib/service"
- "rtzh_elec_temperature/tools"
- "strconv"
- "strings"
- "time"
- "github.com/spf13/cast"
- "github.com/astaxie/beego/orm"
- )
// DataMonitor is a stateless receiver that groups the background maintenance
// jobs of this package (expired-data cleanup and daily statistics generation).
type DataMonitor struct{}
- func StartDataMonitor() {
- go dataTimoutMonitorStart()
- go tableSizeMonitorStart()
- go logSizeMonitorStart()
- go autoMakeReport()
- }
// dataTimoutMonitorStart is the entry point of the data-acquisition timeout
// worker. It currently performs no work: it only wakes up every three minutes
// and goes back to sleep, and it never returns.
func dataTimoutMonitorStart() {
	const idleInterval = 3 * time.Minute
	for {
		time.Sleep(idleInterval)
	}
}
- //业务数据表容量监测.每天监测一次
- func tableSizeMonitorStart() {
- logger.Logger.Println("启动过期数据自动清理进程")
- m := new(DataMonitor)
- m.dataAutoClear()
- for {
- nowHour := time.Now().Format("15")
- //每天早上0点清理一次
- dataTimoutMonitorHour, _ := bo.GetSysParamValue("dataClearHour", "01")
- if nowHour != dataTimoutMonitorHour {
- time.Sleep(time.Minute * 50)
- continue
- }
- m.dataAutoClear()
- time.Sleep(time.Minute * 50)
- }
- }
- //附件自动清理
- func (t *DataMonitor) dataAutoClear() {
- db := orm.NewOrm()
- sql := "select * from t_data_clear_data where skeepday>0 order by id"
- list := []orm.Params{}
- _, err := db.Raw(sql).Values(&list)
- if err != nil {
- logger.Logger.Error(err)
- return
- }
- if len(list) == 0 {
- return
- }
- for _, row := range list {
- skeepday := tools.IsEmpty(row["skeepday"])
- colname := tools.IsEmpty(row["colname"])
- tname := tools.IsEmpty(row["tablename"])
- if colname == "" {
- //时间字段列名,未指定时采用默认列名
- colname = "create_time"
- }
- logger.Logger.Debug(fmt.Sprintf("正在清理数据表%s %s天之前的过期数据", tname, skeepday))
- sql1 := "delete from " + tname + " where " + colname + "<DATE_ADD(now(),INTERVAL -" + skeepday + " DAY)"
- where := tools.IsEmpty(row["filterwhere"])
- if where != "" {
- sql1 = sql1 + " and " + where
- }
- _, err := db.Raw(sql1).Exec()
- if err != nil {
- logger.Logger.Error(err)
- logger.Logger.Println(sql1)
- } else {
- logger.Logger.Debug(fmt.Sprintf("数据表%s %s天之前的过期数据清理完成!", tname, skeepday))
- }
- }
- }
// logSizeMonitorStart is the entry point of the log-table size monitor, which
// is intended to run once per day. It is currently an empty placeholder and
// returns immediately.
func logSizeMonitorStart() {
	// Not implemented yet.
}
- //自动生成统计报表
- func autoMakeReport() {
- logger.Logger.Println("启动自动生成统计报表进程,每天23点自动生成")
- time.Sleep(1 * time.Minute)
- m := new(DataMonitor)
- for {
- nowHour := time.Now().Format("15")
- //每天早上0点清理一次
- dataTimoutMonitorHour, _ := bo.GetSysParamValue("dataStatHour", "00")
- //logger.Logger.Debug(fmt.Sprintf("监听数据统计自动生成,当前时:%s 生成时:%s", nowHour, dataTimoutMonitorHour))
- if nowHour != dataTimoutMonitorHour {
- time.Sleep(time.Minute * 59)
- continue
- }
- logger.Logger.Debug(fmt.Sprintf("开始生成统计数据(包括每台设备前一天的平均值、最大值、最小值统计数据)"))
- m.autoGenReport()
- time.Sleep(time.Minute * 59)
- }
- }
- func (t *DataMonitor) autoGenReport() {
- modelServiceIns := new(service.ModelService)
- //获取设备的模型信息
- modelInfo := modelServiceIns.GetModelListObj()
- if modelInfo == nil {
- logger.Logger.Error("未发现模型配置信息")
- return
- }
- modelMgr := new(service.ModelService)
- for _, item := range modelInfo {
- item2 := item.(map[string]interface{})
- modelid, _ := strconv.ParseInt(tools.IsEmpty(item2["id"]), 10, 64)
- ormrow, err := new(bo.Global).GetCodeInfo("model_table_mapping", cast.ToString(modelid))
- if err != nil || tools.IsEmpty(ormrow["name"]) == "" {
- logger.Logger.Error(fmt.Sprintf("未找到模型id=%d对应的数据表关系", modelid))
- continue
- }
- nowday := time.Now().Add(-2 * time.Hour).Format("2006-01-02") //统计上一天的
- s1 := nowday + " 00:00:00"
- s2 := nowday + " 23:59:59"
- localTableName := "t_data_" + tools.IsEmpty(ormrow["name"])
- //统计今日每个测点的最大采集值
- cols := modelMgr.GetModelAttrList(modelid)
- if len(cols) == 0 {
- continue
- }
- //logger.Logger.Debug(cols)
- vmp_min := map[string]map[string]interface{}{}
- vmp_max := map[string]map[string]interface{}{}
- vmp_avg := map[string]map[string]interface{}{}
- dbo := orm.NewOrm()
- for _, col := range cols {
- colname := strings.ToLower(cast.ToString(col.(map[string]interface{})["attr_name"]))
- statSql := " select device_id,round(Min(" + colname + "),1) min,round(MAX(" + colname + "),1) max,round(avg(" + colname + "),1) avg from " + localTableName + " where acquisition_time BETWEEN ? and ? and " + colname + "<99999 GROUP BY device_id"
- rowset := []orm.Params{}
- _, err := dbo.Raw(statSql, s1, s2).Values(&rowset)
- if err != nil {
- logger.Logger.Error(err)
- } else if len(rowset) > 0 {
- did := cast.ToString(rowset[0]["device_id"])
- if obj, h := vmp_min[did]; h {
- obj[colname] = cast.ToString(rowset[0]["min"])
- } else {
- vmp_min[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["min"])}
- }
- if obj, h := vmp_max[did]; h {
- obj[colname] = cast.ToString(rowset[0]["max"])
- } else {
- vmp_max[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["max"])}
- }
- if obj, h := vmp_avg[did]; h {
- obj[colname] = cast.ToString(rowset[0]["avg"])
- } else {
- vmp_avg[did] = map[string]interface{}{colname: cast.ToString(rowset[0]["avg"])}
- }
- }
- }
- maxtable := localTableName + "_max_day"
- mintable := localTableName + "_min_day"
- avgtable := localTableName + "_avg_day"
- mSqlCols := []string{"day", "device_id"}
- mSqlValues := []string{nowday}
- mChars := []string{"?", "?"}
- for did, vs := range vmp_max {
- mSqlValues = append(mSqlValues, did)
- for k, v := range vs {
- mSqlValues = append(mSqlValues, cast.ToString(v))
- mSqlCols = append(mSqlCols, k)
- mChars = append(mChars, "?")
- }
- mSql := fmt.Sprintf("insert into %s(%s)values(%s)", maxtable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
- _, err := dbo.Raw(mSql, mSqlValues).Exec()
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- mSqlCols = []string{"day", "device_id"}
- mSqlValues = []string{nowday}
- mChars = []string{"?", "?"}
- for did, vs := range vmp_min {
- mSqlValues = append(mSqlValues, did)
- for k, v := range vs {
- mSqlValues = append(mSqlValues, cast.ToString(v))
- mSqlCols = append(mSqlCols, k)
- mChars = append(mChars, "?")
- }
- mSql := fmt.Sprintf("insert into %s(%s)values(%s)", mintable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
- _, err := dbo.Raw(mSql, mSqlValues).Exec()
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- mSqlCols = []string{"day", "device_id"}
- mSqlValues = []string{nowday}
- mChars = []string{"?", "?"}
- for did, vs := range vmp_avg {
- mSqlValues = append(mSqlValues, did)
- for k, v := range vs {
- mSqlValues = append(mSqlValues, cast.ToString(v))
- mSqlCols = append(mSqlCols, k)
- mChars = append(mChars, "?")
- }
- mSql := fmt.Sprintf("insert into %s(%s)values(%s)", avgtable, strings.Join(mSqlCols, ","), strings.Join(mChars, ","))
- _, err := dbo.Raw(mSql, mSqlValues).Exec()
- if err != nil {
- logger.Logger.Error(err, fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- logger.Logger.Debug(fmt.Sprintf("SQL:%s 参数:%+v", mSql, mSqlValues))
- }
- }
- time.Sleep(1 * time.Minute)
- }
|