package com.ruoyi.biz.service.impl;

import com.ruoyi.biz.domain.*;
import com.ruoyi.biz.mapper.TwinCalcHourSpecMapper;
import com.ruoyi.biz.service.*;
import com.ruoyi.biz.tools.Tools;
import com.ruoyi.common.utils.DateUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Calculation tasks.
 *
 * <p>NOTE(review): the generic type arguments in this file were lost before review and have been
 * reconstructed. {@code TwinDevice}, {@code TwinCalcHour} and {@code TwinCalcHourSpecMapper} are
 * named in the code; the element types {@code TwinRecordStop}, {@code TwinRecordAlarms},
 * {@code TwinPanHeadInfo} and {@code TwinCalcHourSpec} are inferred from the table/mapper names
 * and must be confirmed against {@code com.ruoyi.biz.domain}.
 *
 * @author wukai
 * @date 2024/5/4 20:35
 */
@Service
@Slf4j
public class TaskServiceImpl implements ITaskService {

    /** Fixed UTC+8 offset used to convert local date-times to epoch milliseconds. */
    private static final ZoneOffset ZONE = ZoneOffset.of("+8");

    /** Maximum time to wait for a single device's asynchronous result. */
    private static final long FETCH_TIMEOUT_SECONDS = 10L;

    /** Number of additional attempts made for devices whose fetch timed out. */
    private static final int MAX_RETRIES = 2;

    @Resource
    private ITwinDeviceService deviceService;
    @Resource
    private IIotService iotService;
    @Resource
    private JdbcTemplate jdbcTemplate;
    @Resource
    private AsyncServiceImpl asyncService;
    @Resource
    private ITwinCalcDayService dayService;
    @Resource
    private ITwinCalcStopService stopService;
    @Resource
    private ITwinCalcAlarmsService alarmsService;
    @Resource
    private ITwinCalcHourService hourService;
    @Resource
    private SqlSessionFactory factory;

