| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- package com.jjt.in.service.impl;
- import com.alibaba.fastjson.JSONObject;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.jjt.common.constant.Constants;
- import com.jjt.common.domain.IndexDO;
- import com.jjt.common.utils.LinuxCommand;
- import com.jjt.in.service.IInEsService;
- import com.jjt.system.service.ISysConfigService;
- import org.apache.http.HttpHost;
- import org.elasticsearch.client.RequestOptions;
- import org.elasticsearch.client.RestClient;
- import org.elasticsearch.client.RestHighLevelClient;
- import org.elasticsearch.client.indices.CreateIndexRequest;
- import org.elasticsearch.client.indices.CreateIndexResponse;
- import org.elasticsearch.common.settings.Settings;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.io.*;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * 数据同步Service业务层处理
- *
- * @author wukai
- * @date 2023-06-06
- */
- @Service
- public class InEsServiceImpl extends InBaseService implements IInEsService {
- private static final Logger log = LoggerFactory.getLogger(InEsServiceImpl.class);
- @Resource
- private ISysConfigService sysConfigService;
- /**
- * 解析同步文件
- */
- @Override
- public void parseSyncFile(String dir) {
- String params = sysConfigService.selectConfigByKey("in.es.connect");
- JSONObject jsonObject = JSONObject.parseObject(params);
- String hostname = jsonObject.getString("hostname");
- int port = jsonObject.getIntValue("port");
- String scheme = jsonObject.getString("scheme");
- if (!dir.endsWith(Constants.DIR_END)) {
- dir += "/";
- }
- try {
- // 创建File对象
- File file = new File(dir);
- if (file.isDirectory()) {
- // 判断File对象对应的目录是否存在
- String[] names = file.list();
- // 获得目录下的所有文件的文件名
- for (String name : names) {
- if (name.startsWith("base_")) {
- File baseFile = new File(dir + name);
- ObjectMapper mapper = new ObjectMapper();
- IndexDO ido = mapper.readValue(baseFile, IndexDO.class);
- boolean flag = createIndex(ido);
- if (flag) {
- String uri = scheme + "://" + hostname + ":" + port + "/" + ido.getIndexName();
- String mappingName = "mapping_" + ido.getIndexName();
- String dataName = "data_" + ido.getIndexName();
- try {
- //导入mapping命令
- List<String> commands = new ArrayList<>();
- commands.add("/usr/bin/elasticdump");
- commands.add("--input");
- commands.add(dir + mappingName + ".json");
- commands.add("--output");
- commands.add(uri);
- commands.add("--type=mapping");
- LinuxCommand.exec(commands);
- //导入data命令
- commands = new ArrayList<>();
- commands.add("/usr/bin/elasticdump");
- commands.add("--input");
- commands.add(dir + dataName + ".json");
- commands.add("--output");
- commands.add(uri);
- commands.add("--type=data");
- LinuxCommand.exec(commands);
- } catch (Exception e) {
- log.error("执行命令出错:");
- throw new RuntimeException(e);
- }
- }
- }
- }
- }
- } catch (IOException e) {
- log.error("处理es出错啦:{}", e.getMessage());
- }
- }
- private boolean createIndex(IndexDO ido) {
- try (RestHighLevelClient client = getClient()) {
- CreateIndexRequest request = new CreateIndexRequest(ido.getIndexName());
- Settings.Builder settings = Settings.builder();
- settings.put("index.number_of_shards", ido.getNumberOfShards());
- settings.put("index.number_of_replicas", ido.getNumberOfReplicas());
- request.settings(settings);
- CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
- return response.isAcknowledged();
- } catch (Exception e) {
- //如果是报已存在,就不管了噻
- if (!e.getMessage().contains(Constants.RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
- log.error("创建出错啦:", e.getMessage());
- return false;
- } else {
- return true;
- }
- }
- }
- /**
- * 获取es客户端
- *
- * @return 客户端
- */
- private RestHighLevelClient getClient() {
- String params = sysConfigService.selectConfigByKey("in.es.connect");
- JSONObject jsonObject = JSONObject.parseObject(params);
- String hostname = jsonObject.getString("hostname");
- int port = jsonObject.getIntValue("port");
- String scheme = jsonObject.getString("scheme");
- return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
- }
- }
|