import com.mongodb.client.model.UpdateOneModel; import org.bson.*; import java.io.BufferedInputStream; import java.io.File; import java.io.InputStream; import java.nio.file.Files; import java.util.HashMap; import java.util.List; import java.util.Map; /** * MongoParseTest$ * * @author wukai * @date 2024/4/19 13:00 */ public class MongoParseTest { public static void main(String[] args) { String fileName = "D:\\SYSTEM\\Desktop\\temp\\sync\\oplog.rs.bson"; parse(fileName); } /** * 如果是分片集群,则单独处理 * * @param filename 文件名 */ public static void parse(String filename) { File file = new File(filename); BSONDecoder decoder = new BasicBSONDecoder(); try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath()));) { Map> iMap = new HashMap<>(16); // Map>> uMap = new HashMap<>(16); Map>> mMap = new HashMap<>(16); long records = 0; long iIndex = 0; long uIndex = 0; long cIndex = 0; while (inputStream.available() > 0) { // if (records % 10000 == 0) { // System.err.println("正在解析文件:{},当前处理记录数:{}" + filename + records); // } BSONObject obj = decoder.readObject(inputStream); if (obj == null) { break; } //操作符 String op = obj.get("op").toString(); //数据库和表名 String ns = obj.get("ns").toString(); if (ns.startsWith("config.system.")) { continue; } BasicBSONObject bson = (BasicBSONObject) obj.get("o"); if ("i".equals(op)) { iIndex++; } else if ("u".equals(op)) { uIndex++; // // 数据更新 //// List> updates = uMap.get(ns); // List> us = mMap.get(ns); // if (us == null) { // us = new ArrayList<>(); // } // // System.err.println(obj); // // BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2"); // Bson f = Filters.eq("_id", bson2.get("_id")); // BasicBSONObject set = (BasicBSONObject) bson.get("$set"); // Document x = new Document("$set", set); // us.add(new UpdateOneModel<>(f, x)); // mMap.put(ns, us); // Pair pair = new Pair<>(f, x); // updates.add(pair); // uMap.put(ns, updates); } else if ("c".equals(op)) { cIndex++; System.err.println(obj); // MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs); if (bson.get("commitIndexBuild") != null) { // MongoCollection coll = database.getCollection(bson.get("commitIndexBuild").toString()); // BasicBSONList indexes = (BasicBSONList) bson.get("indexes"); // // List indexModels = new ArrayList<>(); // //组合索引 // // for (int i = 0; i < indexes.size(); i++) { // BasicDBObject keyObj = new BasicDBObject(); // BasicBSONObject index = (BasicBSONObject) indexes.get(i); // BasicBSONObject keys = (BasicBSONObject) index.get("key"); // for (String s : keys.keySet()) { // keyObj.put(s, keys.get(s)); // } // // //添加配置 // IndexOptions indexOptions = new IndexOptions(); // if (index.get("unique") != null) { // indexOptions.unique(index.getBoolean("unique")); // } // if (index.get("background") != null) { // indexOptions.background(index.getBoolean("background")); // } // //索引名称 // indexOptions.name(index.get("name").toString()); // indexModels.add(new IndexModel(keyObj, indexOptions)); // } // coll.createIndexes(indexModels); } else if (bson.get("createIndexes") != null) { // MongoCollection coll = database.getCollection(bson.get("createIndexes").toString()); // // List indexModels = new ArrayList<>(); // //组合索引 // // BasicDBObject keyObj = new BasicDBObject(); // BasicBSONObject keys = (BasicBSONObject) bson.get("key"); // for (String s : keys.keySet()) { // keyObj.put(s, keys.get(s)); // } // // //添加配置 // IndexOptions indexOptions = new IndexOptions(); // if (bson.get("unique") != null) { // indexOptions.unique(bson.getBoolean("unique")); // } // if (bson.get("background") != null) { // indexOptions.background(bson.getBoolean("background")); // } // //索引名称 // indexOptions.name(bson.get("name").toString()); // indexModels.add(new IndexModel(keyObj, indexOptions)); // coll.createIndexes(indexModels); } } records++; } System.err.printf("新增:%s\t更新:%s\t索引:%s", iIndex, uIndex, cIndex); } catch (Exception e) { e.printStackTrace(); } } }