package com.ruoyi.biz.service.impl;

import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.ruoyi.biz.domain.*;
import com.ruoyi.biz.service.IIotService;
import com.ruoyi.biz.tools.Tools;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

/**
 * Runs per-device IoT queries and aggregations on the {@code threadPoolTaskExecutor} pool.
 *
 * @author wukai
 * @date 2024/5/4 20:35
 */
@Service
@Slf4j
public class AsyncServiceImpl {

    /**
     * Columns selected by {@link #calc}; the position in this array is the column index of every
     * row returned by the query.
     * <pre>
     *  0 Capacity_data_2   woven length (m)
     *  1 Capacity_data_37  shift A run time
     *  2 Capacity_data_38  shift A stop time
     *  3 Capacity_data_39  shift A current output
     *  4 Capacity_data_42  shift B run time
     *  5 Capacity_data_43  shift B stop time
     *  6 Capacity_data_44  shift B current output
     *  7 Capacity_data_48  stop state
     *  8 Formula_data_3    gram weight per meter
     *  9 Formula_data_13   take-up width
     * </pre>
     * Since 2024-06-26 columns 1-6 are no longer aggregated; only column 0 is accumulated.
     */
    private static final String[] FIELDS = {
            "Capacity_data_2", "Capacity_data_37", "Capacity_data_38", "Capacity_data_39",
            "Capacity_data_42", "Capacity_data_43", "Capacity_data_44", "Capacity_data_48",
            "Formula_data_3", "Formula_data_13", "Capacity_data_36", "Capacity_data_41",
            "Alarm_unit_1", "Alarm_unit_2", "Alarm_unit_3", "Alarm_unit_4", "Alarm_unit_5",
            "Alarm_unit_6", "Alarm_unit_7", "Alarm_unit_8", "Alarm_unit_9", "Alarm_unit_10",
            "Alarm_unit_11", "Alarm_unit_12", "Alarm_unit_13", "Alarm_unit_14", "Alarm_unit_15",
            "Alarm_unit_16", "Alarm_unit_17", "Alarm_unit_18", "Alarm_unit_19", "Alarm_unit_20",
            "Alarm_unit_21", "Alarm_unit_22", "Alarm_unit_23", "Alarm_unit_24", "Alarm_unit_25",
            "Alarm_unit_26", "Alarm_unit_27", "Capacity_data_33", "Capacity_data_15",
            "Capacity_data_16", "Capacity_data_17", "Capacity_data_18", "Capacity_data_19"
    };

    /** Same columns as a list, used to resolve a column name to its index. */
    private static final List<String> FIELD_LIST = Arrays.stream(FIELDS).collect(Collectors.toList());

    /** Comma-separated column list for the SELECT clause, built once instead of on every call. */
    private static final String FIELD_COLUMNS = String.join(",", FIELDS);

    // Column indices are resolved once here rather than per row inside the aggregation loop.
    private static final int IDX_LENGTH = FIELD_LIST.indexOf("Capacity_data_2");
    private static final int IDX_STOP_STATE = FIELD_LIST.indexOf("Capacity_data_48");
    private static final int IDX_GRAM_WEIGHT = FIELD_LIST.indexOf("Formula_data_3");
    private static final int IDX_WIDTH = FIELD_LIST.indexOf("Formula_data_13");
    private static final int IDX_POWER = FIELD_LIST.indexOf("Capacity_data_33");
    /** Index of Alarm_unit_1; alarm k (1-based) lives at {@code ALARM_OFFSET + k - 1}. */
    private static final int ALARM_OFFSET = FIELD_LIST.indexOf("Alarm_unit_1");
    /** Remaining-turns columns for pan heads GB1..GB5. */
    private static final int[] PAN_HEAD_INDEXES = {
            FIELD_LIST.indexOf("Capacity_data_15"),
            FIELD_LIST.indexOf("Capacity_data_16"),
            FIELD_LIST.indexOf("Capacity_data_17"),
            FIELD_LIST.indexOf("Capacity_data_18"),
            FIELD_LIST.indexOf("Capacity_data_19")
    };

    /** Total number of alarm columns (Alarm_unit_1 .. Alarm_unit_27). */
    private static final int ALARM_COUNT = 27;
    /** The first 26 alarms are read as booleans; alarm 27 is read as an int (non-zero = active). */
    private static final int BOOL_ALARM_COUNT = 26;

