| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- import com.mongodb.client.model.UpdateOneModel;
- import org.bson.*;
- import java.io.BufferedInputStream;
- import java.io.File;
- import java.io.InputStream;
- import java.nio.file.Files;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- /**
- * MongoParseTest$
- *
- * @author wukai
- * @date 2024/4/19 13:00
- */
- public class MongoParseTest {
- public static void main(String[] args) {
- String fileName = "D:\\SYSTEM\\Desktop\\temp\\sync\\oplog.rs.bson";
- parse(fileName);
- }
- /**
- * 如果是分片集群,则单独处理
- *
- * @param filename 文件名
- */
- public static void parse(String filename) {
- File file = new File(filename);
- BSONDecoder decoder = new BasicBSONDecoder();
- try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath()));) {
- Map<String, List<BasicBSONObject>> iMap = new HashMap<>(16);
- // Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
- Map<String, List<UpdateOneModel<Document>>> mMap = new HashMap<>(16);
- long records = 0;
- long iIndex = 0;
- long uIndex = 0;
- long cIndex = 0;
- while (inputStream.available() > 0) {
- // if (records % 10000 == 0) {
- // System.err.println("正在解析文件:{},当前处理记录数:{}" + filename + records);
- // }
- BSONObject obj = decoder.readObject(inputStream);
- if (obj == null) {
- break;
- }
- //操作符
- String op = obj.get("op").toString();
- //数据库和表名
- String ns = obj.get("ns").toString();
- if (ns.startsWith("config.system.")) {
- continue;
- }
- BasicBSONObject bson = (BasicBSONObject) obj.get("o");
- if ("i".equals(op)) {
- iIndex++;
- } else if ("u".equals(op)) {
- uIndex++;
- // // 数据更新
- //// List<Pair<Bson, Document>> updates = uMap.get(ns);
- // List<UpdateOneModel<Document>> us = mMap.get(ns);
- // if (us == null) {
- // us = new ArrayList<>();
- // }
- //
- // System.err.println(obj);
- //
- // BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
- // Bson f = Filters.eq("_id", bson2.get("_id"));
- // BasicBSONObject set = (BasicBSONObject) bson.get("$set");
- // Document x = new Document("$set", set);
- // us.add(new UpdateOneModel<>(f, x));
- // mMap.put(ns, us);
- // Pair<Bson, Document> pair = new Pair<>(f, x);
- // updates.add(pair);
- // uMap.put(ns, updates);
- } else if ("c".equals(op)) {
- cIndex++;
- System.err.println(obj);
- // MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
- if (bson.get("commitIndexBuild") != null) {
- // MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
- // BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
- //
- // List<IndexModel> indexModels = new ArrayList<>();
- // //组合索引
- //
- // for (int i = 0; i < indexes.size(); i++) {
- // BasicDBObject keyObj = new BasicDBObject();
- // BasicBSONObject index = (BasicBSONObject) indexes.get(i);
- // BasicBSONObject keys = (BasicBSONObject) index.get("key");
- // for (String s : keys.keySet()) {
- // keyObj.put(s, keys.get(s));
- // }
- //
- // //添加配置
- // IndexOptions indexOptions = new IndexOptions();
- // if (index.get("unique") != null) {
- // indexOptions.unique(index.getBoolean("unique"));
- // }
- // if (index.get("background") != null) {
- // indexOptions.background(index.getBoolean("background"));
- // }
- // //索引名称
- // indexOptions.name(index.get("name").toString());
- // indexModels.add(new IndexModel(keyObj, indexOptions));
- // }
- // coll.createIndexes(indexModels);
- } else if (bson.get("createIndexes") != null) {
- // MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
- //
- // List<IndexModel> indexModels = new ArrayList<>();
- // //组合索引
- //
- // BasicDBObject keyObj = new BasicDBObject();
- // BasicBSONObject keys = (BasicBSONObject) bson.get("key");
- // for (String s : keys.keySet()) {
- // keyObj.put(s, keys.get(s));
- // }
- //
- // //添加配置
- // IndexOptions indexOptions = new IndexOptions();
- // if (bson.get("unique") != null) {
- // indexOptions.unique(bson.getBoolean("unique"));
- // }
- // if (bson.get("background") != null) {
- // indexOptions.background(bson.getBoolean("background"));
- // }
- // //索引名称
- // indexOptions.name(bson.get("name").toString());
- // indexModels.add(new IndexModel(keyObj, indexOptions));
- // coll.createIndexes(indexModels);
- }
- }
- records++;
- }
- System.err.printf("新增:%s\t更新:%s\t索引:%s", iIndex, uIndex, cIndex);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
|