| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- package com.jjt.in.service.impl;
- import com.alibaba.fastjson.JSONObject;
- import com.jjt.common.constant.Constants;
- import com.jjt.common.utils.LinuxCommand;
- import com.jjt.in.service.IInMongoService;
- import com.jjt.system.service.ISysConfigService;
- import com.mongodb.BasicDBObject;
- import com.mongodb.MongoClient;
- import com.mongodb.MongoWriteException;
- import com.mongodb.client.MongoCollection;
- import com.mongodb.client.MongoDatabase;
- import com.mongodb.client.model.Filters;
- import com.mongodb.client.model.IndexModel;
- import com.mongodb.client.model.IndexOptions;
- import org.bson.*;
- import org.bson.conversions.Bson;
- import org.bson.types.BasicBSONList;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.io.BufferedInputStream;
- import java.io.File;
- import java.io.FileInputStream;
- import java.io.InputStream;
- import java.util.ArrayList;
- import java.util.List;
- /**
- * 数据同步Service业务层处理
- *
- * @author wukai
- * @date 2023-06-06
- */
- @Service
- public class InMongoServiceImpl extends InBaseService implements IInMongoService {
- private static final Logger log = LoggerFactory.getLogger(InMongoServiceImpl.class);
- @Resource
- private ISysConfigService sysConfigService;
- /**
- * 解析同步文件
- */
- @Override
- public void parseSyncFile(String dir, boolean isInc) throws Exception {
- String params = sysConfigService.selectConfigByKey("in.mongo.info");
- JSONObject mongoInfo = JSONObject.parseObject(params);
- String host = mongoInfo.getString("host");
- String port = mongoInfo.getString("port");
- String sharding = mongoInfo.getString("sharding");
- if (!dir.endsWith(Constants.DIR_END)) {
- dir += "/";
- }
- String filaName = dir;
- if (isInc) {
- //如果是增量,则需要指定 local/oplog.rs.bson
- //如果是全量,只需指定目录即可
- filaName += "local/oplog.rs.bson";
- if ("1".equals(sharding)) {
- //判断是否分片集群
- sharding(host, port, filaName);
- return;
- }
- }
- try {
- List<String> command = new ArrayList<>();
- command.add("/usr/bin/mongorestore");
- command.add("--host");
- command.add(host);
- command.add("--port");
- command.add(port);
- command.add("--drop");
- command.add("--oplogReplay");
- command.add(filaName);
- // String cmd = String.format("/usr/bin/mongorestore --host %s --port %s --drop --oplogReplay %s", host, port, filaName);
- LinuxCommand.exec(command);
- } catch (Exception e) {
- log.error("还原mongo出错啦:{}", e.getMessage());
- throw new Exception(e.getMessage());
- }
- }
- /**
- * 如果是分片集群,则单独处理
- *
- * @param host
- * @param port
- * @param filename
- */
- private void sharding(String host, String port, String filename) {
- File file = new File(filename);
- BSONDecoder decoder = new BasicBSONDecoder();
- try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port));) {
- List<String> dbs = new ArrayList<>();
- for (String db : mongoClient.listDatabaseNames()) {
- dbs.add(db);
- }
- while (inputStream.available() > 0) {
- BSONObject obj = decoder.readObject(inputStream);
- if (obj == null) {
- break;
- }
- String op = obj.get("op").toString();
- String ns = obj.get("ns").toString();
- if (ns.startsWith("config.system.")) {
- continue;
- }
- String[] nss = ns.split("\\.");
- if (!dbs.contains(nss[0])) {
- try {
- MongoDatabase mdb = mongoClient.getDatabase("admin");
- mdb.runCommand(new Document("enablesharding", nss[0]));
- } catch (Exception e) {
- }
- }
- MongoDatabase database = mongoClient.getDatabase(nss[0]);
- MongoCollection<BasicBSONObject> collection = database.getCollection(nss[1], BasicBSONObject.class);
- BasicBSONObject bson = (BasicBSONObject) obj.get("o");
- if ("i".equals(op)) {
- //读取数据插入
- try {
- collection.insertOne(bson);
- } catch (MongoWriteException e1) {
- //重复插入会报错,不管他
- }
- } else if ("u".equals(op)) {
- BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
- Bson f = Filters.eq("_id", bson2.get("_id"));
- BasicBSONObject set = (BasicBSONObject) bson.get("$set");
- Document x = new Document("$set", set);
- collection.updateOne(f, x);
- } else if ("c".equals(op)) {
- if (bson.get("commitIndexBuild") != null) {
- MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
- BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
- List<IndexModel> indexModels = new ArrayList<>();
- //组合索引
- for (int i = 0; i < indexes.size(); i++) {
- BasicDBObject keyObj = new BasicDBObject();
- BasicBSONObject index = (BasicBSONObject) indexes.get(i);
- BasicBSONObject keys = (BasicBSONObject) index.get("key");
- for (String s : keys.keySet()) {
- keyObj.put(s, keys.get(s));
- }
- //添加配置
- IndexOptions indexOptions = new IndexOptions();
- if (index.get("unique") != null) {
- indexOptions.unique(index.getBoolean("unique"));
- }
- if (index.get("background") != null) {
- indexOptions.background(index.getBoolean("background"));
- }
- //索引名称
- indexOptions.name(index.get("name").toString());
- indexModels.add(new IndexModel(keyObj, indexOptions));
- }
- coll.createIndexes(indexModels);
- } else if (bson.get("createIndexes") != null) {
- MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
- List<IndexModel> indexModels = new ArrayList<>();
- //组合索引
- BasicDBObject keyObj = new BasicDBObject();
- BasicBSONObject keys = (BasicBSONObject) bson.get("key");
- for (String s : keys.keySet()) {
- keyObj.put(s, keys.get(s));
- }
- //添加配置
- IndexOptions indexOptions = new IndexOptions();
- if (bson.get("unique") != null) {
- indexOptions.unique(bson.getBoolean("unique"));
- }
- if (bson.get("background") != null) {
- indexOptions.background(bson.getBoolean("background"));
- }
- //索引名称
- indexOptions.name(bson.get("name").toString());
- indexModels.add(new IndexModel(keyObj, indexOptions));
- coll.createIndexes(indexModels);
- }
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
|