Forráskód Böngészése

Mongo增量备份恢复时,硬解析bson文件操作。

wukai 1 éve
szülő
commit
cf73eb1ee4

+ 1 - 1
sync-admin/src/main/resources/application-dev.yml

@@ -6,7 +6,7 @@ spring:
     druid:
       # 主库数据源
       master:
-        url: jdbc:mysql://192.168.188.62:3306/xjsync?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+        url: jdbc:mysql://192.168.188.60:3306/xjsync?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
         username: root
         password: 123456
       # 从库数据源

+ 71 - 0
sync-admin/src/test/java/BsonDump.java

@@ -0,0 +1,71 @@
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import org.bson.*;
+import com.mongodb.util.JSON;
+import org.bson.conversions.Bson;
+
+
+public class BsonDump {
+
+    public void bsonDump(String filename) throws FileNotFoundException {
+        File file = new File(filename);
+        InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
+
+        BSONDecoder decoder = new BasicBSONDecoder();
+        int count = 0;
+        try {
+            while (inputStream.available() > 0) {
+
+                BSONObject obj = decoder.readObject(inputStream);
+                BsonDocument bsonDocument = new BsonDocument();
+                if (obj == null) {
+                    break;
+                }
+
+                if (obj.get("op").equals("i")) {
+
+                }
+//                Document doc = Document.parse;
+//                System.out.println(JSON.serialize(obj));
+//                MongoClient mongoClient = new MongoClient("192.168.188.62", 27017);
+//                Document doc=Document.parse(JSON.serialize(obj.get("o")));
+//                MongoDatabase database = mongoClient.getDatabase("test1");
+//
+//                MongoCollection<DBObject> collection = database.getCollection("col", DBObject.class);
+//                DBObject bson = (DBObject) JSON.parse(JSON.serialize(obj.get("o")));
+//                collection.insertOne(bson);
+//                collection.deleteOne(doc);
+//                collection.updateOne(bson);
+                System.out.println(obj);
+                count++;
+
+            }
+        } catch (IOException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        } finally {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+            }
+        }
+        System.err.println(String.format("%s objects read", count));
+    }
+
+    public static void main(String[] args) throws Exception {
+        String filename = "D:\\SYSTEM\\Desktop\\temp\\mongo\\oplog.rs.bson";
+        BsonDump bsonDump = new BsonDump();
+        bsonDump.bsonDump(filename);
+
+    }
+
+}

+ 132 - 0
sync-admin/src/test/java/BsonTest.java

