Browse Source

mongo 清理数据时 增加时间戳

wukai 1 month ago
parent
commit
ecfa7b2968

+ 1 - 1
sync-admin/src/main/resources/application-dev.yml

@@ -6,7 +6,7 @@ spring:
     druid:
       # 主库数据源
       master:
-        url: jdbc:mysql://192.168.188.60:3306/xjsync?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+        url: jdbc:mysql://192.168.188.62:3306/xjsync?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
         username: root
         password: 123456
       # 从库数据源

+ 8 - 8
sync-admin/src/main/resources/logback.xml

@@ -1,7 +1,7 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <configuration>
     <!-- 日志存放路径 -->
-	<property name="log.path" value="/home/ruoyi/logs" />
+	<property name="log.path" value="/home/sync/logs" />
     <!-- 日志输出格式 -->
 	<property name="log.pattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{20} - [%method,%line] - %msg%n" />
 
@@ -11,7 +11,7 @@
 			<pattern>${log.pattern}</pattern>
 		</encoder>
 	</appender>
-	
+
 	<!-- 系统日志输出 -->
 	<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
 	    <file>${log.path}/sys-info.log</file>
@@ -34,7 +34,7 @@
             <onMismatch>DENY</onMismatch>
         </filter>
 	</appender>
-	
+
 	<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
 	    <file>${log.path}/sys-error.log</file>
         <!-- 循环政策:基于时间创建日志文件 -->
@@ -56,7 +56,7 @@
             <onMismatch>DENY</onMismatch>
         </filter>
     </appender>
-	
+
 	<!-- 用户访问日志输出  -->
     <appender name="sys-user" class="ch.qos.logback.core.rolling.RollingFileAppender">
 		<file>${log.path}/sys-user.log</file>
@@ -70,7 +70,7 @@
             <pattern>${log.pattern}</pattern>
         </encoder>
     </appender>
-	
+
 	<!-- 系统模块日志级别控制  -->
 	<logger name="com.jjt" level="info" />
 	<!-- Spring日志级别控制  -->
@@ -79,15 +79,15 @@
 	<root level="info">
 		<appender-ref ref="console" />
 	</root>
-	
+
 	<!--系统操作日志-->
     <root level="info">
         <appender-ref ref="file_info" />
         <appender-ref ref="file_error" />
     </root>
-	
+
 	<!--系统用户操作日志-->
     <logger name="sys-user" level="info">
         <appender-ref ref="sys-user"/>
     </logger>
-</configuration> 
+</configuration>

+ 149 - 0
sync-admin/src/test/java/MongoParseTest.java

