Browse Source

mongodb 增量备份改变调用方式,使用shell脚本

wukai 1 year ago
parent
commit
88ebfbfe36

+ 55 - 29
sync-common/src/main/java/com/jjt/common/utils/LinuxCommand.java

@@ -48,31 +48,6 @@ public class LinuxCommand {
         }
         }
     }
     }
 
 
-    private static void print(Process process) {
-        // 获取子进程的输入流
-        InputStream inputStream = process.getInputStream();
-        // 获取子进程的错误流
-        InputStream errorStream = process.getErrorStream();
-        // 获取子进程的输出流
-        OutputStream outputStream = process.getOutputStream();
-        try (PrintWriter pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outputStream)), true);
-             BufferedReader readInput = new BufferedReader(new InputStreamReader(inputStream));
-             BufferedReader readError = new BufferedReader(new InputStreamReader(errorStream))) {
-            pw.println("exit");
-
-            String line;
-            log.info("子进程输入流:");
-            while ((line = readInput.readLine()) != null) {
-                log.info(line);
-            }
-            log.info("子进程错误流:");
-            while ((line = readError.readLine()) != null) {
-                log.info(line);
-            }
-        } catch (IOException e) {
-            log.error("出错啦:{}", e.getMessage());
-        }
-    }
 
 
     /**
     /**
      * 执行mysql导出命令
      * 执行mysql导出命令
@@ -97,21 +72,72 @@ public class LinuxCommand {
     }
     }
 
 
     /**
     /**
-     * 执行mysql导出命令
+     * List执行命令,处理mysql导入啊,Mongo啊
      *
      *
      * @param commands 命令分段,必须要分割,不能直接拼好一条add进来会报错,commands中的String不需要加空格
      * @param commands 命令分段,必须要分割,不能直接拼好一条add进来会报错,commands中的String不需要加空格
      * @throws Exception 异常说明
      * @throws Exception 异常说明
      */
      */
-    public static void mysqlImport(List<String> commands) throws Exception {
+    public static void exec(List<String> commands) throws Exception {
         ProcessBuilder processBuilder = new ProcessBuilder(commands);
         ProcessBuilder processBuilder = new ProcessBuilder(commands);
         Process process = processBuilder.start();
         Process process = processBuilder.start();
 
 
         print(process);
         print(process);
 
 
         int code = process.waitFor();
         int code = process.waitFor();
-        log.info("mysql备份执行状态:{}", code);
+        log.info("命令执行状态:{}", code);
         if (code != 0) {
         if (code != 0) {
-            throw new Exception("执行mysql备份命令出错啦!");
+            throw new Exception("执行命令出错啦!");
+        }
+    }
+
+    /**
+     * 执行shell脚本
+     *
+     * @param commands 命令分段,必须要分割,不能直接拼好一条add进来会报错,commands中的String不需要加空格
+     * @param dir      脚本路径
+     * @throws Exception 异常说明
+     */
+    public static void exec(List<String> commands, String dir) throws Exception {
+        ProcessBuilder processBuilder = new ProcessBuilder(commands);
+        processBuilder.directory(new File(dir));
+        Process process = processBuilder.start();
+
+        print(process);
+
+        int code = process.waitFor();
+        log.info("命令执行状态:{}", code);
+        if (code != 0) {
+            throw new Exception("执行命令出错啦!");
+        }
+    }
+
+    /**
+     *打印执行日志
+     * @param process 进程
+     */
+    private static void print(Process process) {
+        // 获取子进程的输入流
+        InputStream inputStream = process.getInputStream();
+        // 获取子进程的错误流
+        InputStream errorStream = process.getErrorStream();
+        // 获取子进程的输出流
+        OutputStream outputStream = process.getOutputStream();
+        try (PrintWriter pw = new PrintWriter(new BufferedWriter(new OutputStreamWriter(outputStream)), true);
+             BufferedReader readInput = new BufferedReader(new InputStreamReader(inputStream));
+             BufferedReader readError = new BufferedReader(new InputStreamReader(errorStream))) {
+            pw.println("exit");
+
+            String line;
+            log.info("子进程输入流:");
+            while ((line = readInput.readLine()) != null) {
+                log.info(line);
+            }
+            log.info("子进程错误流:");
+            while ((line = readError.readLine()) != null) {
+                log.info(line);
+            }
+        } catch (IOException e) {
+            log.error("出错啦:{}", e.getMessage());
         }
         }
     }
     }
 
 

+ 1 - 8
sync-in/src/main/java/com/jjt/in/service/impl/InProcessServiceImpl.java

@@ -4,17 +4,11 @@ import com.alibaba.fastjson.JSONObject;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jjt.common.constant.Constants;
 import com.jjt.common.constant.Constants;
 import com.jjt.common.domain.FileDesc;
 import com.jjt.common.domain.FileDesc;
-import com.jjt.common.domain.IndexDO;
-import com.jjt.common.enums.SyncType;
 import com.jjt.common.utils.*;
 import com.jjt.common.utils.*;
 import com.jjt.in.service.IInEsService;
 import com.jjt.in.service.IInEsService;
 import com.jjt.in.service.IInProcessService;
 import com.jjt.in.service.IInProcessService;
 import com.jjt.system.service.ISysConfigService;
 import com.jjt.system.service.ISysConfigService;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.compress.archivers.zip.ZipUtil;
-import org.apache.http.HttpHost;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 import org.springframework.stereotype.Service;
@@ -22,7 +16,6 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import javax.annotation.Resource;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
-import java.nio.file.CopyOption;
 import java.nio.file.Files;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.nio.file.StandardCopyOption;