@@ -0,0 +1,132 @@
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoWriteException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.IndexModel;
+import com.mongodb.client.model.IndexOptions;
+import org.bson.*;
+import org.bson.conversions.Bson;
+import org.bson.types.BasicBSONList;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+
+public class BsonTest {
+    public static void main(String[] args) throws Exception {
+        String filename = "D:\\SYSTEM\\Desktop\\temp\\mongo\\sync-mongo-20230805191227\\20230805191227\\local\\oplog.rs.bson";
+        BsonTest bsonDump = new BsonTest();
+        bsonDump.bsonDump(filename);
+
+    }
+
+    public void bsonDump(String filename) throws FileNotFoundException {
+        File file = new File(filename);
+        BSONDecoder decoder = new BasicBSONDecoder();
+        int count = 0;
+        MongoClient mongoClient = new MongoClient("192.168.188.62", 27017);
+        List<String> dbs = new ArrayList<>();
+        for (String db : mongoClient.listDatabaseNames()) {
+            dbs.add(db);
+        }
+        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file))) {
+            while (inputStream.available() > 0) {
+
+                BSONObject obj = decoder.readObject(inputStream);
+                if (obj == null) {
+                    break;
+                }
+                String op = obj.get("op").toString();
+                String ns = obj.get("ns").toString();
+                if (ns.startsWith("config.system.")) {
+                    continue;
+                }
+                String[] nss = ns.split("\\.");
+                if (!dbs.contains(nss[0])) {
+                    System.err.println("wdnmd" + nss[0]);
+//                    MongoDatabase mdb = mongoClient.getDatabase("admin");
+//                    mdb.runCommand(new Document("enablesharding", nss[0]));
+                }
+                MongoDatabase database = mongoClient.getDatabase(nss[0]);
+                MongoCollection<BasicBSONObject> collection = database.getCollection(nss[1], BasicBSONObject.class);
+                BasicBSONObject bson = (BasicBSONObject) obj.get("o");
+                if ("i".equals(op)) {
+                    //读取数据插入
+                    try {
+                        collection.insertOne(bson);
+                    } catch (MongoWriteException e1) {
+//                        System.err.println("error");
+                    }
+                } else if ("u".equals(op)) {
+                    BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
+                    Bson f = Filters.eq("_id", bson2.get("_id"));
+                    BasicBSONObject set = (BasicBSONObject) bson.get("$set");
+                    Document x = new Document("$set", set);
+                    collection.updateOne(f, x);
+                } else if ("c".equals(op)) {
+                    System.err.println(bson);
+                    if (bson.get("commitIndexBuild") != null) {
+                        MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
+                        BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
+
+                        List<IndexModel> indexModels = new ArrayList<>();
+                        //组合索引
+
+                        for (int i = 0; i < indexes.size(); i++) {
+                            BasicDBObject keyObj = new BasicDBObject();
+                            BasicBSONObject index = (BasicBSONObject) indexes.get(i);
+                            BasicBSONObject keys = (BasicBSONObject) index.get("key");
+                            for (String s : keys.keySet()) {
+                                keyObj.put(s, keys.get(s));
+                            }
+
+                            //添加配置
+                            IndexOptions indexOptions = new IndexOptions();
+                            if (index.get("unique") != null) {
+                                indexOptions.unique(index.getBoolean("unique"));
+                            }
+                            if (index.get("background") != null) {
+                                indexOptions.background(index.getBoolean("background"));
+                            }
+                            //索引名称
+                            indexOptions.name(index.get("name").toString());
+                            indexModels.add(new IndexModel(keyObj, indexOptions));
+                        }
+                        coll.createIndexes(indexModels);
+                    } else if (bson.get("createIndexes") != null) {
+                        MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
+
+                        List<IndexModel> indexModels = new ArrayList<>();
+                        //组合索引
+
+                        BasicDBObject keyObj = new BasicDBObject();
+                        BasicBSONObject keys = (BasicBSONObject) bson.get("key");
+                        for (String s : keys.keySet()) {
+                            keyObj.put(s, keys.get(s));
+                        }
+
+                        //添加配置
+                        IndexOptions indexOptions = new IndexOptions();
+                        if (bson.get("unique") != null) {
+                            indexOptions.unique(bson.getBoolean("unique"));
+                        }
+                        if (bson.get("background") != null) {
+                            indexOptions.background(bson.getBoolean("background"));
+                        }
+                        //索引名称
+                        indexOptions.name(bson.get("name").toString());
+                        indexModels.add(new IndexModel(keyObj, indexOptions));
+                        coll.createIndexes(indexModels);
+                    }
+                }
+                count++;
+
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        System.err.println(String.format("%s objects read", count));
+    }
+}

+ 21 - 0
sync-admin/src/test/java/FileTest.java

