Selaa lähdekoodia

ES清除数据,开始备份时记录时间戳,清除数据时,只清除该时间戳之前的数据。

wukai 1 vuosi sitten
vanhempi
commit
04d57ddd72

+ 24 - 1
sync-out/src/main/java/com/jjt/out/service/impl/OutEsServiceImpl.java

@@ -8,7 +8,9 @@ import com.jjt.common.enums.SyncType;
 import com.jjt.common.utils.CompressZip;
 import com.jjt.common.utils.DateUtils;
 import com.jjt.common.utils.LinuxCommand;
+import com.jjt.out.domain.OutProcessInfo;
 import com.jjt.out.service.IOutEsService;
+import com.jjt.out.service.IOutProcessInfoService;
 import com.jjt.system.service.ISysConfigService;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.http.HttpEntity;
@@ -37,6 +39,7 @@ import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.List;
 
 /**
@@ -50,6 +53,8 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
     private static final Logger log = LoggerFactory.getLogger(OutEsServiceImpl.class);
     @Resource
     private ISysConfigService sysConfigService;
+    @Resource
+    private IOutProcessInfoService processInfoService;
 
 
     /**
@@ -107,6 +112,12 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
+        OutProcessInfo opi = new OutProcessInfo();
+        opi.setProcessType(SyncType.es.toString());
+        Date st = new Date();
+        opi.setCreateTime(st);
+        opi.setProcessKey(String.valueOf(st.getTime()));
+
 
         IndexDO ido = getIndexInfo(index);
         String uri = scheme + "://" + hostname + ":" + port + "/" + index;
@@ -171,6 +182,10 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
         } catch (IOException e) {
         }
         //生成描述json文件--end
+
+        Date et = new Date();
+        opi.setCostTime(et.getTime() - st.getTime());
+        processInfoService.insertOutProcessInfo(opi);
     }
 
     /**
@@ -178,6 +193,11 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
      */
     @Override
     public void clean() {
+        //如果是增量,需要获取时间戳
+        OutProcessInfo pi = new OutProcessInfo();
+        pi.setProcessType(SyncType.es.toString());
+        List<OutProcessInfo> list = processInfoService.selectOutProcessInfoList(pi);
+        String time = list.get(0).getProcessKey();
         String params = sysConfigService.selectConfigByKey("out.es.connect");
         JSONObject jsonObject = JSONObject.parseObject(params);
         String index = jsonObject.getString("index");
@@ -188,7 +208,10 @@ public class OutEsServiceImpl extends OutBaseService implements IOutEsService {
             builder = JsonXContent.contentBuilder()
                     .startObject()
                     .startObject("query")
-                    .startObject("match_all")
+                    .startObject("range")
+                    .startObject("time")
+                    .field("lte", Long.parseLong(time))
+                    .endObject()
                     .endObject()
                     .endObject()
                     .endObject();

+ 6 - 6
sync-out/src/main/java/com/jjt/out/service/impl/OutMongoServiceImpl.java

@@ -71,7 +71,7 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
             opi.setProcessType(SyncType.mongo.toString());
             Date st = new Date();
             opi.setCreateTime(st);
-            opi.setProcessKey(String.valueOf(System.currentTimeMillis() / 1000));
+            opi.setProcessKey(String.valueOf(st.getTime() / 1000));
 
             //组装导出命令
             List<String> commands = new ArrayList<>();
@@ -88,7 +88,7 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
             if (isInc) {
                 //如果是增量,需要获取时间戳
                 OutProcessInfo pi = new OutProcessInfo();
-                opi.setProcessType(SyncType.mongo.toString());
+                pi.setProcessType(SyncType.mongo.toString());
                 List<OutProcessInfo> list = processInfoService.selectOutProcessInfoList(pi);
                 String time = list.get(0).getProcessKey();
                 commands.add(time);
@@ -97,10 +97,6 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
 
             LinuxCommand.exec(commands, dir);
 
-            Date et = new Date();
-            opi.setCostTime(et.getTime() - st.getTime());
-            processInfoService.insertOutProcessInfo(opi);
-
 
             //获取外网同步正式目录
             String syncDir = syncDIr();
@@ -142,6 +138,10 @@ public class OutMongoServiceImpl extends OutBaseService implements IOutMongoServ
             } catch (IOException e) {
             }
             //生成描述json文件--end
+
+            Date et = new Date();
+            opi.setCostTime(et.getTime() - st.getTime());
+            processInfoService.insertOutProcessInfo(opi);
         } catch (Exception e) {
             log.error("报错啦:{}", e.getMessage());
             e.printStackTrace();

+ 8 - 1
sync-out/src/test/java/com/test/Test.java

@@ -16,6 +16,10 @@ import java.util.Date;
 
 public class Test {
     public static void main(String[] args) throws Exception {
+        Date d = new Date(1634433572000l);
+
+        System.err.println(d);
+
 
         IndexRequest indexRequest = new IndexRequest();
         XContentBuilder builder;
@@ -23,7 +27,10 @@ public class Test {
             builder = JsonXContent.contentBuilder()
                     .startObject()
                     .startObject("query")
-                    .startObject("match_all")
+                    .startObject("range")
+                    .startObject("time")
+                    .field("lte", 1634433572000l)
+                    .endObject()
                     .endObject()
                     .endObject()
                     .endObject();