123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298 |
- package com.ruoyi.biz.service.impl;
- import com.ruoyi.biz.domain.*;
- import com.ruoyi.biz.mapper.TwinCalcHourSpecMapper;
- import com.ruoyi.biz.service.*;
- import com.ruoyi.biz.tools.Tools;
- import com.ruoyi.common.utils.DateUtils;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.ibatis.session.ExecutorType;
- import org.apache.ibatis.session.SqlSession;
- import org.apache.ibatis.session.SqlSessionFactory;
- import org.springframework.jdbc.core.JdbcTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.time.LocalDate;
- import java.time.LocalDateTime;
- import java.time.LocalTime;
- import java.time.ZoneOffset;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- import java.util.Map;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
- /**
- * 计算任务
- *
- * @author wukai
- * @date 2024/5/4 20:35
- */
- @Service
- @Slf4j
- public class TaskServiceImpl implements ITaskService {
- @Resource
- private ITwinDeviceService deviceService;
- @Resource
- private IIotService iotService;
- @Resource
- private JdbcTemplate jdbcTemplate;
- @Resource
- private AsyncServiceImpl asyncService;
- @Resource
- private ITwinCalcDayService dayService;
- @Resource
- private ITwinCalcStopService stopService;
- @Resource
- private ITwinCalcAlarmsService alarmsService;
- @Resource
- private ITwinCalcHourService hourService;
- @Resource
- private SqlSessionFactory factory;
- /**
- * 从数据库最后一个时间段统计至当前的上一个偶数时间点
- */
- @Override
- public void calc2Curr() {
- TwinCalcHour lastHour = hourService.lastHour();
- LocalDate localDate = DateUtils.toLocalDate(lastHour.getDataDate());
- LocalDateTime start = LocalDateTime.of(localDate, LocalTime.MIN).plusHours(lastHour.getHour() + 1);
- LocalDateTime end = start.plusHours(1);
- LocalDateTime stop = Tools.currWholeTime();
- while (!end.isAfter(stop)) {
- start = start.minusSeconds(1);
- calc4device(start, end);
- log.info("补录数据===========start:{},end:{},stop:{}", start, end, stop);
- if (end.getHour() == 7) {
- //跨天,统计前一天的总数据
- log.info("----------------{},{}", start, end);
- dayService.calc4date(start.toLocalDate());
- }
- start = end;
- end = end.plusHours(1);
- }
- }
- /**
- * 统计上一个小时数据
- */
- @Override
- public void calcLastHour() {
- LocalDateTime ldt = Tools.currWholeTime();
- //上一个小时
- ldt = ldt.minusHours(1);
- //开始时间需要向前取一秒
- LocalDateTime start = ldt.minusSeconds(1);
- LocalDateTime end = ldt.plusHours(1);
- calc4device(start, end);
- }
- /**
- * 统计昨日数据
- */
- @Override
- public void calcYesterday() {
- LocalDate localDate = LocalDate.now().minusDays(1);
- calc(localDate);
- }
- /**
- * 统计当日数据,直到当前时间的上一个偶数时间
- */
- @Override
- public void calcToday() {
- Tools.timePeriod().forEach(pair -> {
- calc4device(pair.getKey(), pair.getValue());
- });
- }
- /**
- * 统计指定日期数据
- *
- * @param date 指定日期 yyyy-mm-dd
- */
- @Override
- public void calc(String date) {
- LocalDate localDate = LocalDate.parse(date);
- calc(localDate);
- }
- /**
- * 统计指定日期指定时段数据
- *
- * @param date 指定日期 yyyy-mm-dd
- * @param period 时段
- */
- @Override
- public void calc(String date, int period) {
- LocalDate localDate = LocalDate.parse(date);
- LocalDateTime ldt = LocalDateTime.of(localDate, LocalTime.MIN);
- LocalDateTime start = ldt.plusHours(period);
- LocalDateTime end = start.plusHours(1);
- start = start.minusSeconds(1);
- calc4device(start, end);
- }
- /**
- * 统计指定日期数据
- *
- * @param date 指定日期
- */
- @Override
- public void calc(LocalDate date) {
- Tools.timePeriod(date).forEach(pair -> {
- calc4device(pair.getKey(), pair.getValue());
- });
- }
- /**
- * 按设备循环计算
- *
- * @param start 开始时间戳
- * @param end 结束时间戳
- */
- @Override
- public void calc4device(LocalDateTime start, LocalDateTime end) {
- //开始时间都减了一秒的,这里要加回来,用于计算时段等数据
- LocalDateTime ldt = start.plusSeconds(1);
- Date date = Date.from(ldt.toLocalDate().atStartOfDay(ZoneOffset.of("+8")).toInstant());
- Long startTime = start.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- Long endTime = end.toInstant(ZoneOffset.of("+8")).toEpochMilli();
- //获取时段范围 时间 1=0-2点 2=2-4
- int period = ldt.getHour();
- List<Object[]> calcHours = new ArrayList<>();
- List<TwinRecordStop> stopList = new ArrayList<>();
- List<TwinRecordAlarms> alarmsList = new ArrayList<>();
- List<TwinCalcHourSpec> specList = new ArrayList<>();
- List<Object[]> panList = new ArrayList<>();
- Date s = new Date();
- //为了避免多线程同时获取token导致重复执行,先执行一次获取token
- iotService.getToken();
- TwinDevice search = new TwinDevice();
- //查询所有在线的设备
- search.setOnline("1");
- List<TwinDevice> list = deviceService.selectTwinDeviceList(search);
- List<TwinDevice> errList = exec(list, date, startTime, endTime, period, calcHours, stopList, panList, alarmsList, specList);
- //重试2次
- int repTimes = 1;
- while (errList.size() != 0) {
- errList = exec(errList, date, startTime, endTime, period, calcHours, stopList, panList, alarmsList, specList);
- if (repTimes++ > 1) {
- break;
- }
- }
- if (specList != null && specList.size() > 0) {
- try (SqlSession sqlSession = factory.openSession(ExecutorType.BATCH, false)) {
- TwinCalcHourSpecMapper mapper = sqlSession.getMapper(TwinCalcHourSpecMapper.class);
- specList.forEach(mapper::insertTwinCalcHourSpec);
- sqlSession.commit();
- }
- }
- // dbProcess(endTime, calcHours, stopList, alarmsList, panList);
- // stopService.process(ldt, list);
- // alarmsService.process(ldt, list);
- Date d = new Date();
- log.info("总共消耗:{}ms", d.getTime() - s.getTime());
- }
- /**
- * 执行数据获取操作
- *
- * @param list 设备列表
- * @param date 日期
- * @param startTime 开始时间
- * @param endTime 结束时间
- * @param period 小时数
- * @param calcHourList 1小时统计数据
- * @param stopList 停机数据
- * @param panList 盘头数据
- * @param alarmsList 告警数据
- * @return 返回未获取成功的数据列表
- */
- private List<TwinDevice> exec(List<TwinDevice> list, Date date, Long startTime, Long endTime, int period, List<Object[]> calcHourList, List<TwinRecordStop> stopList, List<Object[]> panList, List<TwinRecordAlarms> alarmsList, List<TwinCalcHourSpec> specList) {
- List<Future<Map<String, List<?>>>> futureList = new ArrayList<>();
- for (int i = 0; i < list.size(); i++) {
- TwinDevice twinDevice = list.get(i);
- futureList.add(asyncService.process(twinDevice, date, startTime, endTime, period));
- }
- List<TwinDevice> errList = new ArrayList<>();
- try {
- for (Future<Map<String, List<?>>> future : futureList) {
- // 任务完成后获取结果
- try {
- Map<String, List<?>> map = future.get(10L, TimeUnit.SECONDS);
- List<TwinCalcHour> calcHours = (List<TwinCalcHour>) map.get("calc");
- List<TwinRecordStop> stops = (List<TwinRecordStop>) map.get("stopRecord");
- List<TwinRecordAlarms> alarms = (List<TwinRecordAlarms>) map.get("alarmRecord");
- List<TwinPanHeadInfo> panHeadInfoList = (List<TwinPanHeadInfo>) map.get("panHead");
- List<TwinCalcHourSpec> specs = (List<TwinCalcHourSpec>) map.get("specList");
- specList.addAll(specs);
- calcHours.forEach(calcHour -> calcHourList.add(calcHour.toArray()));
- stopList.addAll(stops);
- alarmsList.addAll(alarms);
- panHeadInfoList.forEach(pan -> panList.add(pan.toArray()));
- } catch (TimeoutException e) {
- errList.add(list.get(futureList.indexOf(future)));
- }
- }
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
- }
- return errList;
- }
- /**
- * 数据入库操作
- *
- * @param endTime 结束时间
- * @param calcHours 1小时统计数据
- * @param stopList 停机记录
- * @param alarmsList 告警记录
- * @param panList 盘头记录
- */
- private void dbProcess(long endTime, List<Object[]> calcHours, List<TwinRecordStop> stopList, List<TwinRecordAlarms> alarmsList, List<Object[]> panList) {
- //1小时统计数据
- String sql = "INSERT INTO TWIN_CALC_HOUR(DATA_DATE,HOUR,DEVICE_ID,KWH,WEIGHT,LENGTH,OPEN_TIME,CLOSE_TIME) VALUES (?,?,?,?,?,?,?,?)";
- jdbcTemplate.batchUpdate(sql, calcHours);
- //盘头数据
- sql = "INSERT INTO TWIN_PAN_HEAD_INFO (DEVICE_ID,PH_NUM,PH_MAX,RECORD_TIME) VALUES (?,?,?,?)";
- jdbcTemplate.batchUpdate(sql, panList);
- //执行停机记录处理
- stopList.forEach(stop -> {
- if (stop.getStartTime() != null) {
- jdbcTemplate.update("INSERT INTO TWIN_RECORD_STOP (DEVICE_ID,DATA_DATE,HOUR,START_TIME,STOP_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)", stop.getDeviceId(), stop.getDataDate(), stop.getHour(), stop.getStartTime(), stop.getStopType(), "0");
- }
- if (stop.getEndTime() != null) {
- jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=? WHERE DEVICE_ID=? AND STOP_TYPE=? AND END_TIME IS NULL", stop.getEndTime(), stop.getDeviceId(), stop.getStopType());
- }
- });
- //补一次,所有没有结束时间的为当前结束时间
- jdbcTemplate.update("UPDATE TWIN_RECORD_STOP SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));
- //执行告警记录处理
- alarmsList.forEach(alarms -> {
- if (alarms.getStartTime() != null) {
- jdbcTemplate.update("INSERT INTO TWIN_RECORD_ALARMS (DEVICE_ID,DATA_DATE,HOUR,START_TIME,ALARM_TYPE,CALC_STATUS) VALUES (?,?,?,?,?,?)", alarms.getDeviceId(), alarms.getDataDate(), alarms.getHour(), alarms.getStartTime(), alarms.getAlarmType(), "0");
- }
- if (alarms.getEndTime() != null) {
- jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=? WHERE DEVICE_ID=? AND ALARM_TYPE=? AND END_TIME IS NULL", alarms.getEndTime(), alarms.getDeviceId(), alarms.getAlarmType());
- }
- });
- //补一次,所有没有结束时间的为当前结束时间
- jdbcTemplate.update("UPDATE TWIN_RECORD_ALARMS SET END_TIME=?,CALC_STATUS=1 WHERE END_TIME IS NULL", new Date(endTime));
- }
- }
|