    /** One watt sustained for one second expressed in kWh is 1 / (1000 * 3600). */
    private static final BigDecimal WATT_SECONDS_PER_KWH = BigDecimal.valueOf(3_600_000L);
    /** Scale for the per-sample kWh division; far finer than what a float accumulator can hold. */
    private static final int KWH_SCALE = 12;

    @Resource
    private IIotService iotService;

    /**
     * Fetches the latest row of a device's table and returns it as a map.
     *
     * @param twinDevice device whose {@code devicePath} is used as the table name
     * @return the converted row plus {@code "device"} (the device itself) and {@code "total"}
     *         (number of entries in the returned {@code values} array)
     */
    @Async("threadPoolTaskExecutor")
    public Future<Map<String, Object>> currData(TwinDevice twinDevice) {
        String table = twinDevice.getDevicePath();
        // NOTE(review): the table name is concatenated into the query; it is assumed to come
        // from trusted device configuration - confirm.
        String sql = "select last * from " + table;
        JSONObject jsonObject = iotService.query(sql);
        JSONObject data = jsonObject.getJSONObject("data");
        JSONArray values = data.getJSONArray("values");
        // NOTE(review): assumes Tools.json2Map returns Map<String, Object> - confirm.
        Map<String, Object> dataMap = Tools.json2Map(values, table);
        dataMap.put("device", twinDevice);
        dataMap.put("total", values.size());
        return new AsyncResult<>(dataMap);
    }

    /**
     * Aggregates one time window of a device and stamps the resulting records with the device id,
     * date and period.
     *
     * @param twinDevice device to aggregate
     * @param date       data date written to the records
     * @param startTime  window start, epoch millis (exclusive in the query)
     * @param endTime    window end, epoch millis (inclusive in the query)
     * @param period     hour value written to the records
     * @return map with keys {@code "calc"}, {@code "stopRecord"}, {@code "alarmRecord"} and
     *         {@code "panHead"}
     */
    @Async("threadPoolTaskExecutor")
    public Future<Map<String, List<?>>> process(TwinDevice twinDevice, Date date, long startTime, long endTime, int period) {
        Map<String, List<?>> result = new HashMap<>(16);
        String table = twinDevice.getDevicePath();
        Map<String, Object> map = calc(table, startTime, endTime);

        // total: 0 woven length, 1 weight, 2 energy (kWh), 3 run seconds, 4 stop seconds
        float[] total = (float[]) map.get("total");
        TwinCalcHour hour = new TwinCalcHour();
        hour.setDeviceId(twinDevice.getDeviceId());
        hour.setDataDate(date);
        hour.setHour(period);
        hour.setLength(toDecimal(total[0]));
        hour.setWeight(toDecimal(total[1]));
        hour.setKwh(toDecimal(total[2]));
        hour.setOpenTime((long) total[3]);
        hour.setCloseTime((long) total[4]);
        List<TwinCalcHour> calcHours = new ArrayList<>();
        calcHours.add(hour);

        // The casts below are safe because calc() is the only producer of this map and stores
        // exactly these element types under these keys.
        @SuppressWarnings("unchecked")
        List<TwinPanHeadInfo> panHeadInfo = (List<TwinPanHeadInfo>) map.get("panHead");
        panHeadInfo.forEach(info -> info.setDeviceId(twinDevice.getDeviceId()));

        @SuppressWarnings("unchecked")
        List<TwinRecordStop> stopRecord = (List<TwinRecordStop>) map.get("stopRecord");
        stopRecord.forEach(stop -> {
            stop.setDeviceId(twinDevice.getDeviceId());
            stop.setDataDate(date);
            stop.setHour(period);
        });

        @SuppressWarnings("unchecked")
        List<TwinRecordAlarms> alarmRecord = (List<TwinRecordAlarms>) map.get("alarmRecord");
        alarmRecord.forEach(alarm -> {
            alarm.setDeviceId(twinDevice.getDeviceId());
            alarm.setDataDate(date);
            alarm.setHour(period);
        });

        result.put("calc", calcHours);
        result.put("stopRecord", stopRecord);
        result.put("alarmRecord", alarmRecord);
        result.put("panHead", panHeadInfo);
        return new AsyncResult<>(result);
    }

