| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- package service
- import (
- "errors"
- "fmt"
- "rtzh_elec_temperature/logger"
- "rtzh_elec_temperature/tools"
- "strings"
- "sync"
- "time"
- "github.com/spf13/cast"
- "git.rtzhtech.cn/iss/public-lib/logic"
- "git.rtzhtech.cn/iss/public-lib/model"
- )
- type TaskService struct {
- BaseService
- }
- var taskStatus = sync.Map{} // make(map[int64]bool)
- var taskInfoList = sync.Map{}
- //计划任务所需内容字段
- type TaskField struct {
- Taskid int64 `json:"taskid"`
- Tasktype int `json:"tasktype"`
- Starttime string `json:"starttime"`
- Runtime int `json:"runtime"`
- Interval int `json:"interval"`
- Actid int64 `json:"actid"`
- Status int `json:"status"`
- Pause int `json:"pause"`
- }
- var ActiveCacheList = make(map[int64]ActiveField)
- type ActiveField struct {
- Actid int64 `json:"actid"`
- Deviceid int `json:"deviceid"`
- Attrname string `json:"attrname"`
- Value int `json:"value"`
- }
- //编辑任务计划(添加或修改)
- func (t *TaskService) EditTask(parameter model.DevTask) error {
- var editError error
- l := logic.NewTaskLogic()
- var isAdd = false
- if t.ExistsTaskName(parameter.Appid, parameter.Taskid, parameter.Taskdesc) {
- return errors.New(fmt.Sprintf("已存在计划任务描述【%s】", parameter.Taskdesc))
- }
- if parameter.Taskid == 0 {
- parameter.Taskid = new(logic.ToolsLogic).GetOnlyId(cast.ToInt64(RtelecManageApp().RegAppID))
- isAdd = true
- }
- if isAdd {
- editError = l.AddTask(¶meter)
- new(LogService).SaveLog(fmt.Sprintf("添加任务计划,任务参数:%v", parameter))
- } else {
- editError = l.ModifyTask(¶meter)
- new(LogService).SaveLog(fmt.Sprintf("编辑任务计划,任务参数:%v", parameter))
- }
- t.GetTaskInfo()
- return editError
- }
- //判断计划任务名称是否已经存在
- func (t *TaskService) ExistsTaskName(appId int32, taskId int64, taskName string) bool {
- var exists = false
- var sqlCommandText = ""
- if taskId == 0 {
- sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskdesc='%s' and appid=%d", taskName, appId)
- } else {
- sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskid!=%d and taskdesc='%s' and appid=%d", taskId, taskName, appId)
- }
- var total = QueryTotal{}
- l := logic.NewTaskLogic()
- err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &total)
- if err == nil {
- if total.Recordcount == 1 {
- exists = true
- }
- }
- return exists
- }
- //删除计划任务表
- func (t *TaskService) DeleteTask(taskid int64) error {
- taskObject := logic.NewTaskLogic()
- err := taskObject.DelTask(taskid)
- if err == nil {
- new(LogService).SaveLog(fmt.Sprintf("删除任务计划,任务编号:%d", taskid))
- t.GetTaskInfo()
- }
- return err
- }
- //计划任务列表记录
- func (t *TaskService) TaskList(tasktype, pageIndex, pageSize int) (interface{}, error) {
- l := logic.NewAlarmStrategyLogic()
- type Taskinfo struct {
- Id int `json:"id"`
- Taskid int64 `json:"taskid,string"`
- Taskdesc string `json:"taskdesc"`
- Tasktype string `json:"tasktype"`
- Tasktypename string `json:"tasktypename"`
- Starttime string `json:"starttime"`
- Runtime int `json:"runtime"`
- Period int `json:"period"`
- Actid int64 `json:"actid,string"`
- Status string `json:"status"`
- Pause string `json:"pause"`
- Create_at string `json:"create_at"`
- Update_at string `json:"update_at"`
- Activename string `json:"activename"`
- }
- var list []Taskinfo
- sqlCommandText := "select a.id,taskid,taskdesc,tasktype,date_format(starttime,'%Y-%m-%d %H:%i:%s') starttime,runtime,period,a.actid,case a.status when 1 then '执行中' when 2 then '执行结束' when 3 then '计时中' when 4 then '暂停' else null end status,case pause when 1 then '正常' when 2 then '暂停' else null end pause,date_format(a.create_at,'%Y-%m-%d %H:%i:%s') create_at,date_format(a.update_at,'%Y-%m-%d %H:%i:%s') update_at,b.actdesc activename from dev_task a left join dev_cpaction b on a.actid=b.actid "
- totalSql := "select count(1) recordcount from dev_task a "
- var sqlCondition []string
- sqlCondition = append(sqlCondition, fmt.Sprintf("a.appid=%s", RtelecManageApp().RegAppID))
- if tasktype > 0 {
- sqlCondition = append(sqlCondition, fmt.Sprintf("a.tasktype=%d", tasktype))
- }
- if len(sqlCondition) > 0 {
- sqlCommandText += " where " + strings.Join(sqlCondition, " and ")
- totalSql += " where " + strings.Join(sqlCondition, " and ")
- }
- sqlCommandText += fmt.Sprintf(" order by a.id desc limit %d,%d", (pageIndex-1)*pageSize, pageSize)
- err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &list)
- var total = QueryTotal{}
- if err == nil {
- _ = l.SvcCtx.AlarmStrategy.Base.Raw(totalSql, &total)
- }
- return map[string]interface{}{"list": list, "total": total.Recordcount}, err
- }
- //获取任务列表,用于执行计划
- func (t *TaskService) GetTaskInfo() {
- logger.Logger.Println("===正在重新加载计划任务列表")
- taskInfoList = sync.Map{}
- l := logic.NewTaskLogic()
- var list []TaskField
- var sqlCommandText = fmt.Sprintf("select taskid,tasktype,starttime,runtime,period 'interval',actid,ifnull(status,1) status,ifnull(pause,1) pause,0 is_execute from dev_task where status!=2 and appid=%s", RtelecManageApp().RegAppID)
- err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list)
- if err == nil {
- for _, record := range list {
- taskInfoList.Store(record.Taskid, record)
- }
- } else {
- logger.Logger.Error(err)
- logger.Logger.Println(sqlCommandText)
- }
- taskStatus = sync.Map{}
- }
- //获取任务列表,用于执行计划
- func (t *TaskService) GetActionCache(actid int64, refresh bool) (ActiveField, error) {
- l := logic.NewTaskLogic()
- var list []ActiveField
- var sqlCommandText = fmt.Sprintf("select a.actid,c.deviceid,c.attrname,cast(b.value as signed) value from dev_task a inner join dev_cpaction b on a.actid=b.actid inner join dev_cpinfo c on b.cpid=c.cpid where a.actid=%d and a.appid=%s", actid, RtelecManageApp().RegAppID)
- err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list)
- if err != nil {
- logger.Logger.Error(err)
- logger.Logger.Println(sqlCommandText)
- return ActiveField{}, err
- }
- if len(list) == 0 {
- return ActiveField{}, errors.New("无数据")
- }
- return list[0], nil
- }
- //执行计划任务
- func (t *TaskService) ExecuteTask() {
- logger.Logger.Println("启动任务执行进程")
- t.GetTaskInfo()
- for {
- taskInfoList.Range(func(_, record1 interface{}) bool {
- record := record1.(TaskField)
- if record.Status == 2 || record.Pause == 2 {
- //已结束或已暂停
- return true
- }
- loc, _ := time.LoadLocation("Asia/Shanghai")
- startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc)
- nowTime := time.Now()
- if err == nil && (record.Status == 3) && (nowTime.Unix() >= startTime.Unix()) {
- if record.Tasktype == 1 { //普通任务,只允许执行一次
- go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype)
- } else {
- go t.ForeachTask(record.Taskid)
- }
- }
- return true
- })
- time.Sleep(10 * time.Second)
- }
- }
- //循环任务
- func (t *TaskService) ForeachTask(taskid int64) {
- for {
- var record1, has = taskInfoList.Load(taskid)
- if !has {
- //任务已不存在时,退出进程
- return
- }
- record := record1.(TaskField)
- record.Status = 1 //表示执行中
- taskInfoList.Store(taskid, record)
- /*
- //判断是否到时执行结束
- loc, _ := time.LoadLocation("Asia/Shanghai")
- startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc)
- if err != nil {
- TaskPause(taskid, 0, 2)
- return
- }
- if (time.Now().Unix() - startTime.Unix()) > int64(record.Runtime) {
- //已过了执行时间,终止任务
- TaskPause(taskid, 0, 2)
- return
- }
- */
- go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype)
- if record.Interval > 1 {
- time.Sleep(time.Duration(record.Interval) * time.Second)
- } else {
- t.TaskPause(taskid, 0, 2)
- return
- }
- }
- }
- //真正执行任务内容的方法
- //taskType: 任务分类。1-普通任务 2-循环任务
- func (t *TaskService) ExecuteTaskContent(taskId, activeId int64, taskType int) {
- var cmdParameter CommandMgr
- commandInfo, err := t.GetActionCache(activeId, true)
- if err != nil {
- logger.Logger.Error(fmt.Sprintf("未获取到定时任务%d的动作信息", taskId))
- t.TaskPause(taskId, 1, 2)
- return
- }
- cmdParameter.DeviceId = commandInfo.Deviceid
- cmdParameter.AttrName = commandInfo.Attrname
- cmdParameter.State = commandInfo.Value
- //发送消息
- logger.Logger.Debug(fmt.Sprintf("正在执行计划任务的操作:%+v", cmdParameter))
- go new(CommandService).Command(cmdParameter, tools.GetUid(), 1)
- if taskType == 1 {
- //执行结束
- err := t.TaskPause(taskId, 1, 2)
- if err != nil {
- logger.Logger.Error(err)
- }
- } else {
- //fmt.Println("=========更改状态======")
- if _, ok := taskStatus.Load(taskId); !ok {
- //fmt.Println("=========更改状态1======")
- taskStatus.Store(taskId, true)
- err := t.TaskPause(taskId, 1, 1)
- if err != nil {
- logger.Logger.Error(err)
- }
- }
- }
- }
- //任务暂停或恢复
- func (t *TaskService) TaskPause(taskId int64, pause, status int) error {
- if status == 2 {
- taskInfoList.Delete(taskId)
- }
- l := logic.NewTaskLogic()
- var sqlCommandText = ""
- var sqlCondition []string
- if status > 0 {
- sqlCondition = append(sqlCondition, fmt.Sprintf("`status`=%d", status))
- }
- if pause > 0 {
- sqlCondition = append(sqlCondition, fmt.Sprintf("`pause`=%d", pause))
- }
- sqlCommandText = "update dev_task set " + strings.Join(sqlCondition, ",") + fmt.Sprintf(" where taskid=%d", taskId)
- return l.SvcCtx.DevTask.Base.Exec(sqlCommandText)
- }
|