Kaynağa Gözat

处理ES清空数据操作

wukai 1 yıl önce
ebeveyn
işleme
c9818ddb7a

+ 6 - 2
sync-admin/src/test/java/OutEsTest.java

@@ -3,7 +3,6 @@ import com.jjt.out.service.IOutEsService;
 import com.jjt.system.service.ISysConfigService;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
@@ -20,6 +19,11 @@ public class OutEsTest {
     @Test
     public void test() {
         System.err.println(configService.selectConfigByKey("out.dir.tmp"));
-        esService.generateSyncFile();
+        esService.exec();
+    }
+
+    @Test
+    public void clean() {
+        esService.clean();
     }
 }

+ 58 - 43
sync-in/src/main/java/com/jjt/in/service/impl/InEsServiceImpl.java

@@ -20,6 +20,8 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
 import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * 数据同步Service业务层处理
@@ -33,45 +35,6 @@ public class InEsServiceImpl extends InBaseService implements IInEsService {
     @Resource
     private ISysConfigService sysConfigService;
 
-    /**
-     * 获取es客户端
-     *
-     * @return 客户端
-     */
-    private RestHighLevelClient getClient() {
-        String params = sysConfigService.selectConfigByKey("in.es.connect");
-
-        JSONObject jsonObject = JSONObject.parseObject(params);
-        String hostname = jsonObject.getString("hostname");
-        int port = jsonObject.getIntValue("port");
-        String scheme = jsonObject.getString("scheme");
-
-        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
-    }
-
-    private boolean createIndex(IndexDO ido) {
-        try (RestHighLevelClient client = getClient()) {
-            CreateIndexRequest request = new CreateIndexRequest(ido.getIndexName());
-
-            Settings.Builder settings = Settings.builder();
-            settings.put("index.number_of_shards", ido.getNumberOfShards());
-            settings.put("index.number_of_replicas", ido.getNumberOfReplicas());
-
-            request.settings(settings);
-
-            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
-            return response.isAcknowledged();
-        } catch (Exception e) {
-            //如果是报已存在,就不管了噻
-            if (e.getMessage().indexOf(Constants.RESOURCE_ALREADY_EXISTS_EXCEPTION) == -1) {
-                log.error("创建出错啦:", e.getMessage());
-                return false;
-            } else {
-                return true;
-            }
-        }
-    }
-
 
     /**
      * 解析同步文件
@@ -109,11 +72,23 @@ public class InEsServiceImpl extends InBaseService implements IInEsService {
                             String dataName = "data_" + ido.getIndexName();
                             try {
                                 //导入mapping命令
-                                String mappingCmd = "/usr/bin/elasticdump --input " + dir + mappingName + " --output " + uri + " --type=mapping";
-                                LinuxCommand.exec(mappingCmd);
+                                List<String> commands = new ArrayList<>();
+                                commands.add("/usr/bin/elasticdump");
+                                commands.add("--input");
+                                commands.add(dir + mappingName + ".json");
+                                commands.add("--output");
+                                commands.add(uri);
+                                commands.add("--type=mapping");
+                                LinuxCommand.exec(commands);
                                 //导入data命令
-                                String dataCmd = "/usr/bin/elasticdump --input " + dir + dataName + " --output " + uri + " --type=data";
-                                LinuxCommand.exec(dataCmd);
+                                commands = new ArrayList<>();
+                                commands.add("/usr/bin/elasticdump");
+                                commands.add("--input");
+                                commands.add(dir + dataName + ".json");
+                                commands.add("--output");
+                                commands.add(uri);
+                                commands.add("--type=data");
+                                LinuxCommand.exec(commands);
                             } catch (Exception e) {
                                 log.error("执行命令出错:");
                                 throw new RuntimeException(e);
@@ -126,4 +101,44 @@ public class InEsServiceImpl extends InBaseService implements IInEsService {
             log.error("处理es出错啦:{}", e.getMessage());
         }
     }
+
+    private boolean createIndex(IndexDO ido) {
+        try (RestHighLevelClient client = getClient()) {
+            CreateIndexRequest request = new CreateIndexRequest(ido.getIndexName());
+
+            Settings.Builder settings = Settings.builder();
+            settings.put("index.number_of_shards", ido.getNumberOfShards());
+            settings.put("index.number_of_replicas", ido.getNumberOfReplicas());
+
+            request.settings(settings);
+
+            CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
+            return response.isAcknowledged();
+        } catch (Exception e) {
+            //如果是报已存在,就不管了噻
+            if (!e.getMessage().contains(Constants.RESOURCE_ALREADY_EXISTS_EXCEPTION)) {
+                log.error("创建出错啦:", e.getMessage());
+                return false;
+            } else {
+                return true;
+            }
+        }
+    }
+
+
+    /**
+     * 获取es客户端
+     *
+     * @return 客户端
+     */
+    private RestHighLevelClient getClient() {
+        String params = sysConfigService.selectConfigByKey("in.es.connect");
+
+        JSONObject jsonObject = JSONObject.parseObject(params);
+        String hostname = jsonObject.getString("hostname");
+        int port = jsonObject.getIntValue("port");
+        String scheme = jsonObject.getString("scheme");
+
+        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
+    }
 }