    /**
     * Converts a float to a BigDecimal through its shortest decimal representation.
     * {@code BigDecimal.valueOf(float)} would widen to double first and carry binary noise
     * digits (for example 12.34f becomes 12.34000015258789).
     */
    private static BigDecimal toDecimal(float value) {
        return new BigDecimal(Float.toString(value));
    }

    /**
     * Queries all {@link #FIELDS} of a table for {@code startTime < time <= endTime} and
     * aggregates the rows.
     *
     * <p>The first returned row is used only as the baseline for the following rows; it produces
     * no records and contributes nothing to the totals.
     *
     * @param table     table (device path) to query
     * @param startTime window start, epoch millis, exclusive
     * @param endTime   window end, epoch millis, inclusive
     * @return map with {@code "total"} ({@code float[5]}: 0 woven length, 1 weight, 2 kWh,
     *         3 run seconds, 4 stop seconds), {@code "stopRecord"}, {@code "alarmRecord"} and
     *         {@code "panHead"} lists
     */
    public Map<String, Object> calc(String table, long startTime, long endTime) {
        String sql = String.format("select %s from %s where time>%s and time <=%s",
                FIELD_COLUMNS, table, startTime, endTime);
        long s = System.currentTimeMillis();
        JSONObject jsonObject = iotService.query(sql);
        long e = System.currentTimeMillis();
        log.info("接口耗时:{}ms,table:{},time:{}", e - s, table, new Date(endTime));
        JSONObject data = jsonObject.getJSONObject("data");
        JSONArray values = data.getJSONArray("values");
        JSONArray timestamps = data.getJSONArray("timestamps");

        // Value of each accumulated counter at the start of the current counting segment.
        // Only index 0 (woven length) is tracked since 2024-06-26.
        float[] first = new float[1];
        // Value of each accumulated counter at the previous sample.
        float[] last = new float[1];
        // 0 woven length, 1 weight, 2 kWh, 3 run seconds, 4 stop seconds
        float[] total = new float[5];
        // Remaining turns of each pan head at the previous sample.
        int[] lastPanHead = new int[PAN_HEAD_INDEXES.length];
        // Gram weight per meter at the previous sample.
        int lastMkz = 0;
        // Take-up width at the previous sample.
        float lastFk = 0f;
        // Stop state at the previous sample.
        int last48 = 0;
        // Last row of the previous window. Carry-over between windows is temporarily disabled,
        // so this is always null; it is still passed on so the feature can be re-enabled.
        JSONArray old = null;

        List<TwinRecordAlarms> alarmRecord = new ArrayList<>();
        List<TwinRecordStop> stopRecord = new ArrayList<>();
        List<TwinPanHeadInfo> panHeadInfo = new ArrayList<>();

        for (int i = 0; i < timestamps.size(); i++) {
            JSONArray da = values.getJSONArray(i);
            float[] curr = {da.getFloat(IDX_LENGTH)};
            int[] currPan = new int[PAN_HEAD_INDEXES.length];
            for (int p = 0; p < currPan.length; p++) {
                currPan[p] = da.getInt(PAN_HEAD_INDEXES[p]);
            }
            int curr48 = da.getInt(IDX_STOP_STATE);

            if (i == 0) {
                // Baseline row: remember it, do not process it.
                first = curr.clone();
                last = curr.clone();
                lastPanHead = currPan.clone();
                lastMkz = da.getInt(IDX_GRAM_WEIGHT);
                lastFk = da.getFloat(IDX_WIDTH);
                last48 = curr48;
                continue;
            }

            long timestamp = timestamps.getLong(i);
            calcKwh(da, total);
            calcPan(currPan, lastPanHead, panHeadInfo, timestamp);

            for (int j = 0; j < first.length; j++) {
                // A counter that drops below its previous non-zero value has been reset: close
                // the running segment and start a new one at the current value. The first point
                // after a restart is not exactly 0 (it fluctuates around 0.00x), hence the
                // comparison against the previous value instead of against zero.
                if (curr[j] < last[j] && last[j] != 0f) {
                    calcTotal(j, last, first, total, lastMkz, lastFk);
                    first[j] = curr[j];
                }
            }

            calcAlarms(values, old, timestamp, alarmRecord, i);
            calcStops(curr48, last48, timestamp, total, stopRecord, old, i);

            // The current row becomes the "previous" row of the next iteration.
            last = curr.clone();
            lastPanHead = currPan.clone();
            lastMkz = da.getInt(IDX_GRAM_WEIGHT);
            lastFk = da.getFloat(IDX_WIDTH);
            last48 = curr48;
        }

        // Close the segment that is still open at the end of the window.
        for (int j = 0; j < first.length; j++) {
            calcTotal(j, last, first, total, lastMkz, lastFk);
        }

        // Run seconds = window length in seconds (+1) minus counted stop seconds.
        total[3] = (endTime - startTime) / 1000 + 1 - total[4];

        Map<String, Object> result = new HashMap<>(16);
        result.put("total", total);
        result.put("stopRecord", stopRecord);
        result.put("alarmRecord", alarmRecord);
        result.put("panHead", panHeadInfo);
        return result;
    }

