Explorar el Código

加入日志,mongo修改为批量执行。

wukai hace 1 año
padre
commit
1bd9e8de6e

+ 4 - 1
sync-in/src/main/java/com/jjt/in/service/impl/InEsServiceImpl.java

@@ -19,7 +19,8 @@ import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -79,6 +80,7 @@ public class InEsServiceImpl extends InBaseService implements IInEsService {
                                 commands.add("--output");
                                 commands.add(uri);
                                 commands.add("--type=mapping");
+                                log.info("开始执行导入mapping命令:{}", commands);
                                 LinuxCommand.exec(commands);
                                 //导入data命令
                                 commands = new ArrayList<>();
@@ -88,6 +90,7 @@ public class InEsServiceImpl extends InBaseService implements IInEsService {
                                 commands.add("--output");
                                 commands.add(uri);
                                 commands.add("--type=data");
+                                log.info("开始执行导入data命令:{}", commands);
                                 LinuxCommand.exec(commands);
                             } catch (Exception e) {
                                 log.error("执行命令出错:");

+ 107 - 27
sync-in/src/main/java/com/jjt/in/service/impl/InMongoServiceImpl.java

@@ -7,12 +7,13 @@ import com.jjt.in.service.IInMongoService;
 import com.jjt.system.service.ISysConfigService;
 import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
-import com.mongodb.MongoWriteException;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.IndexModel;
 import com.mongodb.client.model.IndexOptions;
+import com.mongodb.client.model.InsertManyOptions;
+import javafx.util.Pair;
 import org.bson.*;
 import org.bson.conversions.Bson;
 import org.bson.types.BasicBSONList;
@@ -23,10 +24,12 @@ import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
 import java.io.BufferedInputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * 数据同步Service业务层处理
@@ -88,59 +91,77 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
     /**
      * 如果是分片集群,则单独处理
      *
-     * @param host
-     * @param port
-     * @param filename
+     * @param host     主机地址
+     * @param port     端口号
+     * @param filename 文件名
      */
     private void sharding(String host, String port, String filename) {
         File file = new File(filename);
         BSONDecoder decoder = new BasicBSONDecoder();
 
-        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port));) {
+        try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath())); MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port));) {
             List<String> dbs = new ArrayList<>();
             for (String db : mongoClient.listDatabaseNames()) {
                 dbs.add(db);
             }
+            Map<String, List<BasicBSONObject>> iMap = new HashMap<>(16);
+            Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
+            long records = 0;
             while (inputStream.available() > 0) {
-
+                if (records % 10000 == 0) {
+                    log.info("正在解析文件:{},当前处理记录数:{}", filename, records);
+                }
                 BSONObject obj = decoder.readObject(inputStream);
                 if (obj == null) {
                     break;
                 }
+                //操作符
                 String op = obj.get("op").toString();
+                //数据库和表名
                 String ns = obj.get("ns").toString();
                 if (ns.startsWith("config.system.")) {
                     continue;
                 }
-                String[] nss = ns.split("\\.");
-                if (!dbs.contains(nss[0])) {
-                    try {
-                        MongoDatabase mdb = mongoClient.getDatabase("admin");
-                        mdb.runCommand(new Document("enablesharding", nss[0]));
-                    } catch (Exception e) {
-                    }
-                }
-                MongoDatabase database = mongoClient.getDatabase(nss[0]);
-                String table = nss[1];
-                if ("fs".equals(table)) {
-                    table = nss[1] + "." + nss[2];
-                }
-                MongoCollection<BasicBSONObject> collection = database.getCollection(table, BasicBSONObject.class);
+                MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, ns, dbs);
                 BasicBSONObject bson = (BasicBSONObject) obj.get("o");
                 if ("i".equals(op)) {
-                    //读取数据插入
-                    try {
-                        collection.insertOne(bson);
-                    } catch (MongoWriteException e1) {
-                        //重复插入会报错,不管他
+                    List<BasicBSONObject> adds = iMap.get(ns);
+                    if (adds == null) {
+                        adds = new ArrayList<>();
+                    }
+                    adds.add(bson);
+
+                    if (adds.size() > 10000) {
+                        InsertManyOptions insertManyOptions = new InsertManyOptions();
+                        insertManyOptions.ordered(false);
+                        try {
+                            //可能会报ID重复,不用管,做个冗余
+                            collection.insertMany(adds, insertManyOptions);
+                        } catch (Exception ignored) {
+                        }
+
+                        adds = new ArrayList<>();
                     }
+
+                    iMap.put(ns, adds);
                 } else if ("u".equals(op)) {
+                    // 数据更新
+                    List<Pair<Bson, Document>> updates = uMap.get(ns);
+                    if (updates == null) {
+                        updates = new ArrayList<>();
+                    }
+
                     BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
                     Bson f = Filters.eq("_id", bson2.get("_id"));
                     BasicBSONObject set = (BasicBSONObject) bson.get("$set");
                     Document x = new Document("$set", set);
-                    collection.updateOne(f, x);
+
+                    Pair<Bson, Document> pair = new Pair<>(f, x);
+                    updates.add(pair);
+                    uMap.put(ns, updates);
+
                 } else if ("c".equals(op)) {
+                    MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
                     if (bson.get("commitIndexBuild") != null) {
                         MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
                         BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
@@ -195,9 +216,68 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
                         coll.createIndexes(indexModels);
                     }
                 }
+
+                records++;
+            }
+            //处理数据插入收尾
+            for (String s : iMap.keySet()) {
+                List<BasicBSONObject> adds = iMap.get(s);
+                if (adds != null && adds.size() > 0) {
+                    MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, s, dbs);
+                    InsertManyOptions insertManyOptions = new InsertManyOptions();
+                    insertManyOptions.ordered(true);
+                    try {
+                        //可能会报ID重复,不用管,正常数据不会重复的,做个冗余
+                        collection.insertMany(adds, insertManyOptions);
+                    } catch (Exception ignored) {
+                    }
+
+                }
             }
