package com.jjt.out.service.impl; import com.alibaba.fastjson.JSONObject; import com.fasterxml.jackson.databind.ObjectMapper; import com.github.pagehelper.PageHelper; import com.jjt.common.domain.FileDesc; import com.jjt.common.domain.IndexDO; import com.jjt.common.enums.SyncType; import com.jjt.common.utils.CompressZip; import com.jjt.common.utils.DateUtils; import com.jjt.common.utils.LinuxCommand; import com.jjt.out.domain.OutProcessInfo; import com.jjt.out.service.IOutEsService; import com.jjt.out.service.IOutProcessInfoService; import com.jjt.system.service.ISysConfigService; import org.apache.commons.codec.digest.DigestUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; import org.apache.http.util.EntityUtils; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * 数据同步Service业务层处理 * * @author wukai * @date 2023-06-06 */ @Service public class OutEsServiceImpl extends OutBaseService implements IOutEsService { private static final Logger log = LoggerFactory.getLogger(OutEsServiceImpl.class); @Resource private ISysConfigService sysConfigService; @Resource private IOutProcessInfoService processInfoService; /** * 获取所有索引信息 */ @Override public List getAllIndex() { try (RestHighLevelClient client = getClient()) { Request request = new Request("GET", "/_cat/indices/*?v&h=index"); HttpEntity entity = client.getLowLevelClient().performRequest(request).getEntity(); //结果转字符串 String res = EntityUtils.toString(entity); //以换行分隔 String[] indexArr = res.split("\n"); List list = new ArrayList<>(); for (String name : indexArr) { //除开字符串index,那是表头数据,除开以.开头的默认索引 if (!"index".equals(name) && !name.startsWith(".")) { //根据查询出来的索引名,获取索引信息 IndexDO ido = getIndexInfo(name); list.add(ido); } } return list; } catch (Exception e) { e.printStackTrace(); } return null; } /** * 生成同步文件 */ @Override public void exec() { String params = sysConfigService.selectConfigByKey("out.es.connect"); JSONObject jsonObject = JSONObject.parseObject(params); String hostname = jsonObject.getString("hostname"); int port = jsonObject.getIntValue("port"); String scheme = jsonObject.getString("scheme"); String index = jsonObject.getString("index"); //外网临时目录 String tmpDir = tmpDIr(); String time = DateUtils.dateTimeNow(); tmpDir += "es/" + time + "/"; //创建目录及父目录 try { Files.createDirectories(Paths.get(tmpDir)); } catch (IOException e) { throw new RuntimeException(e); } OutProcessInfo opi = new OutProcessInfo(); opi.setProcessType(SyncType.es.toString()); Date st = new Date(); opi.setCreateTime(st); opi.setProcessKey(String.valueOf(st.getTime())); IndexDO ido = getIndexInfo(index); String uri = scheme + "://" + hostname + ":" + port + "/" + index; String baseName = "base_" + ido.getIndexName() + ".json"; String mappingName = "mapping_" + ido.getIndexName() + ".json"; String dataName = "data_" + ido.getIndexName() + ".json"; //写入索引基本信息到文件 try { File baseFile = new File(tmpDir + baseName); ObjectMapper mapper = new ObjectMapper(); mapper.writeValue(baseFile, ido); } catch (IOException ignored) { } try { //执行导出mapping命令 List commands = new ArrayList<>(); commands.add("/usr/bin/elasticdump"); commands.add("--input"); commands.add(uri); commands.add("--output"); commands.add(tmpDir + mappingName); commands.add("--type=mapping"); LinuxCommand.exec(commands); //执行导出data命令 commands = new ArrayList<>(); commands.add("/usr/bin/elasticdump"); commands.add("--input"); commands.add(uri); commands.add("--output"); commands.add(tmpDir + dataName); commands.add("--type=data"); LinuxCommand.exec(commands); } catch (Exception e) { throw new RuntimeException(e); } //获取外网同步正式目录 String syncDir = syncDIr(); //打包文件--start //生成zip文件全路径名 String zipName = "sync-es-" + time + ".zip"; //打包目标目录 File targetDir = new File(tmpDir); File zipFile = new File(syncDir + zipName); CompressZip.zip(targetDir, zipFile); //打包文件--end //生成描述json文件--start try { String descName = syncDir + "sync-99-" + time + ".json"; String md5 = DigestUtils.md5Hex(Files.newInputStream(zipFile.toPath())); FileDesc desc = new FileDesc(); desc.setName(zipName); desc.setMd5(md5); desc.setType(SyncType.es); File descFile = new File(descName); ObjectMapper mapper = new ObjectMapper(); mapper.writeValue(descFile, desc); } catch (IOException e) { } //生成描述json文件--end Date et = new Date(); opi.setCostTime(et.getTime() - st.getTime()); processInfoService.insertOutProcessInfo(opi); } /** * 删除索引中的所有数据,不删除结构 */ @Override public void clean() { //先获取上次备份的时间戳 OutProcessInfo pi = new OutProcessInfo(); pi.setProcessType(SyncType.es.toString()); PageHelper.startPage(1, 10, "create_time desc").setReasonable(true); List list = processInfoService.selectOutProcessInfoList(pi); String time = list.get(0).getProcessKey(); String params = sysConfigService.selectConfigByKey("out.es.connect"); JSONObject jsonObject = JSONObject.parseObject(params); String index = jsonObject.getString("index"); try (RestHighLevelClient client = getClient()) { String endpoint = "/" + index + "/_delete_by_query"; IndexRequest indexRequest = new IndexRequest(); XContentBuilder builder; builder = JsonXContent.contentBuilder() .startObject() .startObject("query") .startObject("range") .startObject("time") .field("lte", Long.parseLong(time)) .endObject() .endObject() .endObject() .endObject(); indexRequest.source(builder); String queryWhere = indexRequest.source().utf8ToString(); HttpEntity entity = new NStringEntity(queryWhere, ContentType.APPLICATION_JSON); Request request = new Request("POST", endpoint); request.setEntity(entity); client.getLowLevelClient().performRequest(request); } catch (Exception e) { e.printStackTrace(); } } /** * 获取es客户端 * * @return 客户端 */ private RestHighLevelClient getClient() { String params = sysConfigService.selectConfigByKey("out.es.connect"); JSONObject jsonObject = JSONObject.parseObject(params); String hostname = jsonObject.getString("hostname"); int port = jsonObject.getIntValue("port"); String scheme = jsonObject.getString("scheme"); return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme))); } /** * 根据索引名共聚索引分片等信息 * * @param indexName 索引名 * @return */ private IndexDO getIndexInfo(String indexName) { try (RestHighLevelClient client = getClient()) { Request request = new Request("GET", "/" + indexName); HttpEntity entity = entity = client.getLowLevelClient().performRequest(request).getEntity(); String res = EntityUtils.toString(entity); //结果转json JSONObject json = JSONObject.parseObject(res); JSONObject indexObject = json.getJSONObject(indexName).getJSONObject("settings").getJSONObject("index"); //获取索引 number_of_shards 分片数 String numberOfShards = indexObject.getString("number_of_shards"); //获取索引 number_of_replicas 副本数 String numberOfReplicas = indexObject.getString("number_of_replicas"); IndexDO ido = new IndexDO(indexName, numberOfShards, numberOfReplicas); return ido; } catch (Exception e) { e.printStackTrace(); return null; } } // elasticdump --input http://localhost:9200/test --output /mnt/test_index_mapping.json --type=mapping // elasticdump --input http://localhost:9200/test --output /mnt/test_index_data.json --type=data // elasticdump --input ./test_index_mapping.json --output http://localhost:9200/ --type=mapping // elasticdump --input ./test_index_data.json --output http://192.168.188.61:9200/ --type=data }