    /**
     * Records a pan-head entry whenever the remaining turns of a pan head increase, which the
     * original author treats as a reload of that pan head.
     * Array positions correspond to GB1..GB5.
     *
     * @param currPan     remaining turns at the current sample
     * @param lastPanHead remaining turns at the previous sample
     * @param panHeadInfo output list, appended to
     * @param time        sample timestamp, epoch millis
     */
    private void calcPan(int[] currPan, int[] lastPanHead, List<TwinPanHeadInfo> panHeadInfo, Long time) {
        for (int i = 0; i < currPan.length; i++) {
            if (currPan[i] > lastPanHead[i]) {
                TwinPanHeadInfo info = new TwinPanHeadInfo();
                info.setRecordTime(new Date(time));
                info.setPhNum((long) (i + 1));
                info.setPhMax((long) currPan[i]);
                panHeadInfo.add(info);
            }
        }
    }

    /**
     * Adds the energy of one sample to {@code total[2]}.
     *
     * <p>The reading is in watts and is treated as the power drawn during one second, so the
     * energy in kWh is {@code watts / 1000 / 3600}. The division is done in a single step at a
     * high scale; dividing twice with a scale of 5 rounded every sample to 0.00001 kWh and
     * turned small readings into zero.
     *
     * @param da    current row
     * @param total totals array, {@code total[2]} is updated
     */
    private void calcKwh(JSONArray da, float[] total) {
        BigDecimal watts = da.getBigDecimal(IDX_POWER);
        if (watts == null) {
            // A missing power reading contributes no energy.
            return;
        }
        BigDecimal kwh = watts.divide(WATT_SECONDS_PER_KWH, KWH_SCALE, RoundingMode.HALF_UP);
        total[2] = total[2] + kwh.floatValue();
    }

    /**
     * Tracks stop-state transitions and counts stop seconds in {@code total[4]}.
     *
     * <p>Checking for a stop left open by the previous window is temporarily disabled, which is
     * why {@code oldV} is currently unused.
     *
     * @param curr       stop state at the current sample (0 = running)
     * @param last       stop state at the previous sample
     * @param time       sample timestamp, epoch millis
     * @param total      totals array, {@code total[4]} is incremented per stopped sample
     * @param stopRecord output list, appended to
     * @param oldV       last row of the previous window (unused while carry-over is disabled)
     * @param i          index of the current sample within the window
     */
    private void calcStops(int curr, int last, long time, float[] total, List<TwinRecordStop> stopRecord,
                           JSONArray oldV, int i) {
        // Drop the millisecond part.
        time = time / 1000 * 1000;

        // First processed sample already stopped: just open a stop record.
        // NOTE(review): this early return skips the total[4] increment, so the first processed
        // sample of a window is never counted as stop time - confirm this is intended.
        if (i == 1 && curr != 0) {
            stopRecord.add(stopStarted(curr, time));
            return;
        }

        if (curr != 0) {
            total[4]++;
            if (curr != last) {
                if (last != 0) {
                    // Stop type changed: close the previous type first.
                    stopRecord.add(stopEnded(last, time));
                }
                stopRecord.add(stopStarted(curr, time));
            }
        } else if (last != 0) {
            // Machine is running again after a stop.
            stopRecord.add(stopEnded(last, time));
        }
    }

    /** Builds a stop record carrying only the stop type and its start time. */
    private static TwinRecordStop stopStarted(int stopType, long time) {
        TwinRecordStop stop = new TwinRecordStop();
        stop.setStopType(stopType);
        stop.setStartTime(new Date(time));
        return stop;
    }

