package service import ( "errors" "fmt" "rtzh_elec_temperature/logger" "rtzh_elec_temperature/tools" "strings" "sync" "time" "github.com/spf13/cast" "git.rtzhtech.cn/iss/public-lib/logic" "git.rtzhtech.cn/iss/public-lib/model" ) type TaskService struct { BaseService } var taskStatus = sync.Map{} // make(map[int64]bool) var taskInfoList = sync.Map{} //计划任务所需内容字段 type TaskField struct { Taskid int64 `json:"taskid"` Tasktype int `json:"tasktype"` Starttime string `json:"starttime"` Runtime int `json:"runtime"` Interval int `json:"interval"` Actid int64 `json:"actid"` Status int `json:"status"` Pause int `json:"pause"` } var ActiveCacheList = make(map[int64]ActiveField) type ActiveField struct { Actid int64 `json:"actid"` Deviceid int `json:"deviceid"` Attrname string `json:"attrname"` Value int `json:"value"` } //编辑任务计划(添加或修改) func (t *TaskService) EditTask(parameter model.DevTask) error { var editError error l := logic.NewTaskLogic() var isAdd = false if t.ExistsTaskName(parameter.Appid, parameter.Taskid, parameter.Taskdesc) { return errors.New(fmt.Sprintf("已存在计划任务描述【%s】", parameter.Taskdesc)) } if parameter.Taskid == 0 { parameter.Taskid = new(logic.ToolsLogic).GetOnlyId(cast.ToInt64(RtelecManageApp().RegAppID)) isAdd = true } if isAdd { editError = l.AddTask(¶meter) new(LogService).SaveLog(fmt.Sprintf("添加任务计划,任务参数:%v", parameter)) } else { editError = l.ModifyTask(¶meter) new(LogService).SaveLog(fmt.Sprintf("编辑任务计划,任务参数:%v", parameter)) } t.GetTaskInfo() return editError } //判断计划任务名称是否已经存在 func (t *TaskService) ExistsTaskName(appId int32, taskId int64, taskName string) bool { var exists = false var sqlCommandText = "" if taskId == 0 { sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskdesc='%s' and appid=%d", taskName, appId) } else { sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskid!=%d and taskdesc='%s' and appid=%d", taskId, taskName, appId) } var total = QueryTotal{} l := logic.NewTaskLogic() err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &total) if err == nil { if total.Recordcount == 1 { exists = true } } return exists } //删除计划任务表 func (t *TaskService) DeleteTask(taskid int64) error { taskObject := logic.NewTaskLogic() err := taskObject.DelTask(taskid) if err == nil { new(LogService).SaveLog(fmt.Sprintf("删除任务计划,任务编号:%d", taskid)) t.GetTaskInfo() } return err } //计划任务列表记录 func (t *TaskService) TaskList(tasktype, pageIndex, pageSize int) (interface{}, error) { l := logic.NewAlarmStrategyLogic() type Taskinfo struct { Id int `json:"id"` Taskid int64 `json:"taskid,string"` Taskdesc string `json:"taskdesc"` Tasktype string `json:"tasktype"` Tasktypename string `json:"tasktypename"` Starttime string `json:"starttime"` Runtime int `json:"runtime"` Period int `json:"period"` Actid int64 `json:"actid,string"` Status string `json:"status"` Pause string `json:"pause"` Create_at string `json:"create_at"` Update_at string `json:"update_at"` Activename string `json:"activename"` } var list []Taskinfo sqlCommandText := "select a.id,taskid,taskdesc,tasktype,date_format(starttime,'%Y-%m-%d %H:%i:%s') starttime,runtime,period,a.actid,case a.status when 1 then '执行中' when 2 then '执行结束' when 3 then '计时中' when 4 then '暂停' else null end status,case pause when 1 then '正常' when 2 then '暂停' else null end pause,date_format(a.create_at,'%Y-%m-%d %H:%i:%s') create_at,date_format(a.update_at,'%Y-%m-%d %H:%i:%s') update_at,b.actdesc activename from dev_task a left join dev_cpaction b on a.actid=b.actid " totalSql := "select count(1) recordcount from dev_task a " var sqlCondition []string sqlCondition = append(sqlCondition, fmt.Sprintf("a.appid=%s", RtelecManageApp().RegAppID)) if tasktype > 0 { sqlCondition = append(sqlCondition, fmt.Sprintf("a.tasktype=%d", tasktype)) } if len(sqlCondition) > 0 { sqlCommandText += " where " + strings.Join(sqlCondition, " and ") totalSql += " where " + strings.Join(sqlCondition, " and ") } sqlCommandText += fmt.Sprintf(" order by a.id desc limit %d,%d", (pageIndex-1)*pageSize, pageSize) err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &list) var total = QueryTotal{} if err == nil { _ = l.SvcCtx.AlarmStrategy.Base.Raw(totalSql, &total) } return map[string]interface{}{"list": list, "total": total.Recordcount}, err } //获取任务列表,用于执行计划 func (t *TaskService) GetTaskInfo() { logger.Logger.Println("===正在重新加载计划任务列表") taskInfoList = sync.Map{} l := logic.NewTaskLogic() var list []TaskField var sqlCommandText = fmt.Sprintf("select taskid,tasktype,starttime,runtime,period 'interval',actid,ifnull(status,1) status,ifnull(pause,1) pause,0 is_execute from dev_task where status!=2 and appid=%s", RtelecManageApp().RegAppID) err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list) if err == nil { for _, record := range list { taskInfoList.Store(record.Taskid, record) } } else { logger.Logger.Error(err) logger.Logger.Println(sqlCommandText) } taskStatus = sync.Map{} } //获取任务列表,用于执行计划 func (t *TaskService) GetActionCache(actid int64, refresh bool) (ActiveField, error) { l := logic.NewTaskLogic() var list []ActiveField var sqlCommandText = fmt.Sprintf("select a.actid,c.deviceid,c.attrname,cast(b.value as signed) value from dev_task a inner join dev_cpaction b on a.actid=b.actid inner join dev_cpinfo c on b.cpid=c.cpid where a.actid=%d and a.appid=%s", actid, RtelecManageApp().RegAppID) err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list) if err != nil { logger.Logger.Error(err) logger.Logger.Println(sqlCommandText) return ActiveField{}, err } if len(list) == 0 { return ActiveField{}, errors.New("无数据") } return list[0], nil } //执行计划任务 func (t *TaskService) ExecuteTask() { logger.Logger.Println("启动任务执行进程") t.GetTaskInfo() for { taskInfoList.Range(func(_, record1 interface{}) bool { record := record1.(TaskField) if record.Status == 2 || record.Pause == 2 { //已结束或已暂停 return true } loc, _ := time.LoadLocation("Asia/Shanghai") startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc) nowTime := time.Now() if err == nil && (record.Status == 3) && (nowTime.Unix() >= startTime.Unix()) { if record.Tasktype == 1 { //普通任务,只允许执行一次 go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype) } else { go t.ForeachTask(record.Taskid) } } return true }) time.Sleep(10 * time.Second) } } //循环任务 func (t *TaskService) ForeachTask(taskid int64) { for { var record1, has = taskInfoList.Load(taskid) if !has { //任务已不存在时,退出进程 return } record := record1.(TaskField) record.Status = 1 //表示执行中 taskInfoList.Store(taskid, record) /* //判断是否到时执行结束 loc, _ := time.LoadLocation("Asia/Shanghai") startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc) if err != nil { TaskPause(taskid, 0, 2) return } if (time.Now().Unix() - startTime.Unix()) > int64(record.Runtime) { //已过了执行时间,终止任务 TaskPause(taskid, 0, 2) return } */ go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype) if record.Interval > 1 { time.Sleep(time.Duration(record.Interval) * time.Second) } else { t.TaskPause(taskid, 0, 2) return } } } //真正执行任务内容的方法 //taskType: 任务分类。1-普通任务 2-循环任务 func (t *TaskService) ExecuteTaskContent(taskId, activeId int64, taskType int) { var cmdParameter CommandMgr commandInfo, err := t.GetActionCache(activeId, true) if err != nil { logger.Logger.Error(fmt.Sprintf("未获取到定时任务%d的动作信息", taskId)) t.TaskPause(taskId, 1, 2) return } cmdParameter.DeviceId = commandInfo.Deviceid cmdParameter.AttrName = commandInfo.Attrname cmdParameter.State = commandInfo.Value //发送消息 logger.Logger.Debug(fmt.Sprintf("正在执行计划任务的操作:%+v", cmdParameter)) go new(CommandService).Command(cmdParameter, tools.GetUid(), 1) if taskType == 1 { //执行结束 err := t.TaskPause(taskId, 1, 2) if err != nil { logger.Logger.Error(err) } } else { //fmt.Println("=========更改状态======") if _, ok := taskStatus.Load(taskId); !ok { //fmt.Println("=========更改状态1======") taskStatus.Store(taskId, true) err := t.TaskPause(taskId, 1, 1) if err != nil { logger.Logger.Error(err) } } } } //任务暂停或恢复 func (t *TaskService) TaskPause(taskId int64, pause, status int) error { if status == 2 { taskInfoList.Delete(taskId) } l := logic.NewTaskLogic() var sqlCommandText = "" var sqlCondition []string if status > 0 { sqlCondition = append(sqlCondition, fmt.Sprintf("`status`=%d", status)) } if pause > 0 { sqlCondition = append(sqlCondition, fmt.Sprintf("`pause`=%d", pause)) } sqlCommandText = "update dev_task set " + strings.Join(sqlCondition, ",") + fmt.Sprintf(" where taskid=%d", taskId) return l.SvcCtx.DevTask.Base.Exec(sqlCommandText) }