package com.jjt.in.service.impl;

import com.alibaba.fastjson.JSONObject;
import com.jjt.common.constant.Constants;
import com.jjt.common.utils.LinuxCommand;
import com.jjt.in.service.IInMongoService;
import com.jjt.system.service.ISysConfigService;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.MongoWriteException;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexModel;
import com.mongodb.client.model.IndexOptions;
import org.bson.*;
import org.bson.conversions.Bson;
import org.bson.types.BasicBSONList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * Data-sync service implementation that restores MongoDB dump / oplog files.
 *
 * <p>Connection settings are read from the system configuration key
 * {@code in.mongo.info}, a JSON object with the fields {@code host},
 * {@code port} and {@code sharding}.
 *
 * @author wukai
 * @date 2023-06-06
 */
@Service
public class InMongoServiceImpl extends InBaseService implements IInMongoService {
    private static final Logger log = LoggerFactory.getLogger(InMongoServiceImpl.class);

    @Resource
    private ISysConfigService sysConfigService;

    /**
     * Restores a sync directory into MongoDB.
     *
     * <p>For a full sync the directory itself is handed to {@code mongorestore}.
     * For an incremental sync the file {@code local/oplog.rs.bson} inside the
     * directory is used; when the configured {@code sharding} flag is
     * {@code "1"} that file is replayed entry by entry through the Java driver
     * instead of being passed to {@code mongorestore}.
     *
     * @param dir   directory holding the dump; a trailing separator is appended when missing
     * @param isInc {@code true} for an incremental (oplog) sync, {@code false} for a full sync
     * @throws Exception if the configuration is missing, the restore command fails,
     *                   or the oplog replay fails
     */
    @Override
    public void parseSyncFile(String dir, boolean isInc) throws Exception {
        String params = sysConfigService.selectConfigByKey("in.mongo.info");
        JSONObject mongoInfo = JSONObject.parseObject(params);
        if (mongoInfo == null) {
            throw new IllegalStateException("System config 'in.mongo.info' is missing or empty");
        }
        String host = mongoInfo.getString("host");
        String port = mongoInfo.getString("port");
        String sharding = mongoInfo.getString("sharding");

        if (!dir.endsWith(Constants.DIR_END)) {
            dir += "/";
        }
        String fileName = dir;
        if (isInc) {
            // Incremental sync must point at local/oplog.rs.bson;
            // a full sync only needs the directory.
            fileName += "local/oplog.rs.bson";
            if ("1".equals(sharding)) {
                // Sharded cluster: replay the oplog through the driver instead.
                sharding(host, port, fileName);
                return;
            }
        }

        try {
            // Arguments are passed as a list, so no shell quoting is involved.
            List<String> command = new ArrayList<>();
            command.add("/usr/bin/mongorestore");
            command.add("--host");
            command.add(host);
            command.add("--port");
            command.add(port);
            command.add("--drop");
            command.add("--oplogReplay");
            command.add(fileName);
            LinuxCommand.exec(command);
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            log.error("还原mongo出错啦:{}", e.getMessage(), e);
            // Keep the original failure as the cause so callers can diagnose it.
            throw new Exception(e.getMessage(), e);
        }
    }