    /** Builds a stop record carrying only the stop type and its end time. */
    private static TwinRecordStop stopEnded(int stopType, long time) {
        TwinRecordStop stop = new TwinRecordStop();
        stop.setStopType(stopType);
        stop.setEndTime(new Date(time));
        return stop;
    }

    /**
     * Emits alarm start/end records for the transition from row {@code i - 1} to row {@code i}.
     *
     * <p>On the first processed row ({@code i == 1}) every alarm that is currently active gets a
     * start record regardless of the baseline row; all other alarms are compared with the
     * previous row. Start records of that first pass are emitted before any end record.
     * Checking for alarms left open by the previous window is temporarily disabled, which is why
     * {@code old} is currently unused.
     *
     * @param values      all rows of the window
     * @param old         last row of the previous window (unused while carry-over is disabled)
     * @param time        timestamp of row {@code i}, epoch millis
     * @param alarmRecord output list, appended to
     * @param i           index of the current row; must be at least 1
     */
    private void calcAlarms(JSONArray values, JSONArray old, long time, List<TwinRecordAlarms> alarmRecord, int i) {
        JSONArray curr = values.getJSONArray(i);
        JSONArray last = values.getJSONArray(i - 1);

        // Alarms already reported by the first-row pass; they skip the transition check below.
        boolean[] reported = new boolean[ALARM_COUNT];
        if (i == 1) {
            for (int j = 0; j < ALARM_COUNT; j++) {
                if (isAlarmActive(curr, j)) {
                    reported[j] = true;
                    alarmRecord.add(alarmStarted(j + 1, time));
                }
            }
        }

        for (int j = 0; j < ALARM_COUNT; j++) {
            if (reported[j]) {
                continue;
            }
            boolean activeNow = isAlarmActive(curr, j);
            boolean activeBefore = isAlarmActive(last, j);
            if (activeNow && !activeBefore) {
                alarmRecord.add(alarmStarted(j + 1, time));
            } else if (!activeNow && activeBefore) {
                alarmRecord.add(alarmEnded(j + 1, time));
            }
        }
    }

    /**
     * Tells whether the alarm at zero-based position {@code alarmIndex} is active in {@code row}.
     * Alarms 1-26 are read as booleans, alarm 27 as an int where non-zero means active.
     */
    private static boolean isAlarmActive(JSONArray row, int alarmIndex) {
        int pos = ALARM_OFFSET + alarmIndex;
        if (alarmIndex < BOOL_ALARM_COUNT) {
            return row.getBool(pos);
        }
        return row.getInt(pos) != 0;
    }

    /** Builds an alarm record carrying only the alarm type and its start time. */
    private static TwinRecordAlarms alarmStarted(int alarmType, long time) {
        TwinRecordAlarms record = new TwinRecordAlarms();
        record.setAlarmType(alarmType);
        record.setStartTime(new Date(time));
        return record;
    }

    /** Builds an alarm record carrying only the alarm type and its end time. */
    private static TwinRecordAlarms alarmEnded(int alarmType, long time) {
        TwinRecordAlarms record = new TwinRecordAlarms();
        record.setAlarmType(alarmType);
        record.setEndTime(new Date(time));
        return record;
    }

    /**
     * Closes a counting segment: adds {@code last[j] - first[j]} to {@code total[j]} and, for the
     * woven length ({@code j == 0}), adds the matching weight to {@code total[1]}.
     *
     * @param j       counter index
     * @param last    counter values at the previous sample
     * @param first   counter values at the start of the segment
     * @param total   totals array, updated in place
     * @param lastMkz gram weight per meter at the previous sample
     * @param lastFk  take-up width at the previous sample
     */
    private void calcTotal(int j, float[] last, float[] first, float[] total, int lastMkz, float lastFk) {
        float v = last[j] - first[j];
        total[j] += v;
        if (j == 0) {
            // weight = length * gram weight * width / 1000 / 1000, rounded to 2 decimals
            float weight = BigDecimal.valueOf(v * lastMkz * lastFk / 1000 / 1000)
                    .setScale(2, RoundingMode.HALF_UP)
                    .floatValue();
            total[1] = total[1] + weight;
        }
    }
}