task_service.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. package service
  2. import (
  3. "errors"
  4. "fmt"
  5. "rtzh_elec_temperature/logger"
  6. "rtzh_elec_temperature/tools"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/spf13/cast"
  11. "git.rtzhtech.cn/iss/public-lib/logic"
  12. "git.rtzhtech.cn/iss/public-lib/model"
  13. )
  14. type TaskService struct {
  15. BaseService
  16. }
  17. var taskStatus = sync.Map{} // make(map[int64]bool)
  18. var taskInfoList = sync.Map{}
  19. //计划任务所需内容字段
  20. type TaskField struct {
  21. Taskid int64 `json:"taskid"`
  22. Tasktype int `json:"tasktype"`
  23. Starttime string `json:"starttime"`
  24. Runtime int `json:"runtime"`
  25. Interval int `json:"interval"`
  26. Actid int64 `json:"actid"`
  27. Status int `json:"status"`
  28. Pause int `json:"pause"`
  29. }
  30. var ActiveCacheList = make(map[int64]ActiveField)
  31. type ActiveField struct {
  32. Actid int64 `json:"actid"`
  33. Deviceid int `json:"deviceid"`
  34. Attrname string `json:"attrname"`
  35. Value int `json:"value"`
  36. }
  37. //编辑任务计划(添加或修改)
  38. func (t *TaskService) EditTask(parameter model.DevTask) error {
  39. var editError error
  40. l := logic.NewTaskLogic()
  41. var isAdd = false
  42. if t.ExistsTaskName(parameter.Appid, parameter.Taskid, parameter.Taskdesc) {
  43. return errors.New(fmt.Sprintf("已存在计划任务描述【%s】", parameter.Taskdesc))
  44. }
  45. if parameter.Taskid == 0 {
  46. parameter.Taskid = new(logic.ToolsLogic).GetOnlyId(cast.ToInt64(RtelecManageApp().RegAppID))
  47. isAdd = true
  48. }
  49. if isAdd {
  50. editError = l.AddTask(&parameter)
  51. new(LogService).SaveLog(fmt.Sprintf("添加任务计划,任务参数:%v", parameter))
  52. } else {
  53. editError = l.ModifyTask(&parameter)
  54. new(LogService).SaveLog(fmt.Sprintf("编辑任务计划,任务参数:%v", parameter))
  55. }
  56. t.GetTaskInfo()
  57. return editError
  58. }
  59. //判断计划任务名称是否已经存在
  60. func (t *TaskService) ExistsTaskName(appId int32, taskId int64, taskName string) bool {
  61. var exists = false
  62. var sqlCommandText = ""
  63. if taskId == 0 {
  64. sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskdesc='%s' and appid=%d", taskName, appId)
  65. } else {
  66. sqlCommandText = fmt.Sprintf("select 1 recordcount from dev_task where taskid!=%d and taskdesc='%s' and appid=%d", taskId, taskName, appId)
  67. }
  68. var total = QueryTotal{}
  69. l := logic.NewTaskLogic()
  70. err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &total)
  71. if err == nil {
  72. if total.Recordcount == 1 {
  73. exists = true
  74. }
  75. }
  76. return exists
  77. }
  78. //删除计划任务表
  79. func (t *TaskService) DeleteTask(taskid int64) error {
  80. taskObject := logic.NewTaskLogic()
  81. err := taskObject.DelTask(taskid)
  82. if err == nil {
  83. new(LogService).SaveLog(fmt.Sprintf("删除任务计划,任务编号:%d", taskid))
  84. t.GetTaskInfo()
  85. }
  86. return err
  87. }
  88. //计划任务列表记录
  89. func (t *TaskService) TaskList(tasktype, pageIndex, pageSize int) (interface{}, error) {
  90. l := logic.NewAlarmStrategyLogic()
  91. type Taskinfo struct {
  92. Id int `json:"id"`
  93. Taskid int64 `json:"taskid,string"`
  94. Taskdesc string `json:"taskdesc"`
  95. Tasktype string `json:"tasktype"`
  96. Tasktypename string `json:"tasktypename"`
  97. Starttime string `json:"starttime"`
  98. Runtime int `json:"runtime"`
  99. Period int `json:"period"`
  100. Actid int64 `json:"actid,string"`
  101. Status string `json:"status"`
  102. Pause string `json:"pause"`
  103. Create_at string `json:"create_at"`
  104. Update_at string `json:"update_at"`
  105. Activename string `json:"activename"`
  106. }
  107. var list []Taskinfo
  108. sqlCommandText := "select a.id,taskid,taskdesc,tasktype,date_format(starttime,'%Y-%m-%d %H:%i:%s') starttime,runtime,period,a.actid,case a.status when 1 then '执行中' when 2 then '执行结束' when 3 then '计时中' when 4 then '暂停' else null end status,case pause when 1 then '正常' when 2 then '暂停' else null end pause,date_format(a.create_at,'%Y-%m-%d %H:%i:%s') create_at,date_format(a.update_at,'%Y-%m-%d %H:%i:%s') update_at,b.actdesc activename from dev_task a left join dev_cpaction b on a.actid=b.actid "
  109. totalSql := "select count(1) recordcount from dev_task a "
  110. var sqlCondition []string
  111. sqlCondition = append(sqlCondition, fmt.Sprintf("a.appid=%s", RtelecManageApp().RegAppID))
  112. if tasktype > 0 {
  113. sqlCondition = append(sqlCondition, fmt.Sprintf("a.tasktype=%d", tasktype))
  114. }
  115. if len(sqlCondition) > 0 {
  116. sqlCommandText += " where " + strings.Join(sqlCondition, " and ")
  117. totalSql += " where " + strings.Join(sqlCondition, " and ")
  118. }
  119. sqlCommandText += fmt.Sprintf(" order by a.id desc limit %d,%d", (pageIndex-1)*pageSize, pageSize)
  120. err := l.SvcCtx.AlarmStrategy.Base.Raw(sqlCommandText, &list)
  121. var total = QueryTotal{}
  122. if err == nil {
  123. _ = l.SvcCtx.AlarmStrategy.Base.Raw(totalSql, &total)
  124. }
  125. return map[string]interface{}{"list": list, "total": total.Recordcount}, err
  126. }
  127. //获取任务列表,用于执行计划
  128. func (t *TaskService) GetTaskInfo() {
  129. logger.Logger.Println("===正在重新加载计划任务列表")
  130. taskInfoList = sync.Map{}
  131. l := logic.NewTaskLogic()
  132. var list []TaskField
  133. var sqlCommandText = fmt.Sprintf("select taskid,tasktype,starttime,runtime,period 'interval',actid,ifnull(status,1) status,ifnull(pause,1) pause,0 is_execute from dev_task where status!=2 and appid=%s", RtelecManageApp().RegAppID)
  134. err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list)
  135. if err == nil {
  136. for _, record := range list {
  137. taskInfoList.Store(record.Taskid, record)
  138. }
  139. } else {
  140. logger.Logger.Error(err)
  141. logger.Logger.Println(sqlCommandText)
  142. }
  143. taskStatus = sync.Map{}
  144. }
  145. //获取任务列表,用于执行计划
  146. func (t *TaskService) GetActionCache(actid int64, refresh bool) (ActiveField, error) {
  147. l := logic.NewTaskLogic()
  148. var list []ActiveField
  149. var sqlCommandText = fmt.Sprintf("select a.actid,c.deviceid,c.attrname,cast(b.value as signed) value from dev_task a inner join dev_cpaction b on a.actid=b.actid inner join dev_cpinfo c on b.cpid=c.cpid where a.actid=%d and a.appid=%s", actid, RtelecManageApp().RegAppID)
  150. err := l.SvcCtx.DevTask.Base.Raw(sqlCommandText, &list)
  151. if err != nil {
  152. logger.Logger.Error(err)
  153. logger.Logger.Println(sqlCommandText)
  154. return ActiveField{}, err
  155. }
  156. if len(list) == 0 {
  157. return ActiveField{}, errors.New("无数据")
  158. }
  159. return list[0], nil
  160. }
  161. //执行计划任务
  162. func (t *TaskService) ExecuteTask() {
  163. logger.Logger.Println("启动任务执行进程")
  164. t.GetTaskInfo()
  165. for {
  166. taskInfoList.Range(func(_, record1 interface{}) bool {
  167. record := record1.(TaskField)
  168. if record.Status == 2 || record.Pause == 2 {
  169. //已结束或已暂停
  170. return true
  171. }
  172. loc, _ := time.LoadLocation("Asia/Shanghai")
  173. startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc)
  174. nowTime := time.Now()
  175. if err == nil && (record.Status == 3) && (nowTime.Unix() >= startTime.Unix()) {
  176. if record.Tasktype == 1 { //普通任务,只允许执行一次
  177. go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype)
  178. } else {
  179. go t.ForeachTask(record.Taskid)
  180. }
  181. }
  182. return true
  183. })
  184. time.Sleep(10 * time.Second)
  185. }
  186. }
  187. //循环任务
  188. func (t *TaskService) ForeachTask(taskid int64) {
  189. for {
  190. var record1, has = taskInfoList.Load(taskid)
  191. if !has {
  192. //任务已不存在时,退出进程
  193. return
  194. }
  195. record := record1.(TaskField)
  196. record.Status = 1 //表示执行中
  197. taskInfoList.Store(taskid, record)
  198. /*
  199. //判断是否到时执行结束
  200. loc, _ := time.LoadLocation("Asia/Shanghai")
  201. startTime, err := time.ParseInLocation("2006-01-02T15:04:05+08:00", record.Starttime, loc)
  202. if err != nil {
  203. TaskPause(taskid, 0, 2)
  204. return
  205. }
  206. if (time.Now().Unix() - startTime.Unix()) > int64(record.Runtime) {
  207. //已过了执行时间,终止任务
  208. TaskPause(taskid, 0, 2)
  209. return
  210. }
  211. */
  212. go t.ExecuteTaskContent(record.Taskid, record.Actid, record.Tasktype)
  213. if record.Interval > 1 {
  214. time.Sleep(time.Duration(record.Interval) * time.Second)
  215. } else {
  216. t.TaskPause(taskid, 0, 2)
  217. return
  218. }
  219. }
  220. }
  221. //真正执行任务内容的方法
  222. //taskType: 任务分类。1-普通任务 2-循环任务
  223. func (t *TaskService) ExecuteTaskContent(taskId, activeId int64, taskType int) {
  224. var cmdParameter CommandMgr
  225. commandInfo, err := t.GetActionCache(activeId, true)
  226. if err != nil {
  227. logger.Logger.Error(fmt.Sprintf("未获取到定时任务%d的动作信息", taskId))
  228. t.TaskPause(taskId, 1, 2)
  229. return
  230. }
  231. cmdParameter.DeviceId = commandInfo.Deviceid
  232. cmdParameter.AttrName = commandInfo.Attrname
  233. cmdParameter.State = commandInfo.Value
  234. //发送消息
  235. logger.Logger.Debug(fmt.Sprintf("正在执行计划任务的操作:%+v", cmdParameter))
  236. go new(CommandService).Command(cmdParameter, tools.GetUid(), 1)
  237. if taskType == 1 {
  238. //执行结束
  239. err := t.TaskPause(taskId, 1, 2)
  240. if err != nil {
  241. logger.Logger.Error(err)
  242. }
  243. } else {
  244. //fmt.Println("=========更改状态======")
  245. if _, ok := taskStatus.Load(taskId); !ok {
  246. //fmt.Println("=========更改状态1======")
  247. taskStatus.Store(taskId, true)
  248. err := t.TaskPause(taskId, 1, 1)
  249. if err != nil {
  250. logger.Logger.Error(err)
  251. }
  252. }
  253. }
  254. }
  255. //任务暂停或恢复
  256. func (t *TaskService) TaskPause(taskId int64, pause, status int) error {
  257. if status == 2 {
  258. taskInfoList.Delete(taskId)
  259. }
  260. l := logic.NewTaskLogic()
  261. var sqlCommandText = ""
  262. var sqlCondition []string
  263. if status > 0 {
  264. sqlCondition = append(sqlCondition, fmt.Sprintf("`status`=%d", status))
  265. }
  266. if pause > 0 {
  267. sqlCondition = append(sqlCondition, fmt.Sprintf("`pause`=%d", pause))
  268. }
  269. sqlCommandText = "update dev_task set " + strings.Join(sqlCondition, ",") + fmt.Sprintf(" where taskid=%d", taskId)
  270. return l.SvcCtx.DevTask.Base.Exec(sqlCommandText)
  271. }