@@ -180,7 +173,7 @@ public class InProcessServiceImpl extends InBaseService implements IInProcessSer
 
 
         //组装导入命令
         //组装导入命令
         try {
         try {
-            LinuxCommand.mysqlImport(commands);
+            LinuxCommand.exec(commands);
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("导入mysql文件出错啦:{}", e.getMessage());
             log.error("导入mysql文件出错啦:{}", e.getMessage());
             e.printStackTrace();
             e.printStackTrace();

+ 52 - 42
sync-out/src/main/java/com/jjt/out/service/impl/OutMongoServiceImpl.java

@@ -60,9 +60,7 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
 
 
         JSONObject mongoInfo = JSONObject.parseObject(params);
         JSONObject mongoInfo = JSONObject.parseObject(params);
         String host = mongoInfo.getString("host");
         String host = mongoInfo.getString("host");
-        int port = mongoInfo.getIntValue("port");
-        String user = mongoInfo.getString("user");
-        String pass = mongoInfo.getString("pass");
+        String port = mongoInfo.getString("port");
 
 
         String tmpDir = tmpDIr();
         String tmpDir = tmpDIr();
         OutProcessInfo opi = new OutProcessInfo();
         OutProcessInfo opi = new OutProcessInfo();
@@ -73,49 +71,61 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
         String nowTime = DateUtils.dateTimeNow();
         String nowTime = DateUtils.dateTimeNow();
 
 
         tmpDir += "mongo/" + nowTime + "/";
         tmpDir += "mongo/" + nowTime + "/";
-        String queryStr = String.format("{\"ts\":{\"$gt\":{\"$timestamp\":{\"t\":%s,\"i\":1}}},\"op\":{\"$ne\":\"d\"},\"op\":{\"$ne\":\"n\"}}", time);
-        String cmd = String.format("/usr/bin/mongodump --host %s --port %s  -d local -c oplog.rs -q '%s' -o %s", host, port, queryStr, tmpDir);
-        opi = new OutProcessInfo();
-        opi.setProcessType(SyncType.mongo.toString());
-        Date st = new Date();
-        opi.setCreateTime(st);
         try {
         try {
-            LinuxCommand.exec(cmd);
+            //创建目录及父目录
+            Files.createDirectories(Paths.get(tmpDir));
+
+            opi = new OutProcessInfo();
+            opi.setProcessType(SyncType.mongo.toString());
+            Date st = new Date();
+            opi.setCreateTime(st);
+            opi.setProcessKey(String.valueOf(System.currentTimeMillis() / 1000));
+
+            //组装导出命令
+            List<String> commands = new ArrayList<>();
+            commands.add("/usr/bin/sh");
+            commands.add("mongo-inc-bak.sh");
+            commands.add(host);
+            commands.add(port);
+            commands.add(tmpDir);
+
+            LinuxCommand.exec(commands, "/mnt/");
+
+            Date et = new Date();
+            opi.setCostTime(et.getTime() - st.getTime());
+            processInfoService.insertOutProcessInfo(opi);
+
+
+            //获取外网同步正式目录
+            String syncDir = syncDIr();
+
+            //打包文件--start
+            //生成zip文件全路径名
+            String zipName = "sync-mongo-" + time + ".zip";
+
+            //打包目标目录
+            File targetDir = new File(tmpDir);
+            File zipFile = new File(syncDir + zipName);
+            CompressZip.zip(targetDir, zipFile);
+            //打包文件--end
+
+            //生成描述json文件--start
+            try {
+                String descName = syncDir + "sync-70-" + time + ".json";
+                String md5 = DigestUtils.md5Hex(Files.newInputStream(zipFile.toPath()));
+                FileDesc desc = new FileDesc();
+                desc.setName(zipName);
+                desc.setMd5(md5);
+                desc.setType(SyncType.mongo);
+                File descFile = new File(descName);
+                ObjectMapper mapper = new ObjectMapper();
+                mapper.writeValue(descFile, desc);
+            } catch (IOException e) {
+            }
+            //生成描述json文件--end
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("报错啦:{}", e.getMessage());
             log.error("报错啦:{}", e.getMessage());
             e.printStackTrace();
             e.printStackTrace();
         }
         }
-        Date et = new Date();
-        opi.setCostTime(et.getTime() - st.getTime());
-        processInfoService.insertOutProcessInfo(opi);
-
-
-        //获取外网同步正式目录
-        String syncDir = syncDIr();
-
-        //打包文件--start
-        //生成zip文件全路径名
-        String zipName = "sync-mongo-" + time + ".zip";
-
-        //打包目标目录
-        File targetDir = new File(tmpDir);
-        File zipFile = new File(syncDir + zipName);
-        CompressZip.zip(targetDir, zipFile);
-        //打包文件--end
-
-        //生成描述json文件--start
-        try {
-            String descName = syncDir + "sync-70-" + time + ".json";
-            String md5 = DigestUtils.md5Hex(Files.newInputStream(zipFile.toPath()));
-            FileDesc desc = new FileDesc();
-            desc.setName(zipName);
-            desc.setMd5(md5);
-            desc.setType(SyncType.mongo);
-            File descFile = new File(descName);
-            ObjectMapper mapper = new ObjectMapper();
-            mapper.writeValue(descFile, desc);
-        } catch (IOException e) {
-        }
-        //生成描述json文件--end
     }
     }
 }
 }

+ 6 - 0
sync-out/src/main/resources/mongo-inc-bak.sh

@@ -0,0 +1,6 @@
+#!/bin/bash
+host=$1
+port=$2
+ts=$3
+dir=$4
+/usr/bin/mongodump --host ${host} --port ${port}  -d local -c oplog.rs -q '{"ts":{"$gt":{"$timestamp":{"t":'${ts}',"i":1}}},"op":{"$nin":["d","n"]}}' -o ${dir}