@@ -0,0 +1,21 @@
+import com.alibaba.fastjson.JSONObject;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.Stream;
+
+public class FileTest {
+    public static void main(String[] args) {
+        Path path = Paths.get("D:\\SYSTEM\\Desktop\\temp\\mongo\\test.json");
+        try (Stream<String> stream = Files.lines(path)) {
+            stream.forEach(l -> {
+                JSONObject json = JSONObject.parseObject(l);
+                System.err.println(json.getString("o"));
+            });
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 8 - 0
sync-admin/src/test/java/InEsTest.java

@@ -21,6 +21,14 @@ public class InEsTest {
     private IInProcessService processService;
 
     @Test
+    public void zpiT() {
+        String size = configService.selectConfigByKey("file.split.size");
+        //GB转换成byte
+        long splitSize = 1024 * 1024 * 1024 * Long.parseLong(size);
+        System.err.println(splitSize);
+    }
+
+    @Test
     public void test() {
         String dir = "/data/sync/tmp/es/20230607163048";
         esService.parseSyncFile(dir);

+ 41 - 0
sync-admin/src/test/java/MongoTest.java

@@ -0,0 +1,41 @@
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.MongoIterable;
+import org.bson.Document;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class MongoTest {
+    public static void main(String[] args) throws Exception {
+        MongoClient mongoClient = new MongoClient("192.168.100.13", 27017);
+        MongoDatabase database = mongoClient.getDatabase("test");
+        database.runCommand(new Document("enablesharding", "test"));
+
+        List<String> dbs = new ArrayList<>((Collection) mongoClient.listDatabaseNames());
+
+        System.err.println(dbs);
+
+
+//        MongoCollection<Document> collection = database.getCollection("sync_test");
+//        List<IndexModel> indexModels = new ArrayList<>();
+//        //组合索引
+//        BasicDBObject index1 = new BasicDBObject();
+//        //"name"->索引列  1/-1 正序/倒序
+//        index1.put("item", 1);
+//        index1.put("qty", -1);
+//        //添加配置
+//        IndexOptions indexOptions = new IndexOptions();
+//        indexOptions.background(true);
+//        indexOptions.unique(true);
+//        //索引名称
+//        indexOptions.name("xx_1");
+//        indexModels.add(new IndexModel(index1, indexOptions));
+//        System.err.println("name_age_birthday 开始创建索引");
+//        collection.createIndexes(indexModels);
+
+    }
+
+
+}

+ 6 - 0
sync-framework/pom.xml

@@ -70,6 +70,12 @@
             <groupId>com.github.oshi</groupId>
             <artifactId>oshi-core</artifactId>
         </dependency>
+        <!-- 引入mongodb driver-->
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.12.14</version>
+        </dependency>
 
         <!-- 系统模块-->
         <dependency>

+ 5 - 2
sync-in/pom.xml

@@ -16,7 +16,11 @@
     </description>
 
     <dependencies>
-
+        <!-- 核心模块-->
+        <dependency>
+            <groupId>com.jjt</groupId>
+            <artifactId>jjt-framework</artifactId>
+        </dependency>
         <!-- 通用工具-->
         <dependency>
             <groupId>com.jjt</groupId>
@@ -64,7 +68,6 @@
             <groupId>org.projectlombok</groupId>
             <artifactId>lombok</artifactId>
         </dependency>
-
     </dependencies>
 
 </project>

+ 128 - 11
sync-in/src/main/java/com/jjt/in/service/impl/InMongoServiceImpl.java

@@ -1,27 +1,30 @@
 package com.jjt.in.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jjt.common.constant.Constants;
-import com.jjt.common.domain.IndexDO;
 import com.jjt.common.utils.LinuxCommand;
-import com.jjt.in.service.IInEsService;
 import com.jjt.in.service.IInMongoService;
 import com.jjt.system.service.ISysConfigService;
-import org.apache.http.HttpHost;
-import org.elasticsearch.client.RequestOptions;
-import org.elasticsearch.client.RestClient;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.client.indices.CreateIndexRequest;
-import org.elasticsearch.client.indices.CreateIndexResponse;
-import org.elasticsearch.common.settings.Settings;
+import com.mongodb.BasicDBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoWriteException;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.model.IndexModel;
+import com.mongodb.client.model.IndexOptions;
+import org.bson.*;
+import org.bson.conversions.Bson;
+import org.bson.types.BasicBSONList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.IOException;
+import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -47,6 +50,7 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
         JSONObject mongoInfo = JSONObject.parseObject(params);
         String host = mongoInfo.getString("host");
         String port = mongoInfo.getString("port");
+        String sharding = mongoInfo.getString("sharding");
         if (!dir.endsWith(Constants.DIR_END)) {
             dir += "/";
         }
@@ -56,6 +60,10 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
             //如果是增量,则需要指定 local/oplog.rs.bson
             //如果是全量,只需指定目录即可
             filaName += "local/oplog.rs.bson";
+            if ("1".equals(sharding)) {
+                //判断是否分片集群
+                sharding(host, port, filaName);
+            }
         }
         try {
             List<String> command = new ArrayList<>();
@@ -75,4 +83,113 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
             throw new Exception(e.getMessage());
         }
     }
+
+    /**
+     * 如果是分片集群,则单独处理
+     *
+     * @param host
+     * @param port
+     * @param filename
+     */
+    private void sharding(String host, String port, String filename) {
+        File file = new File(filename);
+        BSONDecoder decoder = new BasicBSONDecoder();
+        MongoClient mongoClient = new MongoClient("192.168.188.62", 27017);
+        List<String> dbs = new ArrayList<>();
+        for (String db : mongoClient.listDatabaseNames()) {
+            dbs.add(db);
+        }
+        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file))) {
+            while (inputStream.available() > 0) {
+
+                BSONObject obj = decoder.readObject(inputStream);
+                if (obj == null) {
+                    break;
+                }
+                String op = obj.get("op").toString();
+                String ns = obj.get("ns").toString();
+                if (ns.startsWith("config.system.")) {
+                    continue;
+                }
+                String[] nss = ns.split("\\.");
+                if (!dbs.contains(nss[0])) {
+                    MongoDatabase mdb = mongoClient.getDatabase("admin");
+                    mdb.runCommand(new Document("enablesharding", nss[0]));
+                }
+                MongoDatabase database = mongoClient.getDatabase(nss[0]);
+                MongoCollection<BasicBSONObject> collection = database.getCollection(nss[1], BasicBSONObject.class);
+                BasicBSONObject bson = (BasicBSONObject) obj.get("o");
+                if ("i".equals(op)) {
+                    //读取数据插入
+                    try {
+                        collection.insertOne(bson);
+                    } catch (MongoWriteException e1) {
+                        //重复插入会报错,不管他
+                    }
+                } else if ("u".equals(op)) {
+                    BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
+                    Bson f = Filters.eq("_id", bson2.get("_id"));
+                    BasicBSONObject set = (BasicBSONObject) bson.get("$set");
+                    Document x = new Document("$set", set);
+                    collection.updateOne(f, x);
+                } else if ("c".equals(op)) {
+                    if (bson.get("commitIndexBuild") != null) {
+                        MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
+                        BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
+
+                        List<IndexModel> indexModels = new ArrayList<>();
+                        //组合索引
+
+                        for (int i = 0; i < indexes.size(); i++) {
+                            BasicDBObject keyObj = new BasicDBObject();
+                            BasicBSONObject index = (BasicBSONObject) indexes.get(i);
+                            BasicBSONObject keys = (BasicBSONObject) index.get("key");
+                            for (String s : keys.keySet()) {
+                                keyObj.put(s, keys.get(s));
+                            }
+
+                            //添加配置
+                            IndexOptions indexOptions = new IndexOptions();
+                            if (index.get("unique") != null) {
+                                indexOptions.unique(index.getBoolean("unique"));
+                            }
+                            if (index.get("background") != null) {
+                                indexOptions.background(index.getBoolean("background"));
+                            }
+                            //索引名称
+                            indexOptions.name(index.get("name").toString());
+                            indexModels.add(new IndexModel(keyObj, indexOptions));
+                        }
+                        coll.createIndexes(indexModels);
+                    } else if (bson.get("createIndexes") != null) {
+                        MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
+
+                        List<IndexModel> indexModels = new ArrayList<>();
+                        //组合索引
+
+                        BasicDBObject keyObj = new BasicDBObject();
+                        BasicBSONObject keys = (BasicBSONObject) bson.get("key");
+                        for (String s : keys.keySet()) {
+                            keyObj.put(s, keys.get(s));
+                        }
+
+                        //添加配置
+                        IndexOptions indexOptions = new IndexOptions();
+                        if (bson.get("unique") != null) {
+                            indexOptions.unique(bson.getBoolean("unique"));
+                        }
+                        if (bson.get("background") != null) {
+                            indexOptions.background(bson.getBoolean("background"));
+                        }
+                        //索引名称
+                        indexOptions.name(bson.get("name").toString());
+                        indexModels.add(new IndexModel(keyObj, indexOptions));
+                        coll.createIndexes(indexModels);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }

+ 19 - 2
sync-in/src/main/java/com/jjt/in/task/InProcessTask.java

@@ -17,9 +17,26 @@ public class InProcessTask {
     private IInProcessService processService;
 
     public void sync() {
-        processService.downloadFtpFile();
-        processService.deleteFtpFile();
+
+        try {
+            processService.downloadFtpFile();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+
+        System.out.printf("下载完成");
+
+        try {
+            processService.deleteFtpFile();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        System.out.printf("删除完成");
+
+
+        System.out.printf("开始处理");
         processService.unzip();
+        System.out.printf("数据处理完成");
     }
 
     public void clean(Integer day) {

+ 5 - 6
sync-out/pom.xml

@@ -16,6 +16,11 @@
     </description>
 
     <dependencies>
+        <!-- 核心模块-->
+        <dependency>
+            <groupId>com.jjt</groupId>
+            <artifactId>jjt-framework</artifactId>
+        </dependency>
         <!-- 通用工具-->
         <dependency>
             <groupId>com.jjt</groupId>
@@ -26,12 +31,6 @@
             <groupId>com.jjt</groupId>
             <artifactId>jjt-system</artifactId>
         </dependency>
-        <!-- 引入mongodb-->
-        <dependency>
-            <groupId>org.mongodb</groupId>
-            <artifactId>mongo-java-driver</artifactId>
-            <version>3.11.2</version>
-        </dependency>
 
         <!--引入es-high-level-client相关依赖  start-->
         <dependency>

+ 1 - 1
sync-out/src/test/java/test/Zip4jTest.java

@@ -20,7 +20,7 @@ import java.util.List;
 public class Zip4jTest {
     public static void main(String[] args) throws ZipException {
         ZipFile zipFile = new ZipFile("D:\\SYSTEM\\Desktop\\temp\\zip\\test1.zip");
-        File file = new File("D:\\SYSTEM\\Desktop\\temp\\ser");
+        File file = new File("D:\\SYSTEM\\Desktop\\temp\\XNYQT");
         ZipParameters parameter = new ZipParameters();
         //压缩方式,使用JDK内置zip
         parameter.setCompressionMethod(CompressionMethod.DEFLATE);