OutEsServiceImpl.java 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. package com.jjt.out.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.github.pagehelper.PageHelper;
  5. import com.jjt.common.domain.FileDesc;
  6. import com.jjt.common.domain.IndexDO;
  7. import com.jjt.common.enums.SyncType;
  8. import com.jjt.common.utils.CompressZip;
  9. import com.jjt.common.utils.DateUtils;
  10. import com.jjt.common.utils.LinuxCommand;
  11. import com.jjt.out.domain.OutProcessInfo;
  12. import com.jjt.out.service.IOutEsService;
  13. import com.jjt.out.service.IOutProcessInfoService;
  14. import com.jjt.system.service.ISysConfigService;
  15. import org.apache.commons.codec.digest.DigestUtils;
  16. import org.apache.http.HttpEntity;
  17. import org.apache.http.HttpHost;
  18. import org.apache.http.entity.ContentType;
  19. import org.apache.http.nio.entity.NStringEntity;
  20. import org.apache.http.util.EntityUtils;
  21. import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
  22. import org.elasticsearch.action.index.IndexRequest;
  23. import org.elasticsearch.client.Request;
  24. import org.elasticsearch.client.RequestOptions;
  25. import org.elasticsearch.client.RestClient;
  26. import org.elasticsearch.client.RestHighLevelClient;
  27. import org.elasticsearch.common.xcontent.XContentBuilder;
  28. import org.elasticsearch.common.xcontent.json.JsonXContent;
  29. import org.elasticsearch.index.query.QueryBuilders;
  30. import org.elasticsearch.index.reindex.DeleteByQueryRequest;
  31. import org.elasticsearch.search.builder.SearchSourceBuilder;
  32. import org.slf4j.Logger;
  33. import org.slf4j.LoggerFactory;
  34. import org.springframework.stereotype.Service;
  35. import javax.annotation.Resource;
  36. import java.io.File;
  37. import java.io.IOException;
  38. import java.nio.file.Files;
  39. import java.nio.file.Paths;
  40. import java.util.ArrayList;
  41. import java.util.Date;
  42. import java.util.List;
  43. /**
  44. * 数据同步Service业务层处理
  45. *
  46. * @author wukai
  47. * @date 2023-06-06
  48. */
  49. @Service
  50. public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
  51. private static final Logger log = LoggerFactory.getLogger(OutEsServiceImpl.class);
  52. @Resource
  53. private ISysConfigService sysConfigService;
  54. @Resource
  55. private IOutProcessInfoService processInfoService;
  56. /**
  57. * 获取所有索引信息
  58. */
  59. @Override
  60. public List<IndexDO> getAllIndex() {
  61. try (RestHighLevelClient client = getClient()) {
  62. Request request = new Request("GET", "/_cat/indices/*?v&h=index");
  63. HttpEntity entity = client.getLowLevelClient().performRequest(request).getEntity();
  64. //结果转字符串
  65. String res = EntityUtils.toString(entity);
  66. //以换行分隔
  67. String[] indexArr = res.split("\n");
  68. List<IndexDO> list = new ArrayList<>();
  69. for (String name : indexArr) {
  70. //除开字符串index,那是表头数据,除开以.开头的默认索引
  71. if (!"index".equals(name) && !name.startsWith(".")) {
  72. //根据查询出来的索引名,获取索引信息
  73. IndexDO ido = getIndexInfo(name);
  74. list.add(ido);
  75. }
  76. }
  77. return list;
  78. } catch (Exception e) {
  79. e.printStackTrace();
  80. }
  81. return null;
  82. }
  83. /**
  84. * 生成同步文件
  85. */
  86. @Override
  87. public void exec() {
  88. String params = sysConfigService.selectConfigByKey("out.es.connect");
  89. JSONObject jsonObject = JSONObject.parseObject(params);
  90. String hostname = jsonObject.getString("hostname");
  91. int port = jsonObject.getIntValue("port");
  92. String scheme = jsonObject.getString("scheme");
  93. String index = jsonObject.getString("index");
  94. //外网临时目录
  95. String tmpDir = tmpDIr();
  96. String time = DateUtils.dateTimeNow();
  97. tmpDir += "es/" + time + "/";
  98. //创建目录及父目录
  99. try {
  100. Files.createDirectories(Paths.get(tmpDir));
  101. } catch (IOException e) {
  102. throw new RuntimeException(e);
  103. }
  104. OutProcessInfo opi = new OutProcessInfo();
  105. opi.setProcessType(SyncType.es.toString());
  106. Date st = new Date();
  107. opi.setCreateTime(st);
  108. opi.setProcessKey(String.valueOf(st.getTime()));
  109. IndexDO ido = getIndexInfo(index);
  110. String uri = scheme + "://" + hostname + ":" + port + "/" + index;
  111. String baseName = "base_" + ido.getIndexName() + ".json";
  112. String mappingName = "mapping_" + ido.getIndexName() + ".json";
  113. String dataName = "data_" + ido.getIndexName() + ".json";
  114. //写入索引基本信息到文件
  115. try {
  116. File baseFile = new File(tmpDir + baseName);
  117. ObjectMapper mapper = new ObjectMapper();
  118. mapper.writeValue(baseFile, ido);
  119. } catch (IOException ignored) {
  120. }
  121. try {
  122. //执行导出mapping命令
  123. List<String> commands = new ArrayList<>();
  124. commands.add("/usr/bin/elasticdump");
  125. commands.add("--input");
  126. commands.add(uri);
  127. commands.add("--output");
  128. commands.add(tmpDir + mappingName);
  129. commands.add("--type=mapping");
  130. LinuxCommand.exec(commands);
  131. //执行导出data命令
  132. commands = new ArrayList<>();
  133. commands.add("/usr/bin/elasticdump");
  134. commands.add("--input");
  135. commands.add(uri);
  136. commands.add("--output");
  137. commands.add(tmpDir + dataName);
  138. commands.add("--type=data");
  139. LinuxCommand.exec(commands);
  140. } catch (Exception e) {
  141. throw new RuntimeException(e);
  142. }
  143. //获取外网同步正式目录
  144. String syncDir = syncDIr();
  145. //打包文件--start
  146. //生成zip文件全路径名
  147. String zipName = "sync-es-" + time + ".zip";
  148. //打包目标目录
  149. File targetDir = new File(tmpDir);
  150. File zipFile = new File(syncDir + zipName);
  151. CompressZip.zip(targetDir, zipFile);
  152. //打包文件--end
  153. //生成描述json文件--start
  154. try {
  155. String descName = syncDir + "sync-99-" + time + ".json";
  156. String md5 = DigestUtils.md5Hex(Files.newInputStream(zipFile.toPath()));
  157. FileDesc desc = new FileDesc();
  158. desc.setName(zipName);
  159. desc.setMd5(md5);
  160. desc.setType(SyncType.es);
  161. File descFile = new File(descName);
  162. ObjectMapper mapper = new ObjectMapper();
  163. mapper.writeValue(descFile, desc);
  164. } catch (IOException e) {
  165. }
  166. //生成描述json文件--end
  167. Date et = new Date();
  168. opi.setCostTime(et.getTime() - st.getTime());
  169. processInfoService.insertOutProcessInfo(opi);
  170. }
  171. /**
  172. * 删除索引中的所有数据,不删除结构
  173. */
  174. @Override
  175. public void clean() {
  176. //先获取上次备份的时间戳
  177. OutProcessInfo pi = new OutProcessInfo();
  178. pi.setProcessType(SyncType.es.toString());
  179. PageHelper.startPage(1, 10, "create_time desc").setReasonable(true);
  180. List<OutProcessInfo> list = processInfoService.selectOutProcessInfoList(pi);
  181. String time = list.get(0).getProcessKey();
  182. String params = sysConfigService.selectConfigByKey("out.es.connect");
  183. JSONObject jsonObject = JSONObject.parseObject(params);
  184. String index = jsonObject.getString("index");
  185. try (RestHighLevelClient client = getClient()) {
  186. String endpoint = "/" + index + "/_delete_by_query";
  187. IndexRequest indexRequest = new IndexRequest();
  188. XContentBuilder builder;
  189. builder = JsonXContent.contentBuilder()
  190. .startObject()
  191. .startObject("query")
  192. .startObject("range")
  193. .startObject("time")
  194. .field("lte", Long.parseLong(time))
  195. .endObject()
  196. .endObject()
  197. .endObject()
  198. .endObject();
  199. indexRequest.source(builder);
  200. String queryWhere = indexRequest.source().utf8ToString();
  201. HttpEntity entity = new NStringEntity(queryWhere, ContentType.APPLICATION_JSON);
  202. Request request = new Request("POST", endpoint);
  203. request.setEntity(entity);
  204. client.getLowLevelClient().performRequest(request);
  205. } catch (Exception e) {
  206. e.printStackTrace();
  207. }
  208. }
  209. /**
  210. * 获取es客户端
  211. *
  212. * @return 客户端
  213. */
  214. private RestHighLevelClient getClient() {
  215. String params = sysConfigService.selectConfigByKey("out.es.connect");
  216. JSONObject jsonObject = JSONObject.parseObject(params);
  217. String hostname = jsonObject.getString("hostname");
  218. int port = jsonObject.getIntValue("port");
  219. String scheme = jsonObject.getString("scheme");
  220. return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
  221. }
  222. /**
  223. * 根据索引名共聚索引分片等信息
  224. *
  225. * @param indexName 索引名
  226. * @return
  227. */
  228. private IndexDO getIndexInfo(String indexName) {
  229. try (RestHighLevelClient client = getClient()) {
  230. Request request = new Request("GET", "/" + indexName);
  231. HttpEntity entity = entity = client.getLowLevelClient().performRequest(request).getEntity();
  232. String res = EntityUtils.toString(entity);
  233. //结果转json
  234. JSONObject json = JSONObject.parseObject(res);
  235. JSONObject indexObject = json.getJSONObject(indexName).getJSONObject("settings").getJSONObject("index");
  236. //获取索引 number_of_shards 分片数
  237. String numberOfShards = indexObject.getString("number_of_shards");
  238. //获取索引 number_of_replicas 副本数
  239. String numberOfReplicas = indexObject.getString("number_of_replicas");
  240. IndexDO ido = new IndexDO(indexName, numberOfShards, numberOfReplicas);
  241. return ido;
  242. } catch (Exception e) {
  243. e.printStackTrace();
  244. return null;
  245. }
  246. }
  247. // elasticdump --input http://localhost:9200/test --output /mnt/test_index_mapping.json --type=mapping
  248. // elasticdump --input http://localhost:9200/test --output /mnt/test_index_data.json --type=data
  249. // elasticdump --input ./test_index_mapping.json --output http://localhost:9200/ --type=mapping
  250. // elasticdump --input ./test_index_data.json --output http://192.168.188.61:9200/ --type=data
  251. }