AsyncServiceImpl.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. package com.ruoyi.biz.service.impl;
  2. import cn.hutool.json.JSONArray;
  3. import cn.hutool.json.JSONObject;
  4. import com.ruoyi.biz.domain.*;
  5. import com.ruoyi.biz.service.IIotService;
  6. import com.ruoyi.biz.tools.Tools;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.scheduling.annotation.Async;
  9. import org.springframework.scheduling.annotation.AsyncResult;
  10. import org.springframework.stereotype.Service;
  11. import javax.annotation.Resource;
  12. import java.math.BigDecimal;
  13. import java.math.RoundingMode;
  14. import java.util.*;
  15. import java.util.concurrent.Future;
  16. import java.util.stream.Collectors;
  17. /**
  18. * 多线程执行任务
  19. *
  20. * @author wukai
  21. * @date 2024/5/4 20:35
  22. */
  23. @Service
  24. @Slf4j
  25. public class AsyncServiceImpl {
  26. @Resource
  27. private IIotService iotService;
  28. @Async("threadPoolTaskExecutor")
  29. public Future<Map<String, Object>> currData(TwinDevice twinDevice) {
  30. String table = twinDevice.getDevicePath();
  31. String sql = "select last * from " + table;
  32. JSONObject jsonObject = iotService.query(sql);
  33. JSONObject data = jsonObject.getJSONObject("data");
  34. JSONArray values = data.getJSONArray("values");
  35. Map<String, Object> dataMap = Tools.json2Map(values, table);
  36. dataMap.put("device", twinDevice);
  37. dataMap.put("total", values.size());
  38. return new AsyncResult<>(dataMap);
  39. }
  40. @Async("threadPoolTaskExecutor")
  41. public Future<Map<String, List<?>>> process(TwinDevice twinDevice, Date date, long startTime, long endTime, int period) {
  42. Map<String, List<?>> result = new HashMap<>(16);
  43. List<TwinCalcHour> calcHours = new ArrayList<>();
  44. String table = twinDevice.getDevicePath();
  45. TwinCalcHour hour = new TwinCalcHour();
  46. hour.setDeviceId(twinDevice.getDeviceId());
  47. hour.setDataDate(date);
  48. hour.setHour(period);
  49. Map<String, Object> map = calc(table, startTime, endTime);
  50. //0.已织造米数 1.总能耗 2.总重量
  51. float[] total = (float[]) map.get("total");
  52. hour.setLength(BigDecimal.valueOf(total[0]));
  53. hour.setWeight(BigDecimal.valueOf(total[2]));
  54. hour.setKwh(BigDecimal.valueOf(total[1]));
  55. hour.setOpenTime((long) total[3]);
  56. hour.setCloseTime((long) total[4]);
  57. calcHours.add(hour);
  58. List<TwinPanHeadInfo> panHeadInfo = (List<TwinPanHeadInfo>) map.get("panHead");
  59. panHeadInfo.forEach(info -> info.setDeviceId(twinDevice.getDeviceId()));
  60. List<TwinCalcHourSpec> specs = (List<TwinCalcHourSpec>) map.get("specList");
  61. specs.forEach(spec -> {
  62. spec.setDeviceId(twinDevice.getDeviceId());
  63. spec.setDataDate(date);
  64. spec.setHour(period);
  65. });
  66. List<TwinRecordStop> stopRecord = (List<TwinRecordStop>) map.get("stopRecord");
  67. stopRecord.forEach(stop -> {
  68. stop.setDeviceId(twinDevice.getDeviceId());
  69. stop.setDataDate(date);
  70. stop.setHour(period);
  71. });
  72. List<TwinRecordAlarms> alarmRecord = (List<TwinRecordAlarms>) map.get("alarmRecord");
  73. alarmRecord.forEach(alarm -> {
  74. alarm.setDeviceId(twinDevice.getDeviceId());
  75. alarm.setDataDate(date);
  76. alarm.setHour(period);
  77. });
  78. result.put("calc", calcHours);
  79. result.put("stopRecord", stopRecord);
  80. result.put("alarmRecord", alarmRecord);
  81. result.put("panHead", panHeadInfo);
  82. result.put("specList", specs);
  83. return new AsyncResult<>(result);
  84. }
  85. /**
  86. * 字段列表,方便看下标
  87. */
  88. private final String[] fields = {"Capacity_data_2", "Capacity_data_37", "Capacity_data_38", "Capacity_data_39", "Capacity_data_42",
  89. "Capacity_data_43", "Capacity_data_44", "Capacity_data_48", "Formula_data_3", "Formula_data_13",
  90. "Capacity_data_36", "Capacity_data_41", "Alarm_unit_1", "Alarm_unit_2", "Alarm_unit_3",
  91. "Alarm_unit_4", "Alarm_unit_5", "Alarm_unit_6", "Alarm_unit_7", "Alarm_unit_8",
  92. "Alarm_unit_9", "Alarm_unit_10", "Alarm_unit_11", "Alarm_unit_12", "Alarm_unit_13",
  93. "Alarm_unit_14", "Alarm_unit_15", "Alarm_unit_16", "Alarm_unit_17", "Alarm_unit_18",
  94. "Alarm_unit_19", "Alarm_unit_20", "Alarm_unit_21", "Alarm_unit_22", "Alarm_unit_23",
  95. "Alarm_unit_24", "Alarm_unit_25", "Alarm_unit_26", "Alarm_unit_27", "Capacity_data_33",
  96. "Capacity_data_15", "Capacity_data_16", "Capacity_data_17", "Capacity_data_18", "Capacity_data_19",
  97. "Capacity_data_34", "Formula_data_24"
  98. };
  99. /**
  100. * 字段列表,方便查找位置
  101. */
  102. private final List<String> fieldList = Arrays.stream(fields).collect(Collectors.toList());
  103. /**
  104. * // 0 Capacity_data_2 已织造米数
  105. * // 1 Capacity_data_37 A班组开机时间
  106. * // 2 Capacity_data_38 A班组停机时间
  107. * // 3 Capacity_data_39 A班当前产量
  108. * // 4 Capacity_data_42 B班组开机时间
  109. * // 5 Capacity_data_43 B班组停机时间
  110. * // 6 Capacity_data_44 B班当前产量
  111. * // 7 Capacity_data_48 停机状态
  112. * // 8 Formula_data_3 米克重
  113. * // 9 Formula_data_13 卷曲幅宽
  114. */
  115. public Map<String, Object> calc(String table, long startTime, long endTime) {
  116. String sql = "select %s from %s where time>%s and time <=%s";
  117. sql = String.format(sql, Arrays.stream(fields).collect(Collectors.joining(",")), table, startTime, endTime);
  118. long s = System.currentTimeMillis();
  119. JSONObject jsonObject = iotService.query(sql);
  120. long e = System.currentTimeMillis();
  121. log.info("接口耗时:{}ms,table:{},time:{}", e - s, table, new Date(endTime));
  122. JSONObject data = jsonObject.getJSONObject("data");
  123. JSONArray values = data.getJSONArray("values");
  124. JSONArray timestamps = data.getJSONArray("timestamps");
  125. //初始时间点数据
  126. //0-data2,1-data37,2-data38,3-data39,4-data42,5-data-43,6data-44
  127. //2024-06-26 1-6不要了 0织造米长 1.电量
  128. List<TwinCalcHourSpec> specList = new ArrayList<>();
  129. //1.米长 2.电量 3.密度
  130. float[] first = new float[3];
  131. //上一个时间点数据
  132. float[] last = new float[3];
  133. //统计数据,后面2个分别代表0.织造米长,1.电量 2.重量,3.开机时间,4.停机时间
  134. float[] total = new float[5];
  135. //上一个时间点盘头数据
  136. int[] lastPanHead = new int[5];
  137. //米克重
  138. int lastMkz = 0;
  139. //卷曲幅宽
  140. float lastFk = 0f;
  141. //上次密度记录的米长
  142. float lastSpecLength = 0f;
  143. //开始不为0的电能
  144. //不为0的电能结束
  145. float startKwh = 0f, endKwh = 0f;
  146. int last48 = 0;
  147. //上一轮最后一条停机记录
  148. JSONArray old = null;
  149. //上一轮最后一条告警记录
  150. // Object oldV = CacheUtils.get(Constants.IOT_TOKEN, table + "-stop");
  151. // if (oldV != null) {
  152. // //获取存储的最后一次停机状态值
  153. // old = (JSONArray) oldV;
  154. // }
  155. //告警时间记录
  156. List<TwinRecordAlarms> alarmRecord = new ArrayList<>();
  157. //停机记录
  158. List<TwinRecordStop> stopRecord = new ArrayList<>();
  159. //盘头记录
  160. List<TwinPanHeadInfo> panHeadInfo = new ArrayList<>();
  161. for (int i = 0; i < timestamps.size(); i++) {
  162. JSONArray da = values.getJSONArray(i);
  163. //0-data2,1-data37,2-data38,3-data39,4-data42,5-data-43,6data-44
  164. //当前时间数据
  165. float[] curr = {da.getFloat(0), da.getFloat(fieldList.indexOf("Capacity_data_34")), da.getFloat(fieldList.indexOf("Formula_data_24"))};
  166. int[] currPan = {da.getInt(fieldList.indexOf("Capacity_data_15")), da.getInt(fieldList.indexOf("Capacity_data_16")), da.getInt(fieldList.indexOf("Capacity_data_17")),
  167. da.getInt(fieldList.indexOf("Capacity_data_18")), da.getInt(fieldList.indexOf("Capacity_data_19"))};
  168. int curr48 = da.getInt(7);
  169. if (i == 0) {
  170. //第一次数据是上次最后一条,只做记录用,不做处理
  171. first = curr.clone();
  172. last = curr.clone();
  173. lastPanHead = currPan.clone();
  174. lastMkz = da.getInt(8);
  175. lastFk = da.getFloat(9);
  176. last48 = curr48;
  177. startKwh = da.getFloat(fieldList.indexOf("Capacity_data_34"));
  178. continue;
  179. }
  180. //计算盘头
  181. calcPan(currPan, lastPanHead, panHeadInfo, timestamps.getLong(i));
  182. //计算规格米长
  183. if (last[2] != curr[2]) {
  184. calcSpec(total[0], lastSpecLength, last[2], lastMkz, specList);
  185. lastSpecLength = total[0];
  186. }
  187. //处理电量跳点
  188. float currKwh = da.getFloat(fieldList.indexOf("Capacity_data_34"));
  189. if (startKwh == 0) {
  190. startKwh = currKwh;
  191. }
  192. if (currKwh != 0f) {
  193. endKwh = currKwh;
  194. }
  195. // for (int j = 0; j < first.length; j++) {
  196. //如果当前值为小于上一个,且上一个值不为0,则计算
  197. //因为会出现数据波动,停机再启动之后的第一个点不为0,为0.00几几几的
  198. //计算累加类的数据
  199. //这里只计算米长
  200. if (curr[0] < last[0] && last[0] != 0f) {
  201. calcTotal(0, last, first, total, lastMkz, lastFk);
  202. first[0] = curr[0];
  203. }
  204. // }
  205. calcAlarms(values, old, timestamps.getLong(i), alarmRecord, i);
  206. calcStops(curr48, last48, timestamps.getLong(i), total, stopRecord, old, i);
  207. //复制数组,设置last值为当前值
  208. last = curr.clone();
  209. lastPanHead = currPan.clone();
  210. lastMkz = da.getInt(8);
  211. lastFk = da.getFloat(9);
  212. last48 = curr48;
  213. }
  214. //最后再补一次计算
  215. // for (int j = 0; j < first.length; j++) {
  216. // //这里米长和电量都要计算
  217. // calcTotal(j, last, first, total, lastMkz, lastFk);
  218. // }
  219. //还是只计算米长
  220. calcTotal(0, last, first, total, lastMkz, lastFk);
  221. calcSpec(total[0], lastSpecLength, last[2], lastMkz, specList);
  222. //计算电量
  223. total[1] = endKwh - startKwh;
  224. //存入最后一条记录的停机状态
  225. // CacheUtils.put(Constants.IOT_TOKEN, table, values.getJSONArray(values.size() - 1));
  226. long openTime = (endTime - startTime) / 1000 + 1;
  227. if (openTime < total[4]) {
  228. total[4] = openTime;
  229. }
  230. total[3] = openTime - total[4];
  231. Map<String, Object> result = new HashMap<>(16);
  232. result.put("total", total);
  233. result.put("stopRecord", stopRecord);
  234. result.put("alarmRecord", alarmRecord);
  235. result.put("panHead", panHeadInfo);
  236. result.put("specList", specList);
  237. return result;
  238. }
  239. /**
  240. * 按规格计算米长
  241. *
  242. * @param len 当前米长
  243. * @param lastLen 上次记录时米长
  244. * @param lastDensity 密度
  245. * @param lastMkz 米克重
  246. * @param specList 列表
  247. */
  248. private void calcSpec(float len, float lastLen, float lastDensity, int lastMkz, List<TwinCalcHourSpec> specList) {
  249. //计算规格米长
  250. //如果密度有变化,就记录下来
  251. TwinCalcHourSpec spec = new TwinCalcHourSpec();
  252. spec.setDensity(BigDecimal.valueOf(lastDensity));
  253. float len1 = len - lastLen;
  254. spec.setLength(BigDecimal.valueOf(len1));
  255. spec.setMick(lastMkz);
  256. specList.add(spec);
  257. }
  258. /**
  259. * 计算盘头信息
  260. * 数组 分别是GB1-GB5
  261. *
  262. * @param currPan 当前剩余圈数
  263. * @param lastPanHead 上一条记录剩余圈数
  264. * @param panHeadInfo info
  265. * @param time 时间戳
  266. */
  267. private void calcPan(int[] currPan, int[] lastPanHead, List<TwinPanHeadInfo> panHeadInfo, Long time) {
  268. //如果当前记录大于上一条记录,则证明是重新叫料了,需要记录下当前值
  269. for (int i = 0; i < currPan.length; i++) {
  270. if (currPan[i] > lastPanHead[i]) {
  271. TwinPanHeadInfo info = new TwinPanHeadInfo();
  272. info.setRecordTime(new Date(time));
  273. info.setPhNum((long) (i + 1));
  274. info.setPhMax((long) currPan[i]);
  275. panHeadInfo.add(info);
  276. }
  277. }
  278. }
  279. /**
  280. * 停机次数计算
  281. * 停机处理
  282. *
  283. * @param curr 当前值
  284. * @param last 上一条记录的值
  285. * @param time 时间
  286. * @param total 统计
  287. * @param stopRecord 记录
  288. * @param oldV 上次的最后一条记录的值
  289. * @param i 当前时序
  290. */
  291. private void calcStops(int curr, int last, long time, float[] total, List<TwinRecordStop> stopRecord, JSONArray oldV, int i) {
  292. //取消毫秒
  293. time = time / 1000 * 1000;
  294. TwinRecordStop stop = new TwinRecordStop();
  295. //如果第一条记录为0,需要判断之前有木有没有结束时间的停机
  296. //暂时取消这个判断
  297. // if (i == 1 && oldV != null) {
  298. // int old = oldV.getInt(7);
  299. // if (last != old) {
  300. // stop.setStopType(old);
  301. // stop.setEndTime(new Date(time));
  302. // stopRecord.add(stop);
  303. // return;
  304. // }
  305. // }
  306. //如果第一条记录不为0,则直接记录停机开始时间
  307. if (i == 1 && curr != 0) {
  308. stop = new TwinRecordStop();
  309. //记录停机开始时间
  310. stop.setStopType(curr);
  311. stop.setStartTime(new Date(time));
  312. stopRecord.add(stop);
  313. return;
  314. }
  315. if (curr != 0) {
  316. total[4]++;
  317. if (curr != last) {
  318. if (last != 0) {
  319. //记录上个停机类型的结束时间
  320. stop.setStopType(last);
  321. stop.setEndTime(new Date(time));
  322. stopRecord.add(stop);
  323. }
  324. stop = new TwinRecordStop();
  325. //记录停机开始时间
  326. stop.setStopType(curr);
  327. stop.setStartTime(new Date(time));
  328. stopRecord.add(stop);
  329. }
  330. } else if (last != 0) {
  331. //判断停机结束之后的开机
  332. stop.setStopType(last);
  333. stop.setEndTime(new Date(time));
  334. stopRecord.add(stop);
  335. }
  336. }
  337. /**
  338. * 告警计算
  339. *
  340. * @param values 所有记录
  341. * @param old 上次存储的
  342. * @param time 当前时间
  343. * @param alarmRecord 告警记录
  344. * @param i 时序
  345. */
  346. private void calcAlarms(JSONArray values, JSONArray old, long time, List<TwinRecordAlarms> alarmRecord, int i) {
  347. JSONArray curr = values.getJSONArray(i);
  348. JSONArray last = values.getJSONArray(i - 1);
  349. TwinRecordAlarms recordAlarms;
  350. int index = 26;
  351. /*暂时取消
  352. // if (i == 1 && old != null) {
  353. // //判断第一条记录为0,并且上一轮存储的值,需要判断之前有没有未结束的告警
  354. // int j = 0;
  355. // for (; j < index; j++) {
  356. // boolean oldV = curr.getBool(j + 12);
  357. // boolean lastV = last.getBool(j + 12);
  358. // if (oldV && !lastV) {
  359. // recordAlarms = new TwinRecordAlarms();
  360. // recordAlarms.setAlarmType(j + 1);
  361. // recordAlarms.setEndTime(new Date(time));
  362. // alarmRecord.add(recordAlarms);
  363. // }
  364. // }
  365. // //需要单独计算alarm27
  366. // //数据位置
  367. // int pos = j + 12;
  368. // int old27 = curr.getInt(pos);
  369. // //上面已经处理过i=0,所以这里i不可能等于0
  370. // int last27 = last.getInt(pos);
  371. // if (old27 != 0 && last27 == 0) {
  372. // recordAlarms = new TwinRecordAlarms();
  373. // recordAlarms.setAlarmType(j + 1);
  374. // recordAlarms.setEndTime(new Date(time));
  375. // alarmRecord.add(recordAlarms);
  376. // }
  377. // }
  378. */
  379. boolean[] flags = new boolean[index + 1];
  380. Arrays.fill(flags, true);
  381. if (i == 1) {
  382. int j = 0;
  383. for (; j < index; j++) {
  384. //允许的告警编号
  385. if (Tools.findAllow(j + 1)) {
  386. boolean currV = curr.getBool(j + 12);
  387. if (currV) {
  388. flags[j] = false;
  389. recordAlarms = new TwinRecordAlarms();
  390. recordAlarms.setAlarmType(j + 1);
  391. recordAlarms.setStartTime(new Date(time));
  392. alarmRecord.add(recordAlarms);
  393. }
  394. }
  395. }
  396. //需要单独计算alarm27
  397. //数据位置
  398. int pos = j + 12;
  399. int curr27 = curr.getInt(pos);
  400. if (curr27 != 0) {
  401. flags[j] = false;
  402. recordAlarms = new TwinRecordAlarms();
  403. recordAlarms.setAlarmType(j + 1);
  404. recordAlarms.setStartTime(new Date(time));
  405. alarmRecord.add(recordAlarms);
  406. }
  407. }
  408. int j = 0;
  409. for (; j < index; j++) {
  410. //允许的告警编号
  411. if (Tools.findAllow(j + 1)) {
  412. if (flags[j]) {
  413. boolean currV = curr.getBool(j + 12);
  414. boolean lastV = last.getBool(j + 12);
  415. if (currV && !lastV) {
  416. recordAlarms = new TwinRecordAlarms();
  417. recordAlarms.setAlarmType(j + 1);
  418. recordAlarms.setStartTime(new Date(time));
  419. alarmRecord.add(recordAlarms);
  420. }
  421. if (!currV && lastV) {
  422. recordAlarms = new TwinRecordAlarms();
  423. recordAlarms.setAlarmType(j + 1);
  424. recordAlarms.setEndTime(new Date(time));
  425. alarmRecord.add(recordAlarms);
  426. }
  427. }
  428. }
  429. }
  430. if (flags[j]) {
  431. //需要单独计算alarm27
  432. //数据位置
  433. int pos = j + 12;
  434. int alarm27 = curr.getInt(pos);
  435. //上面已经处理过i=0,所以这里i不可能等于0
  436. int last27 = last.getInt(pos);
  437. if (alarm27 != 0 && last27 == 0) {
  438. recordAlarms = new TwinRecordAlarms();
  439. recordAlarms.setAlarmType(j + 1);
  440. recordAlarms.setStartTime(new Date(time));
  441. alarmRecord.add(recordAlarms);
  442. }
  443. if (alarm27 == 0 && last27 != 0) {
  444. recordAlarms = new TwinRecordAlarms();
  445. recordAlarms.setAlarmType(j + 1);
  446. recordAlarms.setEndTime(new Date(time));
  447. alarmRecord.add(recordAlarms);
  448. }
  449. }
  450. }
  451. /**
  452. * 重量计算 提取公共方法
  453. */
  454. private void calcTotal(int j, float[] last, float[] first, float[] total, int lastMkz, float lastFk) {
  455. float v = last[j] - first[j];
  456. total[j] += v;
  457. if (j == 0) {
  458. //如果是米长,则计算重量,米长*米克重*2 2024-11-15
  459. float weight = BigDecimal.valueOf(v * lastMkz * 2 / 1000 / 1000).setScale(2, RoundingMode.HALF_UP).floatValue();
  460. total[2] = total[2] + weight;
  461. }
  462. }
  463. }