    /**
     * Back-fills hourly statistics, starting from the hour after the last one stored in the
     * database and continuing up to the time returned by {@link Tools#currWholeTime()}.
     */
    @Override
    public void calc2Curr() {
        TwinCalcHour lastHour = hourService.lastHour();
        if (lastHour == null) {
            // Without a stored hour there is no starting point; skip instead of failing with an NPE.
            log.warn("calc2Curr skipped: hourService.lastHour() returned null");
            return;
        }
        LocalDate localDate = DateUtils.toLocalDate(lastHour.getDataDate());
        LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN).plusHours(lastHour.getHour() + 1);
        LocalDateTime end = start.plusHours(1);
        LocalDateTime stop = Tools.currWholeTime();
        while (!end.isAfter(stop)) {
            // calc4device expects the start of the window to be shifted back by one second.
            start = start.minusSeconds(1);
            calc4device(start, end);
            log.info("补录数据===========start:{},end:{},stop:{}", start, end, stop);
            if (end.getHour() == 7) {
                // Day boundary: also compute the daily totals.
                log.info("----------------{},{}", start, end);
                dayService.calc4date(start.toLocalDate());
            }
            start = end;
            end = end.plusHours(1);
        }
    }

    /**
     * Computes the statistics of the hour preceding {@link Tools#currWholeTime()}.
     */
    @Override
    public void calcLastHour() {
        // Step back to the previous hour.
        LocalDateTime ldt = Tools.currWholeTime().minusHours(1);
        // The start of the window is shifted back by one second.
        LocalDateTime start = ldt.minusSeconds(1);
        LocalDateTime end = ldt.plusHours(1);
        calc4device(start, end);
    }

    /**
     * Computes yesterday's statistics.
     */
    @Override
    public void calcYesterday() {
        LocalDate localDate = LocalDate.now().minusDays(1);
        calc(localDate);
    }

    /**
     * Computes today's statistics for every time window returned by {@link Tools#timePeriod()}.
     */
    @Override
    public void calcToday() {
        Tools.timePeriod().forEach(pair -> calc4device(pair.getKey(), pair.getValue()));
    }

    /**
     * Computes the statistics of the given date.
     *
     * @param date the date, formatted yyyy-mm-dd
     */
    @Override
    public void calc(String date) {
        LocalDate localDate = LocalDate.parse(date);
        calc(localDate);
    }

    /**
     * Computes the statistics of a single one-hour window on the given date.
     *
     * @param date   the date, formatted yyyy-mm-dd
     * @param period number of hours after midnight at which the window starts
     */
    @Override
    public void calc(String date, int period) {
        LocalDate localDate = LocalDate.parse(date);
        LocalDateTime windowStart = LocalDateTime.of(localDate, LocalTime.MIN).plusHours(period);
        LocalDateTime end = windowStart.plusHours(1);
        // The start of the window is shifted back by one second.
        LocalDateTime start = windowStart.minusSeconds(1);
        calc4device(start, end);
    }

    /**
     * Computes the statistics of the given date for every time window returned by
     * {@code Tools.timePeriod(date)}.
     *
     * @param date the date
     */
    @Override
    public void calc(LocalDate date) {
        Tools.timePeriod(date).forEach(pair -> calc4device(pair.getKey(), pair.getValue()));
    }

    /**
     * Fetches the data of every online device for the given window and stores the resulting
     * "spec" rows in one batch.
     *
     * <p>Devices whose fetch times out are retried up to {@value #MAX_RETRIES} more times; devices
     * that still fail afterwards are logged and left out of this run. The hourly, stop, alarm and
     * pan-head results are collected but not persisted here: the call to {@code dbProcess} is
     * commented out below.
     *
     * @param start start of the window, already shifted back by one second by the caller
     * @param end   end of the window
     */
    @Override
    public void calc4device(LocalDateTime start, LocalDateTime end) {
        // Callers subtract one second from the start; add it back to derive the date and hour.
        LocalDateTime ldt = start.plusSeconds(1);
        Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZONE).toInstant());
        Long startTime = start.toInstant(ZONE).toEpochMilli();
        Long endTime = end.toInstant(ZONE).toEpochMilli();
        // Hour of day of the window start. NOTE(review): the previous comment described two-hour
        // buckets (1 = 0-2h, 2 = 2-4h), which does not match this computation.
        int period = ldt.getHour();

        List<Object[]> calcHours = new ArrayList<>();
        List<TwinRecordStop> stopList = new ArrayList<>();
        List<TwinRecordAlarms> alarmsList = new ArrayList<>();
        List<TwinCalcHourSpec> specList = new ArrayList<>();
        List<Object[]> panList = new ArrayList<>();

        long begin = System.currentTimeMillis();
        // Fetch the token once up front so the parallel workers do not all request it at once.
        iotService.getToken();

        // Query all online devices.
        TwinDevice search = new TwinDevice();
        search.setOnline("1");
        List<TwinDevice> list = deviceService.selectTwinDeviceList(search);

        List<TwinDevice> errList = exec(list, date, startTime, endTime, period,
                calcHours, stopList, panList, alarmsList, specList);
        // Retry only the devices that timed out.
        for (int retry = 0; retry < MAX_RETRIES && !errList.isEmpty(); retry++) {
            errList = exec(errList, date, startTime, endTime, period,
                    calcHours, stopList, panList, alarmsList, specList);
        }
        if (!errList.isEmpty()) {
            log.warn("{} device(s) still timed out after {} retries, window start:{}, end:{}",
                    errList.size(), MAX_RETRIES, start, end);
        }

        if (!specList.isEmpty()) {
            // Batch executor without auto-commit: all inserts are sent together and committed once.
            try (SqlSession sqlSession = factory.openSession(ExecutorType.BATCH, false)) {
                TwinCalcHourSpecMapper mapper = sqlSession.getMapper(TwinCalcHourSpecMapper.class);
                specList.forEach(mapper::insertTwinCalcHourSpec);
                sqlSession.commit();
            }
        }
        // dbProcess(endTime, calcHours, stopList, alarmsList, panList);
        // stopService.process(ldt, list);
        // alarmsService.process(ldt, list);
        log.info("总共消耗:{}ms", System.currentTimeMillis() - begin);
    }

    /**
     * Submits one asynchronous fetch per device and gathers the results into the supplied lists.
     *
     * @param list         devices to process
     * @param date         date of the window
     * @param startTime    window start, epoch milliseconds
     * @param endTime      window end, epoch milliseconds
     * @param period       hour of day
     * @param calcHourList receives the hourly statistics rows
     * @param stopList     receives the stop records
     * @param panList      receives the pan-head rows
     * @param alarmsList   receives the alarm records
     * @param specList     receives the spec rows
     * @return the devices whose result was not available within {@value #FETCH_TIMEOUT_SECONDS} seconds
     * @throws RuntimeException if a fetch fails or the waiting thread is interrupted
     */
    private List<TwinDevice> exec(List<TwinDevice> list, Date date, Long startTime, Long endTime, int period,
                                  List<Object[]> calcHourList, List<TwinRecordStop> stopList,
                                  List<Object[]> panList, List<TwinRecordAlarms> alarmsList,
                                  List<TwinCalcHourSpec> specList) {
        // futureList.get(i) belongs to list.get(i); the timeout branch relies on that pairing.
        List<Future<Map<String, List<?>>>> futureList = new ArrayList<>(list.size());
        for (TwinDevice twinDevice : list) {
            futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period));
        }

        List<TwinDevice> errList = new ArrayList<>();
        for (int i = 0; i < futureList.size(); i++) {
            try {
                // Wait for the task to finish and read its result.
                Map<String, List<?>> map = futureList.get(i).get(FETCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                List<TwinCalcHour> calcHours = typedList(map, "calc");
                List<TwinRecordStop> stops = typedList(map, "stopRecord");
                List<TwinRecordAlarms> alarms = typedList(map, "alarmRecord");
                List<TwinPanHeadInfo> panHeadInfoList = typedList(map, "panHead");
                List<TwinCalcHourSpec> specs = typedList(map, "specList");
                specList.addAll(specs);
                calcHours.forEach(calcHour -> calcHourList.add(calcHour.toArray()));
                stopList.addAll(stops);
                alarmsList.addAll(alarms);
                panHeadInfoList.forEach(pan -> panList.add(pan.toArray()));
            } catch (TimeoutException e) {
                errList.add(list.get(i));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
        return errList;
    }

    /**
     * Reads one entry of an asynchronous result map as a list of the expected element type.
     *
     * @param result result map of one device
     * @param key    entry to read
     * @return the list stored under {@code key}, or {@code null} if the key is absent
     */
    @SuppressWarnings("unchecked") // The element type of each entry is fixed by its key.
    private static <T> List<T> typedList(Map<String, List<?>> result, String key) {
        return (List<T>) result.get(key);
    }

    /**
     * Writes the collected data to the database.
     *
     * <p>The only call to this method visible in this file is commented out in
     * {@link #calc4device(LocalDateTime, LocalDateTime)}.
     *
     * @param endTime    window end, epoch milliseconds
     * @param calcHours  hourly statistics rows
     * @param stopList   stop records
     * @param alarmsList alarm records
     * @param panList    pan-head rows
     */
    private void dbProcess(long endTime, List<Object[]> calcHours, List<TwinRecordStop> stopList,
                           List<TwinRecordAlarms> alarmsList, List<Object[]> panList) {
        // Hourly statistics.
        String sql = "INSERT INTO TWIN_CALC_HOUR(DATA_DATE,HOUR,DEVICE_ID,KWH,WEIGHT,LENGTH,OPEN_TIME,CLOSE_TIME) VALUES (?,?,?,?,?,?,?,?)";
        jdbcTemplate.batchUpdate(sql, calcHours);
        // Pan-head data.
        sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)";
        jdbcTemplate.batchUpdate(sql, panList);

        // Stop records: insert the ones that started, then close the ones that ended.
        for (TwinRecordStop stop : stopList) {
            if (stop.getStartTime() != null) {
                jdbcTemplate.update("INSERT INTO TWIN_RECORD_STOP (DEVICE_ID,DATA_DATE,HOUR,START_TIME,STOP_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)",
                        stop.getDeviceId(), stop.getDataDate(), stop.getHour(), stop.getStartTime(), stop.getStopType(), "0");
            }
            if (stop.getEndTime() != null) {
                jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=? WHERE DEVICE_ID=? AND STOP_TYPE=? AND END_TIME IS NULL",
                        stop.getEndTime(), stop.getDeviceId(), stop.getStopType());
            }
        }
        // Final pass: every record still without an end time is closed at the window end.
        jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));

        // Alarm records: insert the ones that started, then close the ones that ended.
        for (TwinRecordAlarms alarms : alarmsList) {
            if (alarms.getStartTime() != null) {
                jdbcTemplate.update("INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,DATA_DATE,HOUR,START_TIME,ALARM_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)",
                        alarms.getDeviceId(), alarms.getDataDate(), alarms.getHour(), alarms.getStartTime(), alarms.getAlarmType(), "0");
            }
            if (alarms.getEndTime() != null) {
                jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=? WHERE DEVICE_ID=? AND ALARM_TYPE=? AND END_TIME IS NULL",
                        alarms.getEndTime(), alarms.getDeviceId(), alarms.getAlarmType());
            }
        }
        // Final pass: every record still without an end time is closed at the window end.
        jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));
    }
}