package com.ruoyi.biz.service.impl; import com.ruoyi.biz.domain.TwinCalc2hr; import com.ruoyi.biz.domain.TwinDevice; import com.ruoyi.biz.domain.TwinPanHeadInfo; import com.ruoyi.biz.domain.TwinRecordAlarms; import com.ruoyi.biz.service.IIotService; import com.ruoyi.biz.service.ITaskService; import com.ruoyi.biz.service.ITwinCalc2hrService; import com.ruoyi.biz.service.ITwinDeviceService; import com.ruoyi.biz.tools.Tools; import com.ruoyi.common.utils.DateUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.ZoneOffset; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * 计算任务 * * @author wukai * @date 2024/5/4 20:35 */ @Service @Slf4j public class TaskServiceImpl implements ITaskService { @Resource private ITwinDeviceService deviceService; @Resource private ITwinCalc2hrService calc2hrService; @Resource private IIotService iotService; @Resource private JdbcTemplate jdbcTemplate; @Resource private AsyncServiceImpl asyncService; /** * 从数据库最后一个时间段统计至当前的上一个偶数时间点 */ @Override public void calc2Curr() { TwinCalc2hr calc2hr = calc2hrService.lastPeriod(); LocalDate localDate = DateUtils.toLocalDate(calc2hr.getDataDate()); Long lastPeriod = calc2hr.getTimePeriod(); if (lastPeriod == 12L) { localDate = localDate.plusDays(1); lastPeriod = 1L; } LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN); start = start.plusHours(lastPeriod * 2); LocalDateTime end = start.plusHours(2); LocalDateTime stop = Tools.currWholeTime(); //当前时间之前的偶数时间段 stop = stop.minusHours(stop.getHour() % 2); do { start = start.minusSeconds(1); calc4device(start, end); start = end; end = end.plusHours(2); } while (!end.isAfter(stop)); } /** * 统计上一个时段数据 */ @Override public void calcLastPeriod() { LocalDateTime ldt = Tools.currWholeTime(); //当前时间之前的偶数时间段 ldt = ldt.minusHours(ldt.getHour() % 2).minusHours(2); //开始时间需要向前取一秒 LocalDateTime start = ldt.minusSeconds(1); LocalDateTime end = ldt.plusHours(2); calc4device(start, end); } /** * 统计昨日数据 */ @Override public void calcYesterday() { LocalDate localDate = LocalDate.now().minusDays(1); calc(localDate); } /** * 统计当日数据,直到当前时间的上一个偶数时间 */ @Override public void calcToday() { Tools.timePeriod().forEach(map -> { LocalDateTime start = (LocalDateTime) map.get("start"); LocalDateTime end = (LocalDateTime) map.get("end"); calc4device(start, end); }); } /** * 统计指定日期数据 * * @param date 指定日期 yyyy-mm-dd */ @Override public void calc(String date) { LocalDate localDate = LocalDate.parse(date); calc(localDate); } /** * 统计指定日期指定时段数据 * * @param date 指定日期 yyyy-mm-dd * @param period 时段 */ @Override public void calc(String date, int period) { LocalDate localDate = LocalDate.parse(date); LocalDateTime ldt = LocalDateTime.of(localDate, LocalTime.MIN); LocalDateTime start = ldt.plusHours(2 * period).minusHours(2); LocalDateTime end = start.plusHours(2); start = start.minusSeconds(1); calc4device(start, end); } /** * 统计指定日期数据 * * @param date 指定日期 */ @Override public void calc(LocalDate date) { Tools.timePeriod(date).forEach(map -> { LocalDateTime start = (LocalDateTime) map.get("start"); LocalDateTime end = (LocalDateTime) map.get("end"); calc4device(start, end); }); } /** * 按设备循环计算 * * @param start 开始时间戳 * @param end 结束时间戳 */ private void calc4device(LocalDateTime start, LocalDateTime end) { //开始时间都减了一秒的,这里要加回来,用于计算时段等数据 LocalDateTime ldt = start.plusSeconds(1); Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZoneOffset.of("+8")).toInstant()); Long startTime = start.toInstant(ZoneOffset.of("+8")).toEpochMilli(); Long endTime = end.toInstant(ZoneOffset.of("+8")).toEpochMilli(); //获取时段范围 时间 1=0-2点 2=2-4 int period = ldt.getHour() / 2 + 1; List calc2hrList = new ArrayList<>(); List recordAlarmsList = new ArrayList<>(); List panList = new ArrayList<>(); Date s = new Date(); //为了避免多线程同时获取token导致重复执行,先执行一次获取token iotService.getToken(); List list = deviceService.selectTwinDeviceList(new TwinDevice()); List errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList); //重试2次 int repTimes = 1; while (errList.size() != 0) { errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList); if (repTimes++ > 1) { break; } } Date d = new Date(); log.info("总共消耗:{}ms", d.getTime() - s.getTime()); String sql = "INSERT INTO TWIN_CALC_2HR (" + "DATA_DATE,TIME_PERIOD,DEVICE_ID,LENGTH_A,OPEN_TIME_A," + "CLOSE_TIME_A,LENGTH_B,CLOSE_TIME_B,OPEN_TIME_B,WEIGHT," + "WEIGHT_A,WEIGHT_B,ALARM,STOP1_A,STOP2_A," + "STOP3_A,STOP1_B,STOP2_B,STOP3_B,REMARK," + "KWH,KWH_A,KWH_B)" + " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; jdbcTemplate.batchUpdate(sql, calc2hrList); sql = "INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,ALARM_TYPE,ALARM_CODE,DATA_TIME) VALUES (?,?,?,?)"; jdbcTemplate.batchUpdate(sql, recordAlarmsList); sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)"; jdbcTemplate.batchUpdate(sql, panList); } private List exec(List list, Date date, Long startTime, Long endTime, int period, List calc2hrList, List recordAlarmsList, List panList) { List>>> futureList = new ArrayList<>(); for (int i = 0; i < list.size(); i++) { TwinDevice twinDevice = list.get(i); futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period)); } List errList = new ArrayList<>(); try { for (Future>> future : futureList) { // 任务完成后获取结果 try { Map> map = future.get(10L, TimeUnit.SECONDS); List calc2hrs = (List) map.get("calc"); List recordAlarms = (List) map.get("record"); List panHeadInfoList = (List) map.get("panHead"); calc2hrs.forEach(calc2hr -> calc2hrList.add(calc2hr.toArray())); recordAlarms.forEach(alarm -> recordAlarmsList.add(alarm.toArray())); panHeadInfoList.forEach(pan -> panList.add(pan.toArray())); } catch (TimeoutException e) { errList.add(list.get(futureList.indexOf(future))); } } } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { throw new RuntimeException(e); } return errList; } }