+            log.info("数据插入已完成:{},记录总数:{}", filename, records);
+            //处理数据更新收尾
+            for (String s : uMap.keySet()) {
+                List<Pair<Bson, Document>> updates = uMap.get(s);
+                log.info("处理数据更新表:{},记录数:{}", s, updates.size());
+                if (updates != null && updates.size() > 0) {
+                    MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, s, dbs);
+                    for (Pair<Bson, Document> pair : updates) {
+                        collection.updateOne(pair.getKey(), pair.getValue());
+                    }
+                }
+
+            }
+
+
         } catch (Exception e) {
             e.printStackTrace();
         }
     }
+
+    public MongoDatabase getDatabase4Ns(MongoClient mongoClient, String ns, List<String> dbs) {
+        String[] nss = ns.split("\\.");
+        if (!dbs.contains(nss[0])) {
+            //如果当前db未在mongo中,则新建db,并允许分片
+            //会出现重复的情况,报异常直接抛弃即可
+            try {
+                MongoDatabase mdb = mongoClient.getDatabase("admin");
+                mdb.runCommand(new Document("enablesharding", nss[0]));
+            } catch (Exception ignored) {
+            }
+        }
+        MongoDatabase database = mongoClient.getDatabase(nss[0]);
+        return database;
+    }
+
+    public MongoCollection<BasicBSONObject> getCollection4Ns(MongoClient mongoClient, String ns, List<String> dbs) {
+        MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
+        String[] nss = ns.split("\\.");
+        String table = nss[1];
+        if ("fs".equals(table)) {
+            table = nss[1] + "." + nss[2];
+        }
+        MongoCollection<BasicBSONObject> collection = database.getCollection(table, BasicBSONObject.class);
+        return collection;
+    }
+
 }

+ 3 - 0
sync-in/src/main/java/com/jjt/in/service/impl/InProcessServiceImpl.java

@@ -148,6 +148,8 @@ public class InProcessServiceImpl extends InBaseService implements IInProcessSer
                     Arrays.sort(files, Comparator.comparing(File::getName));
                 }
 
+                log.info("目录:{} 文件总数:{}", syncDir, files.length);
+
                 for (File file : files) {
                     //写入同步记录表
                     InSyncInfo syncInfo = new InSyncInfo();
@@ -169,6 +171,7 @@ public class InProcessServiceImpl extends InBaseService implements IInProcessSer
 
                         syncInfo.setSyncFileMd5(md5);
                         syncInfo.setSyncFileSize(zipFile.length());
+                        log.info("开始处理:{},md5验证:{}", desc.getName(), md5.equals(desc.getMd5()));
                         if (md5.equals(desc.getMd5())) {
                             syncInfo.setSyncValid("Y");
                             switch (desc.getType()) {