InMongoServiceImpl.java 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. package com.jjt.in.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.jjt.common.constant.Constants;
  4. import com.jjt.common.utils.LinuxCommand;
  5. import com.jjt.in.service.IInMongoService;
  6. import com.jjt.system.service.ISysConfigService;
  7. import com.mongodb.BasicDBObject;
  8. import com.mongodb.MongoClient;
  9. import com.mongodb.MongoWriteException;
  10. import com.mongodb.client.MongoCollection;
  11. import com.mongodb.client.MongoDatabase;
  12. import com.mongodb.client.model.Filters;
  13. import com.mongodb.client.model.IndexModel;
  14. import com.mongodb.client.model.IndexOptions;
  15. import org.bson.*;
  16. import org.bson.conversions.Bson;
  17. import org.bson.types.BasicBSONList;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. import org.springframework.stereotype.Service;
  21. import javax.annotation.Resource;
  22. import java.io.BufferedInputStream;
  23. import java.io.File;
  24. import java.io.FileInputStream;
  25. import java.io.InputStream;
  26. import java.util.ArrayList;
  27. import java.util.List;
  28. /**
  29. * 数据同步Service业务层处理
  30. *
  31. * @author wukai
  32. * @date 2023-06-06
  33. */
  34. @Service
  35. public class InMongoServiceImpl extends InBaseService implements IInMongoService {
  36. private static final Logger log = LoggerFactory.getLogger(InMongoServiceImpl.class);
  37. @Resource
  38. private ISysConfigService sysConfigService;
  39. /**
  40. * 解析同步文件
  41. */
  42. @Override
  43. public void parseSyncFile(String dir, boolean isInc) throws Exception {
  44. String params = sysConfigService.selectConfigByKey("in.mongo.info");
  45. JSONObject mongoInfo = JSONObject.parseObject(params);
  46. String host = mongoInfo.getString("host");
  47. String port = mongoInfo.getString("port");
  48. String sharding = mongoInfo.getString("sharding");
  49. if (!dir.endsWith(Constants.DIR_END)) {
  50. dir += "/";
  51. }
  52. String filaName = dir;
  53. if (isInc) {
  54. //如果是增量,则需要指定 local/oplog.rs.bson
  55. //如果是全量,只需指定目录即可
  56. filaName += "local/oplog.rs.bson";
  57. if ("1".equals(sharding)) {
  58. //判断是否分片集群
  59. sharding(host, port, filaName);
  60. return;
  61. }
  62. }
  63. try {
  64. List<String> command = new ArrayList<>();
  65. command.add("/usr/bin/mongorestore");
  66. command.add("--host");
  67. command.add(host);
  68. command.add("--port");
  69. command.add(port);
  70. command.add("--drop");
  71. command.add("--oplogReplay");
  72. command.add(filaName);
  73. // String cmd = String.format("/usr/bin/mongorestore --host %s --port %s --drop --oplogReplay %s", host, port, filaName);
  74. LinuxCommand.exec(command);
  75. } catch (Exception e) {
  76. log.error("还原mongo出错啦:{}", e.getMessage());
  77. throw new Exception(e.getMessage());
  78. }
  79. }
  80. /**
  81. * 如果是分片集群,则单独处理
  82. *
  83. * @param host
  84. * @param port
  85. * @param filename
  86. */
  87. private void sharding(String host, String port, String filename) {
  88. File file = new File(filename);
  89. BSONDecoder decoder = new BasicBSONDecoder();
  90. try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file)); MongoClient mongoClient = new MongoClient(host, Integer.parseInt(port));) {
  91. List<String> dbs = new ArrayList<>();
  92. for (String db : mongoClient.listDatabaseNames()) {
  93. dbs.add(db);
  94. }
  95. while (inputStream.available() > 0) {
  96. BSONObject obj = decoder.readObject(inputStream);
  97. if (obj == null) {
  98. break;
  99. }
  100. String op = obj.get("op").toString();
  101. String ns = obj.get("ns").toString();
  102. if (ns.startsWith("config.system.")) {
  103. continue;
  104. }
  105. String[] nss = ns.split("\\.");
  106. if (!dbs.contains(nss[0])) {
  107. try {
  108. MongoDatabase mdb = mongoClient.getDatabase("admin");
  109. mdb.runCommand(new Document("enablesharding", nss[0]));
  110. } catch (Exception e) {
  111. }
  112. }
  113. MongoDatabase database = mongoClient.getDatabase(nss[0]);
  114. MongoCollection<BasicBSONObject> collection = database.getCollection(nss[1], BasicBSONObject.class);
  115. BasicBSONObject bson = (BasicBSONObject) obj.get("o");
  116. if ("i".equals(op)) {
  117. //读取数据插入
  118. try {
  119. collection.insertOne(bson);
  120. } catch (MongoWriteException e1) {
  121. //重复插入会报错,不管他
  122. }
  123. } else if ("u".equals(op)) {
  124. BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
  125. Bson f = Filters.eq("_id", bson2.get("_id"));
  126. BasicBSONObject set = (BasicBSONObject) bson.get("$set");
  127. Document x = new Document("$set", set);
  128. collection.updateOne(f, x);
  129. } else if ("c".equals(op)) {
  130. if (bson.get("commitIndexBuild") != null) {
  131. MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
  132. BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
  133. List<IndexModel> indexModels = new ArrayList<>();
  134. //组合索引
  135. for (int i = 0; i < indexes.size(); i++) {
  136. BasicDBObject keyObj = new BasicDBObject();
  137. BasicBSONObject index = (BasicBSONObject) indexes.get(i);
  138. BasicBSONObject keys = (BasicBSONObject) index.get("key");
  139. for (String s : keys.keySet()) {
  140. keyObj.put(s, keys.get(s));
  141. }
  142. //添加配置
  143. IndexOptions indexOptions = new IndexOptions();
  144. if (index.get("unique") != null) {
  145. indexOptions.unique(index.getBoolean("unique"));
  146. }
  147. if (index.get("background") != null) {
  148. indexOptions.background(index.getBoolean("background"));
  149. }
  150. //索引名称
  151. indexOptions.name(index.get("name").toString());
  152. indexModels.add(new IndexModel(keyObj, indexOptions));
  153. }
  154. coll.createIndexes(indexModels);
  155. } else if (bson.get("createIndexes") != null) {
  156. MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
  157. List<IndexModel> indexModels = new ArrayList<>();
  158. //组合索引
  159. BasicDBObject keyObj = new BasicDBObject();
  160. BasicBSONObject keys = (BasicBSONObject) bson.get("key");
  161. for (String s : keys.keySet()) {
  162. keyObj.put(s, keys.get(s));
  163. }
  164. //添加配置
  165. IndexOptions indexOptions = new IndexOptions();
  166. if (bson.get("unique") != null) {
  167. indexOptions.unique(bson.getBoolean("unique"));
  168. }
  169. if (bson.get("background") != null) {
  170. indexOptions.background(bson.getBoolean("background"));
  171. }
  172. //索引名称
  173. indexOptions.name(bson.get("name").toString());
  174. indexModels.add(new IndexModel(keyObj, indexOptions));
  175. coll.createIndexes(indexModels);
  176. }
  177. }
  178. }
  179. } catch (Exception e) {
  180. e.printStackTrace();
  181. }
  182. }
  183. }