+ 9 - 2
sync-out/src/main/java/com/jjt/out/service/IOutEsService.java

@@ -14,10 +14,17 @@ public interface IOutEsService {
      *
      * @return 索引信息列表
      */
-    public List<IndexDO> getAllIndex();
+    List<IndexDO> getAllIndex();
 
     /**
      * 生成同步文件
      */
-    public void generateSyncFile();
+    void exec();
+
+    /**
+     * 删除索引中的所有数据,不删除结构
+     */
+    void clean();
+
+
 }

+ 134 - 60
sync-out/src/main/java/com/jjt/out/service/impl/OutEsServiceImpl.java

@@ -2,29 +2,40 @@ package com.jjt.out.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.jjt.common.constant.Constants;
 import com.jjt.common.domain.FileDesc;
+import com.jjt.common.domain.IndexDO;
 import com.jjt.common.enums.SyncType;
 import com.jjt.common.utils.CompressZip;
 import com.jjt.common.utils.DateUtils;
-import com.jjt.common.domain.IndexDO;
 import com.jjt.common.utils.LinuxCommand;
 import com.jjt.out.service.IOutEsService;
 import com.jjt.system.service.ISysConfigService;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpHost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.http.util.EntityUtils;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,21 +51,6 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
     @Resource
     private ISysConfigService sysConfigService;
 
-    /**
-     * 获取es客户端
-     *
-     * @return 客户端
-     */
-    private RestHighLevelClient getClient() {
-        String params = sysConfigService.selectConfigByKey("out.es.connect");
-
-        JSONObject jsonObject = JSONObject.parseObject(params);
-        String hostname = jsonObject.getString("hostname");
-        int port = jsonObject.getIntValue("port");
-        String scheme = jsonObject.getString("scheme");
-
-        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
-    }
 
     /**
      * 获取所有索引信息
@@ -73,20 +69,7 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
                 //除开字符串index,那是表头数据,除开以.开头的默认索引
                 if (!"index".equals(name) && !name.startsWith(".")) {
                     //根据查询出来的索引名,获取索引信息
-                    request = new Request("GET", "/" + name);
-                    entity = client.getLowLevelClient().performRequest(request).getEntity();
-                    //结果转字符串
-                    res = EntityUtils.toString(entity);
-                    //结果转json
-                    JSONObject json = JSONObject.parseObject(res);
-
-                    JSONObject indexObject = json.getJSONObject(name).getJSONObject("settings").getJSONObject("index");
-                    //获取索引 number_of_shards 分片数
-                    String numberOfShards = indexObject.getString("number_of_shards");
-                    //获取索引 number_of_replicas 副本数
-                    String numberOfReplicas = indexObject.getString("number_of_replicas");
-
-                    IndexDO ido = new IndexDO(name, numberOfShards, numberOfReplicas);
+                    IndexDO ido = getIndexInfo(name);
 
                     list.add(ido);
                 }
@@ -98,17 +81,19 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
         return null;
     }
 
+
     /**
      * 生成同步文件
      */
     @Override
