Przeglądaj źródła

mongo插入和修改都改为 批量处理

wukai 1 rok temu
rodzic
commit
65b86a43b3

+ 18 - 19
sync-in/src/main/java/com/jjt/in/service/impl/InMongoServiceImpl.java

@@ -9,10 +9,7 @@ import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.Filters;
-import com.mongodb.client.model.IndexModel;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.model.*;
 import javafx.util.Pair;
 import org.bson.*;
 import org.bson.conversions.Bson;
@@ -105,7 +102,8 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
                 dbs.add(db);
             }
             Map<String, List<BasicBSONObject>> iMap = new HashMap<>(16);
-            Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
+//            Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
+            Map<String, List<UpdateOneModel<Document>>> mMap = new HashMap<>(16);
             long records = 0;
             while (inputStream.available() > 0) {
                 if (records % 10000 == 0) {
@@ -146,19 +144,23 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
                     iMap.put(ns, adds);
                 } else if ("u".equals(op)) {
                     // 数据更新
-                    List<Pair<Bson, Document>> updates = uMap.get(ns);
-                    if (updates == null) {
-                        updates = new ArrayList<>();
+//                    List<Pair<Bson, Document>> updates = uMap.get(ns);
+                    List<UpdateOneModel<Document>> us = mMap.get(ns);
+                    if (us == null) {
+                        us = new ArrayList<>();
                     }
 
+                    System.err.println(obj);
+
                     BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
                     Bson f = Filters.eq("_id", bson2.get("_id"));
                     BasicBSONObject set = (BasicBSONObject) bson.get("$set");
                     Document x = new Document("$set", set);
-
-                    Pair<Bson, Document> pair = new Pair<>(f, x);
-                    updates.add(pair);
-                    uMap.put(ns, updates);
+                    us.add(new UpdateOneModel<>(f, x));
+                    mMap.put(ns, us);
+//                    Pair<Bson, Document> pair = new Pair<>(f, x);
+//                    updates.add(pair);
+//                    uMap.put(ns, updates);
 
                 } else if ("c".equals(op)) {
                     MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
@@ -236,16 +238,13 @@ public class InMongoServiceImpl extends InBaseService implements IInMongoService
             }
             log.info("数据插入已完成:{},记录总数:{}", filename, records);
             //处理数据更新收尾
-            for (String s : uMap.keySet()) {
-                List<Pair<Bson, Document>> updates = uMap.get(s);
+            for (String s : mMap.keySet()) {
+                List updates = mMap.get(s);
                 log.info("处理数据更新表:{},记录数:{}", s, updates.size());
-                if (updates != null && updates.size() > 0) {
+                if (updates.size() > 0) {
                     MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, s, dbs);
-                    for (Pair<Bson, Document> pair : updates) {
-                        collection.updateOne(pair.getKey(), pair.getValue());
-                    }
+                    collection.bulkWrite(updates);
                 }
-
             }
 
 

+ 284 - 0
sync-in/src/main/java/com/jjt/in/service/impl/InMongoServiceImplbak.java

@@ -0,0 +1,284 @@
+//package com.jjt.in.service.impl;
+//
+//import com.alibaba.fastjson.JSONObject;
+//import com.jjt.common.constant.Constants;
+//import com.jjt.common.utils.LinuxCommand;
+//import com.jjt.in.service.IInMongoService;
+//import com.jjt.system.service.ISysConfigService;
+//import com.mongodb.BasicDBObject;
+//import com.mongodb.MongoClient;
+//import com.mongodb.client.MongoCollection;
+//import com.mongodb.client.MongoDatabase;
+//import com.mongodb.client.model.*;
+//import javafx.util.Pair;
+//import org.bson.*;
+//import org.bson.conversions.Bson;
+//import org.bson.types.BasicBSONList;
+//import org.slf4j.Logger;
+//import org.slf4j.LoggerFactory;
+//import org.springframework.stereotype.Service;
+//
+//import javax.annotation.Resource;
+//import java.io.BufferedInputStream;
+//import java.io.File;
+//import java.io.InputStream;
+//import java.nio.file.Files;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+///**
+// * 数据同步Service业务层处理
+// *
+// * @author wukai
+// * @date 2023-06-06
+// */
+//@Service
+//public class InMongoServiceImplbak extends InBaseService implements IInMongoService {
+//    private static final Logger log = LoggerFactory.getLogger(InMongoServiceImplbak.class);
+//    @Resource
+//    private ISysConfigService sysConfigService;
+//
+//    /**
+//     * 解析同步文件
+//     */
+//    @Override
+//    public void parseSyncFile(String dir, boolean isInc) throws Exception {
+//        String params = sysConfigService.selectConfigByKey("in.mongo.info");
+//
+//        JSONObject mongoInfo = JSONObject.parseObject(params);
+//        String host = mongoInfo.getString("host");
+//        String port = mongoInfo.getString("port");
+//        String sharding = mongoInfo.getString("sharding");
+//        if (!dir.endsWith(Constants.DIR_END)) {
+//            dir += "/";
+//        }
+//
+//        String filaName = dir;
+//        if (isInc) {
+//            //如果是增量,则需要指定 local/oplog.rs.bson
+//            //如果是全量,只需指定目录即可
+//            filaName += "local/oplog.rs.bson";
+//            if ("1".equals(sharding)) {
+//                //判断是否分片集群
+//                sharding(host, port, filaName);
+//                return;
+//            }
+//        }
+//        try {
+//            List<String> command = new ArrayList<>();
+//            command.add("/usr/bin/mongorestore");
+//            command.add("--host");
+//            command.add(host);
+//            command.add("--port");
+//            command.add(port);
+//            command.add("--drop");
+//            command.add("--oplogReplay");
+//            command.add(filaName);
+//
+////            String cmd = String.format("/usr/bin/mongorestore --host %s --port %s --drop --oplogReplay %s", host, port, filaName);
+//            LinuxCommand.exec(command);
+//        } catch (Exception e) {
+//            log.error("还原mongo出错啦:{}", e.getMessage());
+//            throw new Exception(e.getMessage());
+//        }
+//    }
+//
+//    /**
+//     * 如果是分片集群,则单独处理
+//     *
+//     * @param host     主机地址
+//     * @param port     端口号
+//     * @param filename 文件名
+//     */
+//    private void sharding(String host, String port, String filename) {
+//        File file = new File(filename);
+//        BSONDecoder decoder = new BasicBSONDecoder();
+//
+//        try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath())); MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port));) {
+//            List<String> dbs = new ArrayList<>();
+//            for (String db : mongoClient.listDatabaseNames()) {
+//                dbs.add(db);
+//            }
+//            Map<String, List<BasicBSONObject>> iMap = new HashMap<>(16);
+//            Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
+//            Map<String, List<UpdateOneModel<Document>>> mMap = new HashMap<>(16);
+//            long records = 0;
+//            while (inputStream.available() > 0) {
+//                if (records % 10000 == 0) {
+//                    log.info("正在解析文件:{},当前处理记录数:{}", filename, records);
+//                }
+//                BSONObject obj = decoder.readObject(inputStream);
+//                if (obj == null) {
+//                    break;
+//                }
+//                //操作符
+//                String op = obj.get("op").toString();
+//                //数据库和表名
+//                String ns = obj.get("ns").toString();
+//                if (ns.startsWith("config.system.")) {
+//                    continue;
+//                }
+//                MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, ns, dbs);
+//                BasicBSONObject bson = (BasicBSONObject) obj.get("o");
+//                if ("i".equals(op)) {
+//                    List<BasicBSONObject> adds = iMap.get(ns);
+//                    if (adds == null) {
+//                        adds = new ArrayList<>();
+//                    }
+//                    adds.add(bson);
+//
+//                    if (adds.size() > 10000) {
+//                        InsertManyOptions insertManyOptions = new InsertManyOptions();
+//                        insertManyOptions.ordered(false);
+//                        try {
+//                            //可能会报ID重复,不用管,做个冗余
+//                            collection.insertMany(adds, insertManyOptions);
+//                        } catch (Exception ignored) {
+//                        }
+//
+//                        adds = new ArrayList<>();
+//                    }
+//
+//                    iMap.put(ns, adds);
+//                } else if ("u".equals(op)) {
+//                    // 数据更新
+////                    List<Pair<Bson, Document>> updates = uMap.get(ns);
+//                    List<UpdateOneModel<Document>> us = mMap.get(ns);
+//                    if (us == null) {
+//                        us = new ArrayList<>();
+//                    }
+//
+//                    BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
+//                    Bson f = Filters.eq("_id", bson2.get("_id"));
+//                    BasicBSONObject set = (BasicBSONObject) bson.get("$set");
+//                    Document x = new Document("$set", set);
+//                    us.add(new UpdateOneModel<>(f, x));
+//                    mMap.put(ns, us);
+////                    Pair<Bson, Document> pair = new Pair<>(f, x);
+////                    updates.add(pair);
+////                    uMap.put(ns, updates);
+//
+//                } else if ("c".equals(op)) {
+//                    MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
+//                    if (bson.get("commitIndexBuild") != null) {
+//                        MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
+//                        BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
+//
+//                        List<IndexModel> indexModels = new ArrayList<>();
+//                        //组合索引
+//
+//                        for (int i = 0; i < indexes.size(); i++) {
+//                            BasicDBObject keyObj = new BasicDBObject();
+//                            BasicBSONObject index = (BasicBSONObject) indexes.get(i);
+//                            BasicBSONObject keys = (BasicBSONObject) index.get("key");
+//                            for (String s : keys.keySet()) {
+//                                keyObj.put(s, keys.get(s));
+//                            }
+//
+//                            //添加配置
+//                            IndexOptions indexOptions = new IndexOptions();
+//                            if (index.get("unique") != null) {
+//                                indexOptions.unique(index.getBoolean("unique"));
+//                            }
+//                            if (index.get("background") != null) {
+//                                indexOptions.background(index.getBoolean("background"));
+//                            }
+//                            //索引名称
+//                            indexOptions.name(index.get("name").toString());
+//                            indexModels.add(new IndexModel(keyObj, indexOptions));
+//                        }
+//                        coll.createIndexes(indexModels);
+//                    } else if (bson.get("createIndexes") != null) {
+//                        MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
+//
+//                        List<IndexModel> indexModels = new ArrayList<>();
+//                        //组合索引
+//
+//                        BasicDBObject keyObj = new BasicDBObject();
+//                        BasicBSONObject keys = (BasicBSONObject) bson.get("key");
+//                        for (String s : keys.keySet()) {
+//                            keyObj.put(s, keys.get(s));
+//                        }
+//
+//                        //添加配置
+//                        IndexOptions indexOptions = new IndexOptions();
+//                        if (bson.get("unique") != null) {
+//                            indexOptions.unique(bson.getBoolean("unique"));
+//                        }
+//                        if (bson.get("background") != null) {
+//                            indexOptions.background(bson.getBoolean("background"));
+//                        }
+//                        //索引名称
+//                        indexOptions.name(bson.get("name").toString());
+//                        indexModels.add(new IndexModel(keyObj, indexOptions));
+//                        coll.createIndexes(indexModels);
+//                    }
+//                }
+//
+//                records++;
+//            }
+//            //处理数据插入收尾
+//            for (String s : iMap.keySet()) {
+//                List<BasicBSONObject> adds = iMap.get(s);
+//                if (adds != null && adds.size() > 0) {
+//                    MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, s, dbs);
+//                    InsertManyOptions insertManyOptions = new InsertManyOptions();
+//                    insertManyOptions.ordered(true);
+//                    try {
+//                        //可能会报ID重复,不用管,正常数据不会重复的,做个冗余
+//                        collection.insertMany(adds, insertManyOptions);
+//                    } catch (Exception ignored) {
+//                    }
+//
+//                }
+//            }
+//            log.info("数据插入已完成:{},记录总数:{}", filename, records);
+//            //处理数据更新收尾
+//            for (String s : uMap.keySet()) {
+//                List<Pair<Bson, Document>> updates = uMap.get(s);
+//                log.info("处理数据更新表:{},记录数:{}", s, updates.size());
+//                if (updates.size() > 0) {
+//                    MongoCollection<BasicBSONObject> collection = getCollection4Ns(mongoClient, s, dbs);
+////                    collection.bulkWrite()
+//                    for (Pair<Bson, Document> pair : updates) {
+//                        collection.updateOne(pair.getKey(), pair.getValue());
+//                    }
+//                }
+//
+//            }
+//
+//
+//        } catch (Exception e) {
+//            e.printStackTrace();
+//        }
+//    }
+//
+//    public MongoDatabase getDatabase4Ns(MongoClient mongoClient, String ns, List<String> dbs) {
+//        String[] nss = ns.split("\\.");
+//        if (!dbs.contains(nss[0])) {
+//            //如果当前db未在mongo中,则新建db,并允许分片
+//            //会出现重复的情况,报异常直接抛弃即可
+//            try {
+//                MongoDatabase mdb = mongoClient.getDatabase("admin");
+//                mdb.runCommand(new Document("enablesharding", nss[0]));
+//            } catch (Exception ignored) {
+//            }
+//        }
+//        MongoDatabase database = mongoClient.getDatabase(nss[0]);
+//        return database;
+//    }
+//
+//    public MongoCollection<BasicBSONObject> getCollection4Ns(MongoClient mongoClient, String ns, List<String> dbs) {
+//        MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
+//        String[] nss = ns.split("\\.");
+//        String table = nss[1];
+//        if ("fs".equals(table)) {
+//            table = nss[1] + "." + nss[2];
+//        }
+//        MongoCollection<BasicBSONObject> collection = database.getCollection(table, BasicBSONObject.class);
+//        return collection;
+//    }
+//
+//}