package com.jjt.in.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jjt.common.constant.Constants;
import com.jjt.common.domain.IndexDO;
import com.jjt.common.utils.LinuxCommand;
import com.jjt.in.service.IInEsService;
import com.jjt.system.service.ISysConfigService;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * Service implementation for data synchronization into Elasticsearch.
 *
 * <p>For every {@code base_*} descriptor file found in a sync directory, the index described by
 * that file is created (if needed) and the matching {@code mapping_<index>.json} and
 * {@code data_<index>.json} files are imported by invoking the {@code elasticdump} executable.
 *
 * @author wukai
 * @date 2023-06-06
 */
@Service
public class InEsServiceImpl extends InBaseService implements IInEsService {
    private static final Logger log = LoggerFactory.getLogger(InEsServiceImpl.class);

    /** Key of the system configuration entry holding the ES connection JSON. */
    private static final String ES_CONNECT_CONFIG_KEY = "in.es.connect";

    /** Path of the elasticdump executable used for the imports. */
    private static final String ELASTICDUMP_BIN = "/usr/bin/elasticdump";

    /** File-name prefix of the index descriptor files. */
    private static final String BASE_FILE_PREFIX = "base_";

    /** Shared mapper; creating one per descriptor file is unnecessary work. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Resource
    private ISysConfigService sysConfigService;

    /**
     * Parses the sync files located in {@code dir} and imports them into Elasticsearch.
     *
     * <p>Each file whose name starts with {@code base_} is deserialized into an {@link IndexDO}.
     * When the index can be created (or already exists), the mapping file and then the data file
     * for that index are imported. An {@link IOException} while reading a descriptor is logged
     * and ends the processing of the directory.
     *
     * @param dir directory containing the sync files; a trailing separator is appended if missing
     * @throws RuntimeException if executing one of the import commands fails
     */
    @Override
    public void parseSyncFile(String dir) {
        JSONObject connect = loadEsConfig();
        String hostname = connect.getString("hostname");
        int port = connect.getIntValue("port");
        String scheme = connect.getString("scheme");
        String baseUri = scheme + "://" + hostname + ":" + port + "/";

        if (!dir.endsWith(Constants.DIR_END)) {
            dir += "/";
        }

        File directory = new File(dir);
        if (!directory.isDirectory()) {
            return;
        }
        // File.list() returns null when the directory cannot be read (I/O error, permissions).
        String[] names = directory.list();
        if (names == null) {
            log.warn("Unable to list files of directory: {}", dir);
            return;
        }

        try {
            for (String name : names) {
                if (!name.startsWith(BASE_FILE_PREFIX)) {
                    continue;
                }
                IndexDO ido = MAPPER.readValue(new File(dir + name), IndexDO.class);
                if (createIndex(ido)) {
                    importIndex(dir, baseUri + ido.getIndexName(), ido.getIndexName());
                }
            }
        } catch (IOException e) {
            log.error("处理es出错啦:{}", e.getMessage(), e);
        }
    }

    /**
     * Imports the mapping file and then the data file of one index.
     *
     * @param dir       sync directory, ending with a separator
     * @param uri       target index URI passed to elasticdump as {@code --output}
     * @param indexName name of the index, used to derive the input file names
     * @throws RuntimeException wrapping any failure raised while executing the commands
     */
    private void importIndex(String dir, String uri, String indexName) {
        try {
            // The mapping must be imported before the data.
            runElasticdump(dir + "mapping_" + indexName + ".json", uri, "mapping");
            runElasticdump(dir + "data_" + indexName + ".json", uri, "data");
        } catch (Exception e) {
            log.error("执行命令出错:", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds and executes a single elasticdump import command.
     *
     * @param inputFile path of the JSON file to import
     * @param outputUri target index URI
     * @param type      elasticdump type, e.g. {@code mapping} or {@code data}
     * @throws Exception whatever {@link LinuxCommand#exec} raises
     */
    private void runElasticdump(String inputFile, String outputUri, String type) throws Exception {
        List<String> commands = new ArrayList<>();
        commands.add(ELASTICDUMP_BIN);
        commands.add("--input");
        commands.add(inputFile);
        commands.add("--output");
        commands.add(outputUri);
        commands.add("--type=" + type);
        // NOTE(review): any result returned by LinuxCommand.exec is ignored, as before — confirm
        // whether a non-zero exit status should be treated as a failure.
        LinuxCommand.exec(commands);
    }

    /**
     * Creates the index described by {@code ido} with its shard and replica settings.
     *
     * @param ido index descriptor
     * @return {@code true} if the creation was acknowledged or the failure message indicates that
     *         the index already exists; {@code false} on any other failure
     */
    private boolean createIndex(IndexDO ido) {
        try (RestHighLevelClient client = getClient()) {
            CreateIndexRequest request = new CreateIndexRequest(ido.getIndexName());
            Settings.Builder settings = Settings.builder();
            settings.put("index.number_of_shards", ido.getNumberOfShards());
            settings.put("index.number_of_replicas", ido.getNumberOfReplicas());
            request.settings(settings);
            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
            return response.isAcknowledged();
        } catch (Exception e) {
            // An "already exists" failure is not an error for our purposes.
            // getMessage() may be null, so guard before inspecting it.
            String message = e.getMessage();
            if (message != null && message.contains(Constants.RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
                return true;
            }
            // Pass the exception itself so the detail and stack trace are actually logged.
            log.error("创建出错啦:", e);
            return false;
        }
    }

    /**
     * Builds an ES client from the connection settings stored in the system configuration.
     *
     * @return a new client; the caller is responsible for closing it
     */
    private RestHighLevelClient getClient() {
        JSONObject connect = loadEsConfig();
        String hostname = connect.getString("hostname");
        int port = connect.getIntValue("port");
        String scheme = connect.getString("scheme");
        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
    }

    /**
     * Reads and parses the ES connection settings from the system configuration.
     *
     * @return the parsed JSON object holding {@code hostname}, {@code port} and {@code scheme}
     */
    private JSONObject loadEsConfig() {
        String params = sysConfigService.selectConfigByKey(ES_CONNECT_CONFIG_KEY);
        return JSONObject.parseObject(params);
    }
}