123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- package com.ruoyi.biz.service.impl;
- import com.ruoyi.biz.domain.TwinCalc2hr;
- import com.ruoyi.biz.domain.TwinDevice;
- import com.ruoyi.biz.domain.TwinPanHeadInfo;
- import com.ruoyi.biz.domain.TwinRecordAlarms;
- import com.ruoyi.biz.service.IIotService;
- import com.ruoyi.biz.service.ITaskService;
- import com.ruoyi.biz.service.ITwinCalc2hrService;
- import com.ruoyi.biz.service.ITwinDeviceService;
- import com.ruoyi.biz.tools.Tools;
- import com.ruoyi.common.utils.DateUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.ZoneOffset;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- /**
- * 计算任务
- *
- * @author wukai
- * @date 2024/5/4 20:35
- */
- @Service
- @Slf4j
- public class TaskServiceImpl implements ITaskService {
- @Resource
- private ITwinDeviceService deviceService;
- @Resource
- private ITwinCalc2hrService calc2hrService;
- @Resource
- private IIotService iotService;
- @Resource
- private JdbcTemplate jdbcTemplate;
- @Resource
- private AsyncServiceImpl asyncService;
- /**
- * 从数据库最后一个时间段统计至当前的上一个偶数时间点
- */
- @Override
- public void calc2Curr() {
- TwinCalc2hr calc2hr = calc2hrService.lastPeriod();
- LocalDate localDate = DateUtils.toLocalDate(calc2hr.getDataDate());
- Long lastPeriod = calc2hr.getTimePeriod();
- if (lastPeriod == 12L) {
- localDate = localDate.plusDays(1);
- lastPeriod = 1L;
- }
- LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN);
- start = start.plusHours(lastPeriod * 2);
- LocalDateTime end = start.plusHours(2);
- LocalDateTime stop = Tools.currWholeTime();
- //当前时间之前的偶数时间段
- stop = stop.minusHours(stop.getHour() % 2);
- do {
- start = start.minusSeconds(1);
- calc4device(start, end);
- start = end;
- end = end.plusHours(2);
- } while (!end.isAfter(stop));
- }
- /**
- * 统计上一个时段数据
- */
- @Override
- public void calcLastPeriod() {
- LocalDateTime ldt = Tools.currWholeTime();
- //当前时间之前的偶数时间段
- ldt = ldt.minusHours(ldt.getHour() % 2).minusHours(2);
- //开始时间需要向前取一秒
- LocalDateTime start = ldt.minusSeconds(1);
- LocalDateTime end = ldt.plusHours(2);
- calc4device(start, end);
- }
- /**
- * 统计昨日数据
- */
- @Override
- public void calcYesterday() {
- LocalDate localDate = LocalDate.now().minusDays(1);
- calc(localDate);
- }
- /**
- * 统计当日数据,直到当前时间的上一个偶数时间
- */
- @Override
- public void calcToday() {
- Tools.timePeriod().forEach(map -> {
- LocalDateTime start = (LocalDateTime) map.get("start");
- LocalDateTime end = (LocalDateTime) map.get("end");
- calc4device(start, end);
- });
- }
- /**
- * 统计指定日期数据
- *
- * @param date 指定日期 yyyy-mm-dd
- */
- @Override
- public void calc(String date) {
- LocalDate localDate = LocalDate.parse(date);
- calc(localDate);
- }
- /**
- * 统计指定日期指定时段数据
- *
- * @param date 指定日期 yyyy-mm-dd
- * @param period 时段
- */
- @Override
- public void calc(String date, int period) {
- LocalDate localDate = LocalDate.parse(date);
- LocalDateTime ldt = LocalDateTime.of(localDate, LocalTime.MIN);
- LocalDateTime start = ldt.plusHours(2 * period).minusHours(2);
- LocalDateTime end = start.plusHours(2);
- start = start.minusSeconds(1);
- calc4device(start, end);
- }
- /**
- * 统计指定日期数据
- *
- * @param date 指定日期
- */
- @Override
- public void calc(LocalDate date) {
- Tools.timePeriod(date).forEach(map -> {
- LocalDateTime start = (LocalDateTime) map.get("start");
- LocalDateTime end = (LocalDateTime) map.get("end");
- calc4device(start, end);
- });
- }
- /**
- * 按设备循环计算
- *
- * @param start 开始时间戳
- * @param end 结束时间戳
- */
- private void calc4device(LocalDateTime start, LocalDateTime end) {
- //开始时间都减了一秒的,这里要加回来,用于计算时段等数据
- LocalDateTime ldt = start.plusSeconds(1);
- Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZoneOffset.of("+8")).toInstant());
- Long startTime = start.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- Long endTime = end.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- //获取时段范围 时间 1=0-2点 2=2-4
- int period = ldt.getHour() / 2 + 1;
- List<Object[]> calc2hrList = new ArrayList<>();
- List<Object[]> recordAlarmsList = new ArrayList<>();
- List<Object[]> panList = new ArrayList<>();
- Date s = new Date();
- //为了避免多线程同时获取token导致重复执行,先执行一次获取token
- iotService.getToken();
- List<TwinDevice> list = deviceService.selectTwinDeviceList(new TwinDevice());
- List<TwinDevice> errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList);
- //重试2次
- int repTimes = 1;
- while (errList.size() != 0) {
- errList = exec(list, date, startTime, endTime, period, calc2hrList, recordAlarmsList, panList);
- if (repTimes++ > 1) {
- break;
- }
- }
- Date d = new Date();
- log.info("总共消耗:{}ms", d.getTime() - s.getTime());
- String sql = "INSERT INTO TWIN_CALC_2HR (" +
- "DATA_DATE,TIME_PERIOD,DEVICE_ID,LENGTH_A,OPEN_TIME_A," +
- "CLOSE_TIME_A,LENGTH_B,CLOSE_TIME_B,OPEN_TIME_B,WEIGHT," +
- "WEIGHT_A,WEIGHT_B,ALARM,STOP1_A,STOP2_A," +
- "STOP3_A,STOP1_B,STOP2_B,STOP3_B,REMARK," +
- "KWH,KWH_A,KWH_B)" +
- " VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
- jdbcTemplate.batchUpdate(sql, calc2hrList);
- sql = "INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,ALARM_TYPE,ALARM_CODE,DATA_TIME) VALUES (?,?,?,?)";
- jdbcTemplate.batchUpdate(sql, recordAlarmsList);
- sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)";
- jdbcTemplate.batchUpdate(sql, panList);
- }
- private List<TwinDevice> exec(List<TwinDevice> list, Date date, Long startTime, Long endTime, int period, List<Object[]> calc2hrList, List<Object[]> recordAlarmsList, List<Object[]> panList) {
- List<Future<Map<String, List<?>>>> futureList = new ArrayList<>();
- for (int i = 0; i < list.size(); i++) {
- TwinDevice twinDevice = list.get(i);
- futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period));
- }
- List<TwinDevice> errList = new ArrayList<>();
- try {
- for (Future<Map<String, List<?>>> future : futureList) {
- // 任务完成后获取结果
- try {
- Map<String, List<?>> map = future.get(10L, TimeUnit.SECONDS);
- List<TwinCalc2hr> calc2hrs = (List<TwinCalc2hr>) map.get("calc");
- List<TwinRecordAlarms> recordAlarms = (List<TwinRecordAlarms>) map.get("record");
- List<TwinPanHeadInfo> panHeadInfoList = (List<TwinPanHeadInfo>) map.get("panHead");
- calc2hrs.forEach(calc2hr -> calc2hrList.add(calc2hr.toArray()));
- recordAlarms.forEach(alarm -> recordAlarmsList.add(alarm.toArray()));
- panHeadInfoList.forEach(pan -> panList.add(pan.toArray()));
- } catch (TimeoutException e) {
- errList.add(list.get(futureList.indexOf(future)));
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- return errList;
- }
- }
|