123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285 |
- package com.jjt.out.service.impl;
- import com.alibaba.fastjson.JSONObject;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.github.pagehelper.PageHelper;
- import com.jjt.common.domain.FileDesc;
- import com.jjt.common.domain.IndexDO;
- import com.jjt.common.enums.SyncType;
- import com.jjt.common.utils.CompressZip;
- import com.jjt.common.utils.DateUtils;
- import com.jjt.common.utils.LinuxCommand;
- import com.jjt.out.domain.OutProcessInfo;
- import com.jjt.out.service.IOutEsService;
- import com.jjt.out.service.IOutProcessInfoService;
- import com.jjt.system.service.ISysConfigService;
- import org.apache.commons.codec.digest.DigestUtils;
- import org.apache.http.HttpEntity;
- import org.apache.http.HttpHost;
- import org.apache.http.entity.ContentType;
- import org.apache.http.nio.entity.NStringEntity;
- import org.apache.http.util.EntityUtils;
- import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
- import org.elasticsearch.action.index.IndexRequest;
- import org.elasticsearch.client.Request;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.common.xcontent.XContentBuilder;
- import org.elasticsearch.common.xcontent.json.JsonXContent;
- import org.elasticsearch.index.query.QueryBuilders;
- import org.elasticsearch.index.reindex.DeleteByQueryRequest;
- import org.elasticsearch.search.builder.SearchSourceBuilder;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.io.File;
- import java.io.IOException;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.util.ArrayList;
- import java.util.Date;
- import java.util.List;
- /**
- * 数据同步Service业务层处理
- *
- * @author wukai
- * @date 2023-06-06
- */
- @Service
- public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
- private static final Logger log = LoggerFactory.getLogger(OutEsServiceImpl.class);
- @Resource
- private ISysConfigService sysConfigService;
- @Resource
- private IOutProcessInfoService processInfoService;
- /**
- * 获取所有索引信息
- */
- @Override
- public List<IndexDO> getAllIndex() {
- try (RestHighLevelClient client = getClient()) {
- Request request = new Request("GET", "/_cat/indices/*?v&h=index");
- HttpEntity entity = client.getLowLevelClient().performRequest(request).getEntity();
- //结果转字符串
- String res = EntityUtils.toString(entity);
- //以换行分隔
- String[] indexArr = res.split("\n");
- List<IndexDO> list = new ArrayList<>();
- for (String name : indexArr) {
- //除开字符串index,那是表头数据,除开以.开头的默认索引
- if (!"index".equals(name) && !name.startsWith(".")) {
- //根据查询出来的索引名,获取索引信息
- IndexDO ido = getIndexInfo(name);
- list.add(ido);
- }
- }
- return list;
- } catch (Exception e) {
- e.printStackTrace();
- }
- return null;
- }
- /**
- * 生成同步文件
- */
- @Override
- public void exec() {
- String params = sysConfigService.selectConfigByKey("out.es.connect");
- JSONObject jsonObject = JSONObject.parseObject(params);
- String hostname = jsonObject.getString("hostname");
- int port = jsonObject.getIntValue("port");
- String scheme = jsonObject.getString("scheme");
- String index = jsonObject.getString("index");
- //外网临时目录
- String tmpDir = tmpDIr();
- String time = DateUtils.dateTimeNow();
- tmpDir += "es/" + time + "/";
- //创建目录及父目录
- try {
- Files.createDirectories(Paths.get(tmpDir));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- OutProcessInfo opi = new OutProcessInfo();
- opi.setProcessType(SyncType.es.toString());
- Date st = new Date();
- opi.setCreateTime(st);
- opi.setProcessKey(String.valueOf(st.getTime()));
- IndexDO ido = getIndexInfo(index);
- String uri = scheme + "://" + hostname + ":" + port + "/" + index;
- String baseName = "base_" + ido.getIndexName() + ".json";
- String mappingName = "mapping_" + ido.getIndexName() + ".json";
- String dataName = "data_" + ido.getIndexName() + ".json";
- //写入索引基本信息到文件
- try {
- File baseFile = new File(tmpDir + baseName);
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(baseFile, ido);
- } catch (IOException ignored) {
- }
- try {
- //执行导出mapping命令
- List<String> commands = new ArrayList<>();
- commands.add("/usr/bin/elasticdump");
- commands.add("--input");
- commands.add(uri);
- commands.add("--output");
- commands.add(tmpDir + mappingName);
- commands.add("--type=mapping");
- LinuxCommand.exec(commands);
- //执行导出data命令
- commands = new ArrayList<>();
- commands.add("/usr/bin/elasticdump");
- commands.add("--input");
- commands.add(uri);
- commands.add("--output");
- commands.add(tmpDir + dataName);
- commands.add("--type=data");
- LinuxCommand.exec(commands);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- //获取外网同步正式目录
- String syncDir = syncDIr();
- //打包文件--start
- //生成zip文件全路径名
- String zipName = "sync-es-" + time + ".zip";
- //打包目标目录
- File targetDir = new File(tmpDir);
- File zipFile = new File(syncDir + zipName);
- CompressZip.zip(targetDir, zipFile);
- //打包文件--end
- //生成描述json文件--start
- try {
- String descName = syncDir + "sync-99-" + time + ".json";
- String md5 = DigestUtils.md5Hex(Files.newInputStream(zipFile.toPath()));
- FileDesc desc = new FileDesc();
- desc.setName(zipName);
- desc.setMd5(md5);
- desc.setType(SyncType.es);
- File descFile = new File(descName);
- ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(descFile, desc);
- } catch (IOException e) {
- }
- //生成描述json文件--end
- Date et = new Date();
- opi.setCostTime(et.getTime() - st.getTime());
- processInfoService.insertOutProcessInfo(opi);
- }
- /**
- * 删除索引中的所有数据,不删除结构
- */
- @Override
- public void clean() {
- //先获取上次备份的时间戳
- OutProcessInfo pi = new OutProcessInfo();
- pi.setProcessType(SyncType.es.toString());
- PageHelper.startPage(1, 10, "create_time desc").setReasonable(true);
- List<OutProcessInfo> list = processInfoService.selectOutProcessInfoList(pi);
- String time = list.get(0).getProcessKey();
- String params = sysConfigService.selectConfigByKey("out.es.connect");
- JSONObject jsonObject = JSONObject.parseObject(params);
- String index = jsonObject.getString("index");
- try (RestHighLevelClient client = getClient()) {
- String endpoint = "/" + index + "/_delete_by_query";
- IndexRequest indexRequest = new IndexRequest();
- XContentBuilder builder;
- builder = JsonXContent.contentBuilder()
- .startObject()
- .startObject("query")
- .startObject("range")
- .startObject("time")
- .field("lte", Long.parseLong(time))
- .endObject()
- .endObject()
- .endObject()
- .endObject();
- indexRequest.source(builder);
- String queryWhere = indexRequest.source().utf8ToString();
- HttpEntity entity = new NStringEntity(queryWhere, ContentType.APPLICATION_JSON);
- Request request = new Request("POST", endpoint);
- request.setEntity(entity);
- client.getLowLevelClient().performRequest(request);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- /**
- * 获取es客户端
- *
- * @return 客户端
- */
- private RestHighLevelClient getClient() {
- String params = sysConfigService.selectConfigByKey("out.es.connect");
- JSONObject jsonObject = JSONObject.parseObject(params);
- String hostname = jsonObject.getString("hostname");
- int port = jsonObject.getIntValue("port");
- String scheme = jsonObject.getString("scheme");
- return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
- }
- /**
- * 根据索引名共聚索引分片等信息
- *
- * @param indexName 索引名
- * @return
- */
- private IndexDO getIndexInfo(String indexName) {
- try (RestHighLevelClient client = getClient()) {
- Request request = new Request("GET", "/" + indexName);
- HttpEntity entity = entity = client.getLowLevelClient().performRequest(request).getEntity();
- String res = EntityUtils.toString(entity);
- //结果转json
- JSONObject json = JSONObject.parseObject(res);
- JSONObject indexObject = json.getJSONObject(indexName).getJSONObject("settings").getJSONObject("index");
- //获取索引 number_of_shards 分片数
- String numberOfShards = indexObject.getString("number_of_shards");
- //获取索引 number_of_replicas 副本数
- String numberOfReplicas = indexObject.getString("number_of_replicas");
- IndexDO ido = new IndexDO(indexName, numberOfShards, numberOfReplicas);
- return ido;
- } catch (Exception e) {
- e.printStackTrace();
- return null;
- }
- }
- // elasticdump --input http://localhost:9200/test --output /mnt/test_index_mapping.json --type=mapping
- // elasticdump --input http://localhost:9200/test --output /mnt/test_index_data.json --type=data
- // elasticdump --input ./test_index_mapping.json --output http://localhost:9200/ --type=mapping
- // elasticdump --input ./test_index_data.json --output http://192.168.188.61:9200/ --type=data
- }
|