-    public void generateSyncFile() {
+    public void exec() {
         String params = sysConfigService.selectConfigByKey("out.es.connect");
 
         JSONObject jsonObject = JSONObject.parseObject(params);
         String hostname = jsonObject.getString("hostname");
         int port = jsonObject.getIntValue("port");
         String scheme = jsonObject.getString("scheme");
+        String index = jsonObject.getString("index");
         //外网临时目录
         String tmpDir = tmpDIr();
 
@@ -116,36 +101,47 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
 
         tmpDir += "es/" + time + "/";
 
-        //如果没有目录,则创建目录
-        File file = new File(tmpDir);
-        if (!file.exists()) {
-            file.mkdirs();
+        //创建目录及父目录
+        try {
+            Files.createDirectories(Paths.get(tmpDir));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
 
-        List<IndexDO> list = getAllIndex();
-        for (IndexDO ido : list) {
-            String uri = scheme + "://" + hostname + ":" + port + "/" + ido.getIndexName();
-            String baseName = "base_" + ido.getIndexName() + ".json";
-            String mappingName = "mapping_" + ido.getIndexName() + ".json";
-            String dataName = "data_" + ido.getIndexName() + ".json";
-            //写入索引基本信息到文件
-            try {
-                File baseFile = new File(tmpDir + baseName);
-                ObjectMapper mapper = new ObjectMapper();
-                mapper.writeValue(baseFile, ido);
-            } catch (IOException ignored) {
-            }
+        IndexDO ido = getIndexInfo(index);
+        String uri = scheme + "://" + hostname + ":" + port + "/" + index;
+        String baseName = "base_" + ido.getIndexName() + ".json";
+        String mappingName = "mapping_" + ido.getIndexName() + ".json";
+        String dataName = "data_" + ido.getIndexName() + ".json";
+        //写入索引基本信息到文件
+        try {
+            File baseFile = new File(tmpDir + baseName);
+            ObjectMapper mapper = new ObjectMapper();
+            mapper.writeValue(baseFile, ido);
+        } catch (IOException ignored) {
+        }
 
-            try {
-                //导出mapping命令
-                String mappingCmd = "/usr/bin/elasticdump --input " + uri + " --output " + tmpDir + mappingName + " --type=mapping";
-                LinuxCommand.exec(mappingCmd);
-                //导出data命令
-                String dataCmd = "/usr/bin/elasticdump --input " + uri + " --output " + tmpDir + dataName + " --type=data";
-                LinuxCommand.exec(dataCmd);
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
+        try {
+            //执行导出mapping命令
+            List<String> commands = new ArrayList<>();
+            commands.add("/usr/bin/elasticdump");
+            commands.add("--input");
+            commands.add(uri);
+            commands.add("--output");
+            commands.add(tmpDir + mappingName);
+            commands.add("--type=mapping");
+            LinuxCommand.exec(commands);
+            //执行导出data命令
+            commands = new ArrayList<>();
+            commands.add("/usr/bin/elasticdump");
+            commands.add("--input");
+            commands.add(uri);
+            commands.add("--output");
+            commands.add(tmpDir + dataName);
+            commands.add("--type=data");
+            LinuxCommand.exec(commands);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
 
         //获取外网同步正式目录
@@ -177,6 +173,84 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
         //生成描述json文件--end
     }
 
+    /**
+     * 删除索引中的所有数据,不删除结构
+     */
+    @Override
+    public void clean() {
+        String params = sysConfigService.selectConfigByKey("out.es.connect");
+        JSONObject jsonObject = JSONObject.parseObject(params);
+        String index = jsonObject.getString("index");
+        try (RestHighLevelClient client = getClient()) {
+            String endpoint = "/" + index + "/_delete_by_query";
+            IndexRequest indexRequest = new IndexRequest();
+            XContentBuilder builder;
+            builder = JsonXContent.contentBuilder()
+                    .startObject()
+                    .startObject("query")
+                    .startObject("match_all")
+                    .endObject()
+                    .endObject()
+                    .endObject();
+            indexRequest.source(builder);
+            String queryWhere = indexRequest.source().utf8ToString();
+            HttpEntity entity = new NStringEntity(queryWhere, ContentType.APPLICATION_JSON);
+
+            Request request = new Request("POST", endpoint);
+            request.setEntity(entity);
+
+            client.getLowLevelClient().performRequest(request);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 获取es客户端
+     *
+     * @return 客户端
+     */
+    private RestHighLevelClient getClient() {
+        String params = sysConfigService.selectConfigByKey("out.es.connect");
+
+        JSONObject jsonObject = JSONObject.parseObject(params);
+        String hostname = jsonObject.getString("hostname");
+        int port = jsonObject.getIntValue("port");
+        String scheme = jsonObject.getString("scheme");
+
+        return new RestHighLevelClient(RestClient.builder(new HttpHost(hostname, port, scheme)));
+    }
+
+    /**
+     * 根据索引名共聚索引分片等信息
+     *
+     * @param indexName 索引名
+     * @return
+     */
+    private IndexDO getIndexInfo(String indexName) {
+        try (RestHighLevelClient client = getClient()) {
+            Request request = new Request("GET", "/" + indexName);
+            HttpEntity entity = entity = client.getLowLevelClient().performRequest(request).getEntity();
+
+            String res = EntityUtils.toString(entity);
+            //结果转json
+            JSONObject json = JSONObject.parseObject(res);
+
+            JSONObject indexObject = json.getJSONObject(indexName).getJSONObject("settings").getJSONObject("index");
+            //获取索引 number_of_shards 分片数
+            String numberOfShards = indexObject.getString("number_of_shards");
+            //获取索引 number_of_replicas 副本数
+            String numberOfReplicas = indexObject.getString("number_of_replicas");
+
+            IndexDO ido = new IndexDO(indexName, numberOfShards, numberOfReplicas);
+            return ido;
+        } catch (Exception e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+
 //    elasticdump --input http://localhost:9200/test --output /mnt/test_index_mapping.json --type=mapping
 //    elasticdump --input http://localhost:9200/test --output /mnt/test_index_data.json --type=data
 

+ 4 - 4
sync-out/src/main/java/com/jjt/out/task/EsOutTask.java

@@ -15,12 +15,12 @@ public class EsOutTask {
     @Resource
     private IOutEsService outEsService;
 
-    public void generateSyncFile() {
-        outEsService.generateSyncFile();
+    public void exec() {
+        outEsService.exec();
     }
 
-    public void esImport() {
-        System.out.println("执行有参方法:");
+    public void clean() {
+        outEsService.clean();
     }
 
 }

+ 24 - 2
sync-out/src/test/java/com/test/Test.java

@@ -4,17 +4,39 @@ import com.alibaba.fastjson.JSONObject;
 import com.jjt.common.enums.SyncType;
 import net.lingala.zip4j.ZipFile;
 import org.apache.commons.io.FileUtils;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Date;
 
 public class Test {
     public static void main(String[] args) throws Exception {
+
+        IndexRequest indexRequest = new IndexRequest();
+        XContentBuilder builder;
+        try {
+            builder = JsonXContent.contentBuilder()
+                    .startObject()
+                    .startObject("query")
+                    .startObject("match_all")
+                    .endObject()
+                    .endObject()
+                    .endObject();
+            indexRequest.source(builder);
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+        String source = indexRequest.source().utf8ToString();
+        System.err.println(source);
 //        Files.delete(Paths.get("D:\\SYSTEM\\Desktop\\temp\\zip"));
-        File file = new File("D:\\SYSTEM\\Desktop\\temp\\624");
-        FileUtils.deleteDirectory(file);
+//        File file = new File("D:\\SYSTEM\\Desktop\\temp\\624");
+//        FileUtils.deleteDirectory(file);
 //        System.err.println(file.getName());
 //
 //        JSONObject object = new JSONObject();

+ 0 - 40
sync-out/src/test/java/test/ElasticsearchSearchTest.java

@@ -1,40 +0,0 @@
-package test;
-
-import org.apache.http.HttpHost;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import org.elasticsearch.client.GetAliasesResponse;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.cluster.metadata.AliasMetadata;
-
-import java.io.IOException;
-import java.util.*;
-
-public class ElasticsearchSearchTest {
-    public static void main(String[] args) throws IOException {
-        RestHighLevelClient client = new RestHighLevelClient(
-                RestClient.builder(
-                        new HttpHost("192.168.188.99", 19200, "http")
-                )
-
-        );
-        List<Map<String, Object>> resultList = new ArrayList<>();
-        GetAliasesRequest reqeust = new GetAliasesRequest();
-        GetAliasesResponse response = client.indices().getAlias(reqeust, RequestOptions.DEFAULT);
-        Map<String, Set<AliasMetadata>> map = response.getAliases();
-
-        map.forEach((k, v) -> {
-            if (!k.startsWith(".")) {
-                //忽略elasticesearch 默认的
-                Map map1 = new HashMap();
-                map1.put("indexName", k);
-                System.err.println(k + v);
-                resultList.add(map1);
-            }
-        });
-
-//        System.out.println(objectMapper.writeValueAsString(resultList));
-
-    }
-}