TaskServiceImpl.java 8.4 KB


  1. package com.ruoyi.biz.service.impl;
  2. import com.ruoyi.biz.domain.TwinCalc2hr;
  3. import com.ruoyi.biz.domain.TwinDevice;
  4. import com.ruoyi.biz.domain.TwinPanHeadInfo;
  5. import com.ruoyi.biz.domain.TwinRecordAlarms;
  6. import com.ruoyi.biz.service.IIotService;
  7. import com.ruoyi.biz.service.ITaskService;
  8. import com.ruoyi.biz.service.ITwinCalc2hrService;
  9. import com.ruoyi.biz.service.ITwinDeviceService;
  10. import com.ruoyi.biz.tools.Tools;
  11. import com.ruoyi.common.utils.DateUtils;
  12. import lombok.extern.slf4j.Slf4j;
  13. import org.springframework.jdbc.core.JdbcTemplate;
  14. import org.springframework.stereotype.Service;
  15. import javax.annotation.Resource;
  16. import java.time.LocalDate;
  17. import java.time.LocalDateTime;
  18. import java.time.LocalTime;
  19. import java.time.ZoneOffset;
  20. import java.util.ArrayList;
  21. import java.util.Date;
  22. import java.util.List;
  23. import java.util.Map;
  24. import java.util.concurrent.ExecutionException;
  25. import java.util.concurrent.Future;
  26. import java.util.concurrent.TimeUnit;
  27. import java.util.concurrent.TimeoutException;
  28. /**
  29. * 计算任务
  30. *
  31. * @author wukai
  32. * @date 2024/5/4 20:35
  33. */
  34. @Service
  35. @Slf4j
  36. public class TaskServiceImpl implements ITaskService {
  37. @Resource
  38. private ITwinDeviceService deviceService;
  39. @Resource
  40. private ITwinCalc2hrService calc2hrService;
  41. @Resource
  42. private IIotService iotService;
  43. @Resource
  44. private JdbcTemplate jdbcTemplate;
  45. @Resource
  46. private AsyncServiceImpl asyncService;
  47. /**
  48. * 从数据库最后一个时间段统计至当前的上一个偶数时间点
  49. */
  50. @Override
  51. public void calc2Curr() {
  52. TwinCalc2hr calc2hr = calc2hrService.lastPeriod();
  53. LocalDate localDate = DateUtils.toLocalDate(calc2hr.getDataDate());
  54. Long lastPeriod = calc2hr.getTimePeriod();
  55. if (lastPeriod == 12L) {
  56. localDate = localDate.plusDays(1);
  57. lastPeriod = 1L;
  58. }
  59. LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN);
  60. start = start.plusHours(lastPeriod * 2);
  61. LocalDateTime end = start.plusHours(2);
  62. LocalDateTime stop = Tools.currWholeTime();
  63. //当前时间之前的偶数时间段
  64. stop = stop.minusHours(stop.getHour() % 2);
  65. do {
  66. start = start.minusSeconds(1);
  67. calc4device(start, end);
  68. start = end;
  69. end = end.plusHours(2);
  70. } while (!end.isAfter(stop));
  71. }
  72. /**
  73. * 统计上一个时段数据
  74. */
  75. @Override
  76. public void calcLastPeriod() {
  77. LocalDateTime ldt = Tools.currWholeTime();
  78. //当前时间之前的偶数时间段
  79. ldt = ldt.minusHours(ldt.getHour() % 2).minusHours(2);
  80. //开始时间需要向前取一秒
  81. LocalDateTime start = ldt.minusSeconds(1);
  82. LocalDateTime end = ldt.plusHours(2);
  83. calc4device(start, end);
  84. }
  85. /**
  86. * 统计昨日数据
  87. */
  88. @Override
  89. public void calcYesterday() {
  90. LocalDate localDate = LocalDate.now().minusDays(1);
  91. calc(localDate);
  92. }
  93. /**
  94. * 统计当日数据,直到当前时间的上一个偶数时间
  95. */
  96. @Override
  97. public void calcToday() {
  98. Tools.timePeriod().forEach(map -> {
  99. LocalDateTime start = (LocalDateTime) map.get("start");
  100. LocalDateTime end = (LocalDateTime) map.get("end");
  101. calc4device(start, end);
  102. });
  103. }
  104. /**
  105. * 统计指定日期数据
  106. *
  107. * @param date 指定日期 yyyy-mm-dd
  108. */
  109. @Override
  110. public void calc(String date) {
  111. LocalDate localDate = LocalDate.parse(date);
  112. calc(localDate);
  113. }
  114. /**
  115. * 统计指定日期指定时段数据
  116. *
  117. * @param date 指定日期 yyyy-mm-dd
  118. * @param period 时段
  119. */
  120. @Override
  121. public void calc(String date, int period) {
  122. LocalDate localDate = LocalDate.parse(date);
  123. LocalDateTime ldt = LocalDateTime.of(localDate, LocalTime.MIN);
  124. LocalDateTime start = ldt.plusHours(2 * period).minusHours(2);
  125. LocalDateTime end = start.plusHours(2);
  126. start = start.minusSeconds(1);
  127. calc4device(start, end);
  128. }
  129. /**
  130. * 统计指定日期数据
  131. *
  132. * @param date 指定日期
  133. */
  134. @Override
  135. public void calc(LocalDate date) {
  136. Tools.timePeriod(date).forEach(map -> {
  137. LocalDateTime start = (LocalDateTime) map.get("start");
  138. LocalDateTime end = (LocalDateTime) map.get("end");
  139. calc4device(start, end);
  140. });
  141. }
  142. /**
  143. * 按设备循环计算
  144. *
  145. * @param start 开始时间戳
  146. * @param end 结束时间戳
  147. */
  148. private void calc4device(LocalDateTime start, LocalDateTime end) {
  149. //开始时间都减了一秒的,这里要加回来,用于计算时段等数据
  150. LocalDateTime ldt = start.plusSeconds(1);
  151. Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZoneOffset.of("+8")).toInstant());
  152. Long startTime = start.toInstant(ZoneOffset.of("+8")).toEpochMilli();
  153. Long endTime = end.toInstant(ZoneOffset.of("+8")).toEpochMilli();
  154. //获取时段范围 时间 1=0-2点 2=2-4
  155. int period = ldt.getHour() / 2 + 1;
  156. List<Object[]> calc2hrList = new ArrayList<>();
  157. List<Object[]> recordAlarmsList = new ArrayList<>();
  158. List<Object[]> panList = new ArrayList<>();
  159. Date s = new Date();
  160. //为了避免多线程同时获取token导致重复执行,先执行一次获取token
  161. iotService.getToken();
  162. List<TwinDevice> list = deviceService.selectTwinDeviceList(new TwinDevice());
  163. List<TwinDevice> errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList);
  164. //重试2次
  165. int repTimes = 1;
  166. while (errList.size() != 0) {
  167. errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList);
  168. if (repTimes++ > 1) {
  169. break;
  170. }
  171. }
  172. Date d = new Date();
  173. log.info("总共消耗:{}ms", d.getTime() - s.getTime());
  174. String sql = "INSERT INTO TWIN_CALC_2HR (" +
  175. "DATA_DATE,TIME_PERIOD,DEVICE_ID,LENGTH_A,OPEN_TIME_A," +
  176. "CLOSE_TIME_A,LENGTH_B,CLOSE_TIME_B,OPEN_TIME_B,WEIGHT," +
  177. "WEIGHT_A,WEIGHT_B,ALARM,STOP1_A,STOP2_A," +
  178. "STOP3_A,STOP1_B,STOP2_B,STOP3_B,REMARK," +
  179. "KWH,KWH_A,KWH_B)" +
  180. " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
  181. jdbcTemplate.batchUpdate(sql, calc2hrList);
  182. sql = "INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,ALARM_TYPE,ALARM_CODE,DATA_TIME) VALUES (?,?,?,?)";
  183. jdbcTemplate.batchUpdate(sql, recordAlarmsList);
  184. sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)";
  185. jdbcTemplate.batchUpdate(sql, panList);
  186. }
  187. private List<TwinDevice> exec(List<TwinDevice> list, Date date, Long startTime, Long endTime, int period, List<Object[]> calc2hrList, List<Object[]> recordAlarmsList, List<Object[]> panList) {
  188. List<Future<Map<String, List<?>>>> futureList = new ArrayList<>();
  189. for (int i = 0; i < list.size(); i++) {
  190. TwinDevice twinDevice = list.get(i);
  191. futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period));
  192. }
  193. List<TwinDevice> errList = new ArrayList<>();
  194. try {
  195. for (Future<Map<String, List<?>>> future : futureList) {
  196. // 任务完成后获取结果
  197. try {
  198. Map<String, List<?>> map = future.get(10L, TimeUnit.SECONDS);
  199. List<TwinCalc2hr> calc2hrs = (List<TwinCalc2hr>) map.get("calc");
  200. List<TwinRecordAlarms> recordAlarms = (List<TwinRecordAlarms>) map.get("record");
  201. List<TwinPanHeadInfo> panHeadInfoList = (List<TwinPanHeadInfo>) map.get("panHead");
  202. calc2hrs.forEach(calc2hr -> calc2hrList.add(calc2hr.toArray()));
  203. recordAlarms.forEach(alarm -> recordAlarmsList.add(alarm.toArray()));
  204. panHeadInfoList.forEach(pan -> panList.add(pan.toArray()));
  205. } catch (TimeoutException e) {
  206. errList.add(list.get(futureList.indexOf(future)));
  207. }
  208. }
  209. } catch (InterruptedException e) {
  210. throw new RuntimeException(e);
  211. } catch (ExecutionException e) {
  212. throw new RuntimeException(e);
  213. }
  214. return errList;
  215. }
  216. }