InEsServiceImpl.java 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. package com.jjt.in.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.jjt.common.constant.Constants;
  5. import com.jjt.common.domain.IndexDO;
  6. import com.jjt.common.utils.LinuxCommand;
  7. import com.jjt.in.service.IInEsService;
  8. import com.jjt.system.service.ISysConfigService;
  9. import org.apache.http.HttpHost;
  10. import org.elasticsearch.client.RequestOptions;
  11. import org.elasticsearch.client.RestClient;
  12. import org.elasticsearch.client.RestHighLevelClient;
  13. import org.elasticsearch.client.indices.CreateIndexRequest;
  14. import org.elasticsearch.client.indices.CreateIndexResponse;
  15. import org.elasticsearch.common.settings.Settings;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import org.springframework.stereotype.Service;
  19. import javax.annotation.Resource;
  20. import java.io.*;
  21. import java.util.ArrayList;
  22. import java.util.List;
  23. /**
  24. * 数据同步Service业务层处理
  25. *
  26. * @author wukai
  27. * @date 2023-06-06
  28. */
  29. @Service
  30. public class InEsServiceImpl extends InBaseService implements IInEsService {
  31. private static final Logger log = LoggerFactory.getLogger(InEsServiceImpl.class);
  32. @Resource
  33. private ISysConfigService sysConfigService;
  34. /**
  35. * 解析同步文件
  36. */
  37. @Override
  38. public void parseSyncFile(String dir) {
  39. String params = sysConfigService.selectConfigByKey("in.es.connect");
  40. JSONObject jsonObject = JSONObject.parseObject(params);
  41. String hostname = jsonObject.getString("hostname");
  42. int port = jsonObject.getIntValue("port");
  43. String scheme = jsonObject.getString("scheme");
  44. if (!dir.endsWith(Constants.DIR_END)) {
  45. dir += "/";
  46. }
  47. try {
  48. // 创建File对象
  49. File file = new File(dir);
  50. if (file.isDirectory()) {
  51. // 判断File对象对应的目录是否存在
  52. String[] names = file.list();
  53. // 获得目录下的所有文件的文件名
  54. for (String name : names) {
  55. if (name.startsWith("base_")) {
  56. File baseFile = new File(dir + name);
  57. ObjectMapper mapper = new ObjectMapper();
  58. IndexDO ido = mapper.readValue(baseFile, IndexDO.class);
  59. boolean flag = createIndex(ido);
  60. if (flag) {
  61. String uri = scheme + "://" + hostname + ":" + port + "/" + ido.getIndexName();
  62. String mappingName = "mapping_" + ido.getIndexName();
  63. String dataName = "data_" + ido.getIndexName();
  64. try {
  65. //导入mapping命令
  66. List<String> commands = new ArrayList<>();
  67. commands.add("/usr/bin/elasticdump");
  68. commands.add("--input");
  69. commands.add(dir + mappingName + ".json");
  70. commands.add("--output");
  71. commands.add(uri);
  72. commands.add("--type=mapping");
  73. LinuxCommand.exec(commands);
  74. //导入data命令
  75. commands = new ArrayList<>();
  76. commands.add("/usr/bin/elasticdump");
  77. commands.add("--input");
  78. commands.add(dir + dataName + ".json");
  79. commands.add("--output");
  80. commands.add(uri);
  81. commands.add("--type=data");
  82. LinuxCommand.exec(commands);
  83. } catch (Exception e) {
  84. log.error("执行命令出错:");
  85. throw new RuntimeException(e);
  86. }
  87. }
  88. }
  89. }
  90. }
  91. } catch (IOException e) {
  92. log.error("处理es出错啦:{}", e.getMessage());
  93. }
  94. }
  95. private boolean createIndex(IndexDO ido) {
  96. try (RestHighLevelClient client = getClient()) {
  97. CreateIndexRequest request = new CreateIndexRequest(ido.getIndexName());
  98. Settings.Builder settings = Settings.builder();
  99. settings.put("index.number_of_shards", ido.getNumberOfShards());
  100. settings.put("index.number_of_replicas", ido.getNumberOfReplicas());
  101. request.settings(settings);
  102. CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
  103. return response.isAcknowledged();
  104. } catch (Exception e) {
  105. //如果是报已存在,就不管了噻
  106. if (!e.getMessage().contains(Constants.RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
  107. log.error("创建出错啦:", e.getMessage());
  108. return false;
  109. } else {
  110. return true;
  111. }
  112. }
  113. }
  114. /**
  115. * 获取es客户端
  116. *
  117. * @return 客户端
  118. */
  119. private RestHighLevelClient getClient() {
  120. String params = sysConfigService.selectConfigByKey("in.es.connect");
  121. JSONObject jsonObject = JSONObject.parseObject(params);
  122. String hostname = jsonObject.getString("hostname");
  123. int port = jsonObject.getIntValue("port");
  124. String scheme = jsonObject.getString("scheme");
  125. return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
  126. }
  127. }