@@ -0,0 +1,149 @@
+import com.mongodb.client.model.UpdateOneModel;
+import org.bson.*;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MongoParseTest — parses a raw {@code oplog.rs} BSON dump and tallies insert/update/command entries.
+ *
+ * @author wukai
+ * @date 2024/4/19 13:00
+ */
+public class MongoParseTest {
+    public static void main(String[] args) {
+        String fileName = "D:\\SYSTEM\\Desktop\\temp\\sync\\oplog.rs.bson";
+        parse(fileName);
+
+    }
+
+    /**
+     * Parse an exported oplog BSON file and count insert ("i"), update ("u") and command ("c") operations.
+     *
+     * @param filename path to the oplog.rs BSON dump file
+     */
+    public static void parse(String filename) {
+        File file = new File(filename);
+        BSONDecoder decoder = new BasicBSONDecoder();
+
+        try (InputStream inputStream = new BufferedInputStream(Files.newInputStream(file.toPath()));) {
+            Map<String, List<BasicBSONObject>> iMap = new HashMap<>(16);
+//            Map<String, List<Pair<Bson, Document>>> uMap = new HashMap<>(16);
+            Map<String, List<UpdateOneModel<Document>>> mMap = new HashMap<>(16);
+            long records = 0;
+            long iIndex = 0;
+            long uIndex = 0;
+            long cIndex = 0;
+
+
+            while (inputStream.available() > 0) {
+//                if (records % 10000 == 0) {
+//                System.err.println("正在解析文件:{},当前处理记录数:{}" + filename + records);
+//                }
+                BSONObject obj = decoder.readObject(inputStream);
+                if (obj == null) {
+                    break;
+                }
+                //操作符
+                String op = obj.get("op").toString();
+                //数据库和表名
+                String ns = obj.get("ns").toString();
+                if (ns.startsWith("config.system.")) {
+                    continue;
+                }
+                BasicBSONObject bson = (BasicBSONObject) obj.get("o");
+                if ("i".equals(op)) {
+                    iIndex++;
+                } else if ("u".equals(op)) {
+                    uIndex++;
+//                    // 数据更新
+////                    List<Pair<Bson, Document>> updates = uMap.get(ns);
+//                    List<UpdateOneModel<Document>> us = mMap.get(ns);
+//                    if (us == null) {
+//                        us = new ArrayList<>();
+//                    }
+//
+//                    System.err.println(obj);
+//
+//                    BasicBSONObject bson2 = (BasicBSONObject) obj.get("o2");
+//                    Bson f = Filters.eq("_id", bson2.get("_id"));
+//                    BasicBSONObject set = (BasicBSONObject) bson.get("$set");
+//                    Document x = new Document("$set", set);
+//                    us.add(new UpdateOneModel<>(f, x));
+//                    mMap.put(ns, us);
+//                    Pair<Bson, Document> pair = new Pair<>(f, x);
+//                    updates.add(pair);
+//                    uMap.put(ns, updates);
+
+                } else if ("c".equals(op)) {
+                    cIndex++;
+                    System.err.println(obj);
+//                    MongoDatabase database = getDatabase4Ns(mongoClient, ns, dbs);
+                    if (bson.get("commitIndexBuild") != null) {
+//                        MongoCollection<Document> coll = database.getCollection(bson.get("commitIndexBuild").toString());
+//                        BasicBSONList indexes = (BasicBSONList) bson.get("indexes");
+//
+//                        List<IndexModel> indexModels = new ArrayList<>();
+//                        //组合索引
+//
+//                        for (int i = 0; i < indexes.size(); i++) {
+//                            BasicDBObject keyObj = new BasicDBObject();
+//                            BasicBSONObject index = (BasicBSONObject) indexes.get(i);
+//                            BasicBSONObject keys = (BasicBSONObject) index.get("key");
+//                            for (String s : keys.keySet()) {
+//                                keyObj.put(s, keys.get(s));
+//                            }
+//
+//                            //添加配置
+//                            IndexOptions indexOptions = new IndexOptions();
+//                            if (index.get("unique") != null) {
+//                                indexOptions.unique(index.getBoolean("unique"));
+//                            }
+//                            if (index.get("background") != null) {
+//                                indexOptions.background(index.getBoolean("background"));
+//                            }
+//                            //索引名称
+//                            indexOptions.name(index.get("name").toString());
+//                            indexModels.add(new IndexModel(keyObj, indexOptions));
+//                        }
+//                        coll.createIndexes(indexModels);
+                    } else if (bson.get("createIndexes") != null) {
+//                        MongoCollection<Document> coll = database.getCollection(bson.get("createIndexes").toString());
+//
+//                        List<IndexModel> indexModels = new ArrayList<>();
+//                        //组合索引
+//
+//                        BasicDBObject keyObj = new BasicDBObject();
+//                        BasicBSONObject keys = (BasicBSONObject) bson.get("key");
+//                        for (String s : keys.keySet()) {
+//                            keyObj.put(s, keys.get(s));
+//                        }
+//
+//                        //添加配置
+//                        IndexOptions indexOptions = new IndexOptions();
+//                        if (bson.get("unique") != null) {
+//                            indexOptions.unique(bson.getBoolean("unique"));
+//                        }
+//                        if (bson.get("background") != null) {
+//                            indexOptions.background(bson.getBoolean("background"));
+//                        }
+//                        //索引名称
+//                        indexOptions.name(bson.get("name").toString());
+//                        indexModels.add(new IndexModel(keyObj, indexOptions));
+//                        coll.createIndexes(indexModels);
+                    }
+                }
+
+                records++;
+            }
+            System.err.printf("新增:%s\t更新:%s\t索引:%s", iIndex, uIndex, cIndex);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 59 - 0
sync-admin/src/test/java/OutMongoTest.java

@@ -1,12 +1,17 @@
 import com.jjt.RuoYiApplication;
 import com.jjt.out.service.IOutMongoService;
 import com.jjt.system.service.ISysConfigService;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.List;
 
 @RunWith(SpringRunner.class)
 @SpringBootTest(classes = RuoYiApplication.class)
@@ -20,4 +25,58 @@ public class OutMongoTest {
     public void test() {
         mongoService.clean();
     }
+
+    @Test
+    public void clean() {
+
+        String host = "192.168.188.62";
+        String port = "27017";
+        MongoClient mongo = new MongoClient(host, Integer.parseInt(port));
+
+
+        List<String> baseDB = new ArrayList<>();
+        baseDB.add("admin");
+        baseDB.add("config");
+        baseDB.add("local");
+        //查询所有的databases
+        for (String dbName : mongo.listDatabaseNames()) {
+            if (!baseDB.contains(dbName)) {
+                MongoDatabase db = mongo.getDatabase(dbName);
+                db.drop();
+                //查询所有的聚集集合
+//                for (String name : db.listCollectionNames()) {
+//                    MongoCollection col = db.getCollection(name);
+//                    col.deleteMany(new BasicDBObject());
+//                }
+            }
+        }
+    }
+
+    @Test
+    public void records() {
+
+        String host = "192.168.188.62";
+        String port = "27017";
+        MongoClient mongo = new MongoClient(host, Integer.parseInt(port));
+
+
+        List<String> baseDB = new ArrayList<>();
+        baseDB.add("admin");
+        baseDB.add("config");
+        baseDB.add("local");
+        //查询所有的databases
+        for (String dbName : mongo.listDatabaseNames()) {
+            if (!baseDB.contains(dbName)) {
+                MongoDatabase db = mongo.getDatabase(dbName);
+                //查询所有的聚集集合
+                for (String name : db.listCollectionNames()) {
+                    MongoCollection col = db.getCollection(name);
+                    if (col.countDocuments() > 0) {
+                        System.err.println(dbName + "\t" + name + "\t" + col.countDocuments());
+                    }
+//                    col.deleteMany(new BasicDBObject());
+                }
+            }
+        }
+    }
 }

+ 2 - 0
sync-common/src/main/java/com/jjt/common/utils/LinuxCommand.java

@@ -2,6 +2,7 @@ package com.jjt.common.utils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
 import java.io.*;
 import java.nio.charset.StandardCharsets;
@@ -15,6 +16,7 @@ import java.util.concurrent.Executors;
  *
  * @author wukai
  */
+@Service
 public class LinuxCommand {
     private static final Logger log = LoggerFactory.getLogger(LinuxCommand.class);
 

+ 20 - 2
sync-out/src/main/java/com/jjt/out/service/impl/OutMongoServiceImpl.java

@@ -15,8 +15,11 @@ import com.mongodb.BasicDBObject;
 import com.mongodb.MongoClient;
 import com.mongodb.client.MongoCollection;
 import com.mongodb.client.MongoDatabase;
+import com.mongodb.client.model.Filters;
+import com.mongodb.client.result.DeleteResult;
 import net.lingala.zip4j.ZipFile;
 import org.apache.commons.io.FileUtils;
+import org.bson.conversions.Bson;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -160,7 +163,15 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
     @Override
     public void clean() {
         String params = sysConfigService.selectConfigByKey("out.mongo.info");
-
+        //获取已经同步的数据时间戳
+        OutProcessInfo pi = new OutProcessInfo();
+        pi.setProcessType(SyncType.mongo.toString());
+        PageHelper.startPage(1, 10, "create_time desc").setReasonable(true);
+        List<OutProcessInfo> list = processInfoService.selectOutProcessInfoList(pi);
+        String time = list.get(0).getProcessKey();
+        long s = Long.parseLong(time);
+        Date d = new Date(s*1000);
+        System.err.println(d);
         JSONObject mongoInfo = JSONObject.parseObject(params);
         String host = mongoInfo.getString("host");
         String port = mongoInfo.getString("port");
@@ -185,7 +196,14 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
                 for (String name : db.listCollectionNames()) {
                     log.info("开始删除集合{}的数据", name);
                     MongoCollection col = db.getCollection(name);
-                    col.deleteMany(new BasicDBObject());
+
+                    // Time filter on "createdAt" only. NOTE(review): Filters.lt does not match documents missing the field, so such documents are now kept (old code deleted everything) — confirm this is intended.
+                    Bson filter = Filters.lt("createdAt", d);
+
+                    // 执行删除操作
+                    DeleteResult result = col.deleteMany(filter);
+                    log.info("已删除 {} 条过期数据", result.getDeletedCount());
+//                    col.deleteMany(new BasicDBObject());
                 }
             }
         }

+ 25 - 27
sync-out/src/test/java/test/Test.java

@@ -1,36 +1,34 @@
 package test;
 
-import cn.hutool.extra.ftp.FtpMode;
-import com.jjt.common.json.JSONObject;
-import com.jjt.common.utils.FtpUtil;
-import com.jjt.common.utils.http.HttpUtils;
-
-import java.util.Arrays;
+import java.util.Date;
 
 public class Test {
     public static void main(String[] args) {
+        long s = Long.parseLong("1744717860");
+        Date d = new Date(s*1000);
+        System.err.println(d);
 
-        String cmd = "mysqldump -u%s -p%s %s > %s.sql";
-        cmd = String.format(cmd, "root", "1233", "ab", "ab");
-        System.err.println(cmd);
-
-        FtpUtil ftpUtil = new FtpUtil("192.168.188.61", 21, "ftpadmin", "ftpadmin","/home/ftpadmin/ftp/upload");
-
-//        ftpUtil.upload("/home/ftpadmin/ftp/upload", "/data/sync/in/sync/");
-
-        ftpUtil.delete();
-//        ftpUtil.download("/mnt/test", "/data/sync/in/");
-
-        String s = "sync-es-20230608110532.zip";
-        System.err.println(s.split("\\.")[0]);
-//        JSONObject obj=new JSONObject();
-//        String result = HttpUtils.sendGet("http://192.168.188.60:9200/_cat/indices?v");
-//        System.err.println(result);
+//        String cmd = "mysqldump -u%s -p%s %s > %s.sql";
+//        cmd = String.format(cmd, "root", "1233", "ab", "ab");
+//        System.err.println(cmd);
+//
+//        FtpUtil ftpUtil = new FtpUtil("192.168.188.61", 21, "ftpadmin", "ftpadmin","/home/ftpadmin/ftp/upload");
+//
+////        ftpUtil.upload("/home/ftpadmin/ftp/upload", "/data/sync/in/sync/");
+//
+//        ftpUtil.delete();
+////        ftpUtil.download("/mnt/test", "/data/sync/in/");
 //
-//        String[] rr = result.split("\n");
-//        System.err.println("wft");
-//        for (int i = 0; i < rr.length; i++) {
-//            System.err.println(rr[i]);
-//        }
+//        String s = "sync-es-20230608110532.zip";
+//        System.err.println(s.split("\\.")[0]);
+////        JSONObject obj=new JSONObject();
+////        String result = HttpUtils.sendGet("http://192.168.188.60:9200/_cat/indices?v");
+////        System.err.println(result);
+////
+////        String[] rr = result.split("\n");
+////        System.err.println("wft");
+////        for (int i = 0; i < rr.length; i++) {
+////            System.err.println(rr[i]);
+////        }
     }
 }