    /**
     * Replays an oplog BSON file against a sharded cluster, one entry at a time.
     *
     * <p>Handled entries: inserts ({@code "i"}), updates ({@code "u"}) carrying
     * {@code $set} and/or {@code $unset}, and the index commands
     * {@code commitIndexBuild} / {@code createIndexes} ({@code "c"}). Entries in
     * {@code config.system.*}, entries whose namespace names no collection, and
     * all other operation types (including deletes) are skipped.
     *
     * @param host     MongoDB host to connect to
     * @param port     MongoDB port, as a decimal string
     * @param filename path of the oplog BSON file
     * @throws Exception if the file cannot be read or an operation fails; the
     *                   failure is logged and rethrown so the caller does not
     *                   treat a broken replay as a success
     */
    private void sharding(String host, String port, String filename) throws Exception {
        File file = new File(filename);
        BSONDecoder decoder = new BasicBSONDecoder();
        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file));
             MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port))) {
            // Databases known to exist; enablesharding is attempted once for every other one.
            List<String> dbs = new ArrayList<>();
            for (String db : mongoClient.listDatabaseNames()) {
                dbs.add(db);
            }

            while (inputStream.available() > 0) {
                BSONObject obj = decoder.readObject(inputStream);
                if (obj == null) {
                    break;
                }
                String op = obj.get("op").toString();
                String ns = obj.get("ns").toString();
                if (ns.startsWith("config.system.")) {
                    continue;
                }
                // Limit 2: only the first dot separates database from collection,
                // so dots inside the collection name are preserved.
                String[] nss = ns.split("\\.", 2);
                if (nss.length < 2) {
                    // No collection part (for example an empty namespace): nothing to apply.
                    continue;
                }
                String dbName = nss[0];
                String collName = nss[1];

                if (!dbs.contains(dbName)) {
                    try {
                        MongoDatabase admin = mongoClient.getDatabase("admin");
                        admin.runCommand(new Document("enablesharding", dbName));
                    } catch (Exception e) {
                        // Best effort: replay continues even when sharding cannot be enabled.
                        log.warn("enablesharding failed for database {}: {}", dbName, e.getMessage());
                    }
                    // Remember the database so the command is not re-run for every entry.
                    dbs.add(dbName);
                }

                MongoDatabase database = mongoClient.getDatabase(dbName);
                MongoCollection<BasicBSONObject> collection =
                        database.getCollection(collName, BasicBSONObject.class);
                BasicBSONObject bson = (BasicBSONObject) obj.get("o");

                if ("i".equals(op)) {
                    try {
                        collection.insertOne(bson);
                    } catch (MongoWriteException e1) {
                        // Re-inserting an existing document fails; that is deliberately ignored.
                        log.debug("Insert into {} ignored: {}", ns, e1.getMessage());
                    }
                } else if ("u".equals(op)) {
                    BasicBSONObject selector = (BasicBSONObject) obj.get("o2");
                    Document update = new Document();
                    if (bson.get("$set") != null) {
                        update.append("$set", bson.get("$set"));
                    }
                    if (bson.get("$unset") != null) {
                        update.append("$unset", bson.get("$unset"));
                    }
                    if (selector == null || update.isEmpty()) {
                        // Sending an empty/null modifier would be rejected by the server.
                        log.warn("Skipping unsupported update entry on {}", ns);
                        continue;
                    }
                    Bson filter = Filters.eq("_id", selector.get("_id"));
                    collection.updateOne(filter, update);
                } else if ("c".equals(op)) {
                    if (bson.get("commitIndexBuild") != null) {
                        MongoCollection<Document> coll =
                                database.getCollection(bson.get("commitIndexBuild").toString());
                        BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
                        List<IndexModel> indexModels = new ArrayList<>();
                        if (indexes != null) {
                            for (int i = 0; i < indexes.size(); i++) {
                                indexModels.add(toIndexModel((BasicBSONObject) indexes.get(i)));
                            }
                        }
                        if (!indexModels.isEmpty()) {
                            coll.createIndexes(indexModels);
                        }
                    } else if (bson.get("createIndexes") != null) {
                        MongoCollection<Document> coll =
                                database.getCollection(bson.get("createIndexes").toString());
                        // Here the command document itself carries key/name/options.
                        List<IndexModel> indexModels = new ArrayList<>();
                        indexModels.add(toIndexModel(bson));
                        coll.createIndexes(indexModels);
                    }
                }
            }
        } catch (Exception e) {
            log.error("Failed to replay oplog file {}: {}", filename, e.getMessage(), e);
            throw e;
        }
    }

    /**
     * Builds an {@link IndexModel} from an index specification document.
     *
     * @param spec document providing {@code key} and {@code name}, and optionally
     *             {@code unique} and {@code background}
     * @return the index model with the same keys, name and options
     */
    private static IndexModel toIndexModel(BasicBSONObject spec) {
        // Copy the key fields in their original order (order matters for compound indexes).
        BasicBSONObject keys = (BasicBSONObject) spec.get("key");
        BasicDBObject keyObj = new BasicDBObject();
        for (String field : keys.keySet()) {
            keyObj.put(field, keys.get(field));
        }

        IndexOptions indexOptions = new IndexOptions();
        if (spec.get("unique") != null) {
            indexOptions.unique(spec.getBoolean("unique"));
        }
        if (spec.get("background") != null) {
            indexOptions.background(spec.getBoolean("background"));
        }
        indexOptions.name(spec.get("name").toString());
        return new IndexModel(keyObj, indexOptions);
    }
}