From bd03aa7128dd71d6309f7918ee53027974179bf6 Mon Sep 17 00:00:00 2001 From: wtt <30461027+wtt40122@users.noreply.github.com> Date: Thu, 29 Aug 2024 20:03:18 +0800 Subject: [PATCH] refactor: update file coll (#889) * refactor: update es and upgrade file * fix: Fixed the issue of reading a large number of files OOM and upgraded the ES version * refactor: update compile jdk version * refactor: update file coll --- .../main/java/com/xiaomi/mone/file/LogFile2.java | 1 + .../java/com/xiaomi/mone/file/ReadListener.java | 3 +++ .../mone/file/listener/OzHeraReadListener.java | 8 ++++++++ .../com/xiaomi/mone/file/ozhera/HeraFile.java | 3 +++ .../xiaomi/mone/file/ozhera/HeraFileMonitor.java | 12 +++++++++--- .../java/com/xiaomi/mone/file/LogFileTest.java | 16 ++++++++++++++-- 6 files changed, 38 insertions(+), 5 deletions(-) diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index fbbd399c4..0381a698a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -190,6 +190,7 @@ public void readLine() throws IOException { readResult.setFilePathName(file); readResult.setLineNumber(++lineNumber); ReadEvent event = new ReadEvent(readResult); + listener.setReadTime(); listener.onEvent(event); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java index 0329b66bf..75e92b157 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java @@ -37,4 +37,7 @@ default void setPointer(Object obj) { default void saveProgress() { } + default void setReadTime() { + } + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java index e2b3c2d42..43fa64876 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java @@ -72,4 +72,12 @@ public void setPointer(Object obj) { } } } + + @Override + public void setReadTime() { + HeraFile f = monitor.getFileMap().get(logFile.getFileKey()); + if (null != f) { + f.getReadTime().set(System.currentTimeMillis()); + } + } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java index 8acb1a989..fbbde66fe 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java @@ -30,4 +30,7 @@ public class HeraFile { @Builder.Default private AtomicLong utime = new AtomicLong(System.currentTimeMillis()); + + @Builder.Default + private AtomicLong readTime = new AtomicLong(System.currentTimeMillis()); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index c8a2d1e84..0f7c7d385 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -34,6 +34,7 @@ public class HeraFileMonitor { @Getter private ConcurrentHashMap map = new ConcurrentHashMap<>(); + @Getter private ConcurrentHashMap fileMap = new ConcurrentHashMap<>(); @Setter @@ -52,7 +53,7 @@ public HeraFileMonitor(long removeTime) { List> remList = Lists.newArrayList(); long now = System.currentTimeMillis(); fileMap.values().forEach(it -> { - if (now - it.getUtime().get() >= removeTime) { + if (now - it.getUtime().get() >= removeTime && now - it.getReadTime().get() >= removeTime) { remList.add(Pair.of(it.getFileName(), it.getFileKey())); } }); @@ -129,7 +130,7 @@ public void reg(String path, Predicate predicate) throws IOException, In map.putIfAbsent(k, hf); fileMap.put(filePath, hf); - listener.onEvent(FileEvent.builder().type(EventType.create).fileName(file.getPath()).build()); + listener.onEvent(FileEvent.builder().type(EventType.create).fileKey(k).fileName(file.getPath()).build()); } } } @@ -167,7 +168,12 @@ private HeraFile initFile(File it) { log.info("initFile fileName:{},fileKey:{}", name, fileKey); map.put(hf.getFileKey(), hf); fileMap.put(hf.getFileName(), hf); - this.listener.onEvent(FileEvent.builder().pointer(pointer).type(EventType.init).fileName(hf.getFileName()).build()); + this.listener.onEvent(FileEvent.builder() + .pointer(pointer) + .type(EventType.init) + .fileName(hf.getFileName()) + .fileKey(hf.getFileKey()) + .build()); return hf; } catch (Exception e) { log.error("init file error,fileName:{}", name, e); diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index d7b987194..e57859f58 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -24,6 +24,8 @@ import org.junit.Test; import java.io.IOException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -74,9 +76,14 @@ public void testLogFileMonitor() { monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> { System.out.println(readEvent.getReadResult().getLines()); })); - String fileName = "/home/work/log/file.log.*"; + String fileName = "/home/work/log/test/file*.txt"; Pattern pattern = Pattern.compile(fileName); - monitor.reg("/home/work/log", it -> { + + ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + scheduler.scheduleAtFixedRate(this::test, 1, 2, TimeUnit.SECONDS); + + monitor.reg("/home/work/log/test", it -> { boolean matches = pattern.matcher(it).matches(); log.info("file:{},matches:{}", it, true); return true; @@ -85,6 +92,11 @@ public void testLogFileMonitor() { System.in.read(); } + private void test() { + log.info("test save progress"); + FileInfoCache.ins().shutdown(); + } + @Test public void testLogWS() throws IOException { LogFile log = new LogFile("D:\\test.log", new ReadListener() {