TaskServiceImpl.java 11 KB


  1. package com.ruoyi.biz.service.impl;
  2. import com.ruoyi.biz.domain.*;
  3. import com.ruoyi.biz.mapper.TwinCalcHourSpecMapper;
  4. import com.ruoyi.biz.service.*;
  5. import com.ruoyi.biz.tools.Tools;
  6. import com.ruoyi.common.utils.DateUtils;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.apache.ibatis.session.ExecutorType;
  9. import org.apache.ibatis.session.SqlSession;
  10. import org.apache.ibatis.session.SqlSessionFactory;
  11. import org.springframework.jdbc.core.JdbcTemplate;
  12. import org.springframework.stereotype.Service;
  13. import javax.annotation.Resource;
  14. import java.time.LocalDate;
  15. import java.time.LocalDateTime;
  16. import java.time.LocalTime;
  17. import java.time.ZoneOffset;
  18. import java.util.ArrayList;
  19. import java.util.Date;
  20. import java.util.List;
  21. import java.util.Map;
  22. import java.util.concurrent.ExecutionException;
  23. import java.util.concurrent.Future;
  24. import java.util.concurrent.TimeUnit;
  25. import java.util.concurrent.TimeoutException;
  26. /**
  27. * 计算任务
  28. *
  29. * @author wukai
  30. * @date 2024/5/4 20:35
  31. */
  32. @Service
  33. @Slf4j
  34. public class TaskServiceImpl implements ITaskService {
  // Device lookup; used to list the devices that are flagged online.
  @Resource
  private ITwinDeviceService deviceService;
  // IoT platform client; only getToken() is called from this class.
  @Resource
  private IIotService iotService;
  // Plain JDBC access used by dbProcess for the batch inserts/updates.
  @Resource
  private JdbcTemplate jdbcTemplate;
  // Submits the per-device processing and hands back a Future per device.
  @Resource
  private AsyncServiceImpl asyncService;
  // Daily aggregation; invoked from calc2Curr when a window ends at 07:00.
  @Resource
  private ITwinCalcDayService dayService;
  // NOTE(review): only referenced from commented-out code in calc4device.
  @Resource
  private ITwinCalcStopService stopService;
  // NOTE(review): only referenced from commented-out code in calc4device.
  @Resource
  private ITwinCalcAlarmsService alarmsService;
  // Hourly statistics; provides the last stored hour used as the back-fill anchor.
  @Resource
  private ITwinCalcHourService hourService;
  // MyBatis session factory used to open a BATCH session for the spec inserts.
  @Resource
  private SqlSessionFactory factory;
  53. /**
  54. * 从数据库最后一个时间段统计至当前的上一个偶数时间点
  55. */
  56. @Override
  57. public void calc2Curr() {
  58. TwinCalcHour lastHour = hourService.lastHour();
  59. LocalDate localDate = DateUtils.toLocalDate(lastHour.getDataDate());
  60. LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN).plusHours(lastHour.getHour() + 1);
  61. LocalDateTime end = start.plusHours(1);
  62. LocalDateTime stop = Tools.currWholeTime();
  63. while (!end.isAfter(stop)) {
  64. start = start.minusSeconds(1);
  65. calc4device(start, end);
  66. log.info("补录数据===========start:{},end:{},stop:{}", start, end, stop);
  67. if (end.getHour() == 7) {
  68. //跨天,统计前一天的总数据
  69. log.info("----------------{},{}", start, end);
  70. dayService.calc4date(start.toLocalDate());
  71. }
  72. start = end;
  73. end = end.plusHours(1);
  74. }
  75. }
  76. /**
  77. * 统计上一个小时数据
  78. */
  79. @Override
  80. public void calcLastHour() {
  81. LocalDateTime ldt = Tools.currWholeTime();
  82. //上一个小时
  83. ldt = ldt.minusHours(1);
  84. //开始时间需要向前取一秒
  85. LocalDateTime start = ldt.minusSeconds(1);
  86. LocalDateTime end = ldt.plusHours(1);
  87. calc4device(start, end);
  88. }
  89. /**
  90. * 统计昨日数据
  91. */
  92. @Override
  93. public void calcYesterday() {
  94. LocalDate localDate = LocalDate.now().minusDays(1);
  95. calc(localDate);
  96. }
  97. /**
  98. * 统计当日数据,直到当前时间的上一个偶数时间
  99. */
  100. @Override
  101. public void calcToday() {
  102. Tools.timePeriod().forEach(pair -> {
  103. calc4device(pair.getKey(), pair.getValue());
  104. });
  105. }
  106. /**
  107. * 统计指定日期数据
  108. *
  109. * @param date 指定日期 yyyy-mm-dd
  110. */
  111. @Override
  112. public void calc(String date) {
  113. LocalDate localDate = LocalDate.parse(date);
  114. calc(localDate);
  115. }
  116. /**
  117. * 统计指定日期指定时段数据
  118. *
  119. * @param date 指定日期 yyyy-mm-dd
  120. * @param period 时段
  121. */
  122. @Override
  123. public void calc(String date, int period) {
  124. LocalDate localDate = LocalDate.parse(date);
  125. LocalDateTime ldt = LocalDateTime.of(localDate, LocalTime.MIN);
  126. LocalDateTime start = ldt.plusHours(period);
  127. LocalDateTime end = start.plusHours(1);
  128. start = start.minusSeconds(1);
  129. calc4device(start, end);
  130. }
  131. /**
  132. * 统计指定日期数据
  133. *
  134. * @param date 指定日期
  135. */
  136. @Override
  137. public void calc(LocalDate date) {
  138. Tools.timePeriod(date).forEach(pair -> {
  139. calc4device(pair.getKey(), pair.getValue());
  140. });
  141. }
  142. /**
  143. * 按设备循环计算
  144. *
  145. * @param start 开始时间戳
  146. * @param end 结束时间戳
  147. */
  148. @Override
  149. public void calc4device(LocalDateTime start, LocalDateTime end) {
  150. //开始时间都减了一秒的,这里要加回来,用于计算时段等数据
  151. LocalDateTime ldt = start.plusSeconds(1);
  152. Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZoneOffset.of("+8")).toInstant());
  153. Long startTime = start.toInstant(ZoneOffset.of("+8")).toEpochMilli();
  154. Long endTime = end.toInstant(ZoneOffset.of("+8")).toEpochMilli();
  155. //获取时段范围 时间 1=0-2点 2=2-4
  156. int period = ldt.getHour();
  157. List<Object[]> calcHours = new ArrayList<>();
  158. List<TwinRecordStop> stopList = new ArrayList<>();
  159. List<TwinRecordAlarms> alarmsList = new ArrayList<>();
  160. List<TwinCalcHourSpec> specList = new ArrayList<>();
  161. List<Object[]> panList = new ArrayList<>();
  162. Date s = new Date();
  163. //为了避免多线程同时获取token导致重复执行,先执行一次获取token
  164. iotService.getToken();
  165. TwinDevice search = new TwinDevice();
  166. //查询所有在线的设备
  167. search.setOnline("1");
  168. List<TwinDevice> list = deviceService.selectTwinDeviceList(search);
  169. List<TwinDevice> errList = exec(list, date, startTime, endTime, period, calcHours, stopList, panList, alarmsList, specList);
  170. //重试2次
  171. int repTimes = 1;
  172. while (errList.size() != 0) {
  173. errList = exec(errList, date, startTime, endTime, period, calcHours, stopList, panList, alarmsList, specList);
  174. if (repTimes++ > 1) {
  175. break;
  176. }
  177. }
  178. if (specList != null && specList.size() > 0) {
  179. try (SqlSession sqlSession = factory.openSession(ExecutorType.BATCH, false)) {
  180. TwinCalcHourSpecMapper mapper = sqlSession.getMapper(TwinCalcHourSpecMapper.class);
  181. specList.forEach(mapper::insertTwinCalcHourSpec);
  182. sqlSession.commit();
  183. }
  184. }
  185. // dbProcess(endTime, calcHours, stopList, alarmsList, panList);
  186. // stopService.process(ldt, list);
  187. // alarmsService.process(ldt, list);
  188. Date d = new Date();
  189. log.info("总共消耗:{}ms", d.getTime() - s.getTime());
  190. }
  191. /**
  192. * 执行数据获取操作
  193. *
  194. * @param list 设备列表
  195. * @param date 日期
  196. * @param startTime 开始时间
  197. * @param endTime 结束时间
  198. * @param period 小时数
  199. * @param calcHourList 1小时统计数据
  200. * @param stopList 停机数据
  201. * @param panList 盘头数据
  202. * @param alarmsList 告警数据
  203. * @return 返回未获取成功的数据列表
  204. */
  205. private List<TwinDevice> exec(List<TwinDevice> list, Date date, Long startTime, Long endTime, int period, List<Object[]> calcHourList, List<TwinRecordStop> stopList, List<Object[]> panList, List<TwinRecordAlarms> alarmsList, List<TwinCalcHourSpec> specList) {
  206. List<Future<Map<String, List<?>>>> futureList = new ArrayList<>();
  207. for (int i = 0; i < list.size(); i++) {
  208. TwinDevice twinDevice = list.get(i);
  209. futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period));
  210. }
  211. List<TwinDevice> errList = new ArrayList<>();
  212. try {
  213. for (Future<Map<String, List<?>>> future : futureList) {
  214. // 任务完成后获取结果
  215. try {
  216. Map<String, List<?>> map = future.get(10L, TimeUnit.SECONDS);
  217. List<TwinCalcHour> calcHours = (List<TwinCalcHour>) map.get("calc");
  218. List<TwinRecordStop> stops = (List<TwinRecordStop>) map.get("stopRecord");
  219. List<TwinRecordAlarms> alarms = (List<TwinRecordAlarms>) map.get("alarmRecord");
  220. List<TwinPanHeadInfo> panHeadInfoList = (List<TwinPanHeadInfo>) map.get("panHead");
  221. List<TwinCalcHourSpec> specs = (List<TwinCalcHourSpec>) map.get("specList");
  222. specList.addAll(specs);
  223. calcHours.forEach(calcHour -> calcHourList.add(calcHour.toArray()));
  224. stopList.addAll(stops);
  225. alarmsList.addAll(alarms);
  226. panHeadInfoList.forEach(pan -> panList.add(pan.toArray()));
  227. } catch (TimeoutException e) {
  228. errList.add(list.get(futureList.indexOf(future)));
  229. }
  230. }
  231. } catch (InterruptedException e) {
  232. throw new RuntimeException(e);
  233. } catch (ExecutionException e) {
  234. throw new RuntimeException(e);
  235. }
  236. return errList;
  237. }
  238. /**
  239. * 数据入库操作
  240. *
  241. * @param endTime 结束时间
  242. * @param calcHours 1小时统计数据
  243. * @param stopList 停机记录
  244. * @param alarmsList 告警记录
  245. * @param panList 盘头记录
  246. */
  247. private void dbProcess(long endTime, List<Object[]> calcHours, List<TwinRecordStop> stopList, List<TwinRecordAlarms> alarmsList, List<Object[]> panList) {
  248. //1小时统计数据
  249. String sql = "INSERT INTO TWIN_CALC_HOUR(DATA_DATE,HOUR,DEVICE_ID,KWH,WEIGHT,LENGTH,OPEN_TIME,CLOSE_TIME) VALUES (?,?,?,?,?,?,?,?)";
  250. jdbcTemplate.batchUpdate(sql, calcHours);
  251. //盘头数据
  252. sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)";
  253. jdbcTemplate.batchUpdate(sql, panList);
  254. //执行停机记录处理
  255. stopList.forEach(stop -> {
  256. if (stop.getStartTime() != null) {
  257. jdbcTemplate.update("INSERT INTO TWIN_RECORD_STOP (DEVICE_ID,DATA_DATE,HOUR,START_TIME,STOP_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)", stop.getDeviceId(), stop.getDataDate(), stop.getHour(), stop.getStartTime(), stop.getStopType(), "0");
  258. }
  259. if (stop.getEndTime() != null) {
  260. jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=? WHERE DEVICE_ID=? AND STOP_TYPE=? AND END_TIME IS NULL", stop.getEndTime(), stop.getDeviceId(), stop.getStopType());
  261. }
  262. });
  263. //补一次,所有没有结束时间的为当前结束时间
  264. jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));
  265. //执行告警记录处理
  266. alarmsList.forEach(alarms -> {
  267. if (alarms.getStartTime() != null) {
  268. jdbcTemplate.update("INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,DATA_DATE,HOUR,START_TIME,ALARM_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)", alarms.getDeviceId(), alarms.getDataDate(), alarms.getHour(), alarms.getStartTime(), alarms.getAlarmType(), "0");
  269. }
  270. if (alarms.getEndTime() != null) {
  271. jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=? WHERE DEVICE_ID=? AND ALARM_TYPE=? AND END_TIME IS NULL", alarms.getEndTime(), alarms.getDeviceId(), alarms.getAlarmType());
  272. }
  273. });
  274. //补一次,所有没有结束时间的为当前结束时间
  275. jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));
  276. }
  277. }