From 25b7aa4f3642f1eb764ac4e214e27f03387718fe Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Fri, 22 Sep 2023 10:13:13 +0800 Subject: [PATCH 1/2] The file module needs to add a few performance-related unit tests. #731 --- .../docean/plugin/log/test/LogWriterTest.java | 15 ++-- jcommon/file/pom.xml | 32 ++++++--- .../com/xiaomi/mone/file/LogFileTest.java | 13 +--- .../mone/file/RandomAccessFileTest.java | 70 +++++++++++++++++++ 4 files changed, 103 insertions(+), 27 deletions(-) create mode 100644 jcommon/file/src/test/java/com/xiaomi/mone/file/RandomAccessFileTest.java diff --git a/jcommon/docean-plugin/docean-plugin-log/src/test/java/com/xiaomi/youpin/docean/plugin/log/test/LogWriterTest.java b/jcommon/docean-plugin/docean-plugin-log/src/test/java/com/xiaomi/youpin/docean/plugin/log/test/LogWriterTest.java index 62b8abd4a..32f271662 100644 --- a/jcommon/docean-plugin/docean-plugin-log/src/test/java/com/xiaomi/youpin/docean/plugin/log/test/LogWriterTest.java +++ b/jcommon/docean-plugin/docean-plugin-log/src/test/java/com/xiaomi/youpin/docean/plugin/log/test/LogWriterTest.java @@ -16,6 +16,7 @@ package com.xiaomi.youpin.docean.plugin.log.test; +import com.google.common.base.Strings; import com.xiaomi.youpin.docean.plugin.log.LogWriter; import org.junit.Test; @@ -40,14 +41,14 @@ public class LogWriterTest { public void testWrite() { LogWriter logWriter = new LogWriter("/tmp/data"); logWriter.init(1024 * 1024 * 10); - IntStream.range(0, 1000).forEach(it -> { + IntStream.range(0, 1000000).forEach(it -> { System.out.println("run:" + new Date()); - logWriter.write(LocalDateTime.now(), "record:" + (new Date().toString()) + System.lineSeparator()); - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - e.printStackTrace(); - } + logWriter.write(LocalDateTime.now(), "record:" + (new Date().toString()) + System.lineSeparator()+ Strings.repeat("abc",10)); +// try { +// TimeUnit.SECONDS.sleep(1); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } }); logWriter.force(); } diff --git a/jcommon/file/pom.xml b/jcommon/file/pom.xml index e35dca653..756373cdc 100644 --- a/jcommon/file/pom.xml +++ b/jcommon/file/pom.xml @@ -1,11 +1,25 @@ - - 4.0.0 - - run.mone - jcommon - 1.4-jdk20-SNAPSHOT - - file + + 4.0.0 + + run.mone + jcommon + 1.4-jdk20-SNAPSHOT + + file + + + + + + com.squareup.okio + okio + 3.5.0 + test + + + + diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 8191fa27f..a424ee1cd 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -77,15 +77,8 @@ class MyReadListener implements ReadListener { @Override public void onEvent(ReadEvent event) { - List m = mLog.append(event.getReadResult().getLines().get(0)); - if (m.size() > 0) { - System.out.println("--->" + m); - } -// try { -// TimeUnit.MILLISECONDS.sleep(2); -// } catch (InterruptedException e) { -// e.printStackTrace(); -// } + String m = event.getReadResult().getLines().get(0); + System.out.println(m); } @Override @@ -99,13 +92,11 @@ public boolean isContinue(String line) { public void testLog2() throws IOException { LogFile log = new LogFile("/tmp/zzytest/zzytest/server.log", new MyReadListener()); log.readLine(); - System.in.read(); } @Test public void testReadFileCutting() throws IOException { - System.out.println("111111"); LogFile log = new LogFile("/home/work/log/hera-operator/server.log", new MyReadListener()); log.readLine(); System.in.read(); diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/RandomAccessFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/RandomAccessFileTest.java new file mode 100644 index 000000000..ee519d85d --- /dev/null +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/RandomAccessFileTest.java @@ -0,0 +1,70 @@ +package com.xiaomi.mone.file; + +import com.google.common.base.Stopwatch; +import okio.BufferedSource; +import okio.Okio; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author goodjava@qq.com + * @date 2023/9/21 15:25 + */ +public class RandomAccessFileTest { + + + @Test + public void test1() throws IOException { + AtomicInteger ai = new AtomicInteger(); + MoneRandomAccessFile mra = new MoneRandomAccessFile("/tmp/data", "r", 1024 * 4); + Stopwatch sw = Stopwatch.createStarted(); + while (true) { + String line = mra.getNextLine(); +// System.out.println(line); + ai.incrementAndGet(); + if (null == line) { + break; + } + } + System.out.println("use time:" + sw.elapsed(TimeUnit.MILLISECONDS)); + System.out.println(ai.get()); + } + + + @Test + public void test2() throws IOException { + RandomAccessFile mra = new RandomAccessFile("/tmp/data", "r"); + Stopwatch sw = Stopwatch.createStarted(); + while (true) { + String line = mra.readLine(); + System.out.println(line); + if (null == line) { + break; + } + } + System.out.println("use time:" + sw.elapsed(TimeUnit.MILLISECONDS)); + } + + @Test + public void test3() throws IOException { + AtomicInteger ai = new AtomicInteger(); + BufferedSource bufferedSource = Okio.buffer(Okio.source(new File("/tmp/data"))); + Stopwatch sw = Stopwatch.createStarted(); + while (true) { + String line = bufferedSource.readUtf8Line(); +// System.out.println(line); + ai.incrementAndGet(); + if (null == line) { + break; + } + } + System.out.println("use time:" + sw.elapsed(TimeUnit.MILLISECONDS)); + System.out.println(ai.get()); + } + +} From 3a888f03e34cfb1ddc8437bbc6b8f5fe03bdb34f Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Tue, 26 Sep 2023 14:18:53 +0800 Subject: [PATCH 2/2] Improving Log Agent Performance by Switching to Coroutine and Monitoring Active Log #739 --- jcommon/file/pom.xml | 19 ++ .../java/com/xiaomi/mone/file/ILogFile.java | 1 + .../java/com/xiaomi/mone/file/LogFile2.java | 274 ++++++++++++++++++ .../com/xiaomi/mone/file/ReadListener.java | 9 +- .../com/xiaomi/mone/file/common/FileInfo.java | 16 + .../mone/file/common/FileInfoCache.java | 64 ++++ .../xiaomi/mone/file/common/FileUtils.java | 24 ++ .../com/xiaomi/mone/file/common/Pair.java | 26 ++ .../com/xiaomi/mone/file/common/SafeRun.java | 28 ++ .../xiaomi/mone/file/event/EventListener.java | 11 + .../com/xiaomi/mone/file/event/EventType.java | 16 + .../com/xiaomi/mone/file/event/FileEvent.java | 23 ++ .../file/listener/DefaultMonitorListener.java | 63 ++++ .../file/listener/OzHeraReadListener.java | 65 +++++ .../com/xiaomi/mone/file/ozhera/HeraFile.java | 33 +++ .../mone/file/ozhera/HeraFileMonitor.java | 161 ++++++++++ .../mone/file/ozhera/HeraFileState.java | 12 + .../mone/file/FileMonitorUtilsTest.java | 25 ++ .../com/xiaomi/mone/file/LogFileTest.java | 23 +- .../java/com/xiaomi/mone/file/MapTest.java | 22 ++ 20 files changed, 913 insertions(+), 2 deletions(-) create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfo.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/common/Pair.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/common/SafeRun.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventType.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/event/FileEvent.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java create mode 100644 jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileState.java create mode 100644 jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java create mode 100644 jcommon/file/src/test/java/com/xiaomi/mone/file/MapTest.java diff --git a/jcommon/file/pom.xml b/jcommon/file/pom.xml index 756373cdc..8a6e143f6 100644 --- a/jcommon/file/pom.xml +++ b/jcommon/file/pom.xml @@ -22,4 +22,23 @@ + + + + + maven-compiler-plugin + 3.11.0 + + + --add-modules=jdk.incubator.concurrent + --enable-preview + + 20 + 20 + 20 + + + + + diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java index d1d6f5e59..0a898721e 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java @@ -16,4 +16,5 @@ public interface ILogFile { void initLogFile(String file, ReadListener listener, long pointer, long lineNumber); + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java new file mode 100644 index 000000000..cb039ff02 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -0,0 +1,274 @@ +package com.xiaomi.mone.file; + +import com.google.common.collect.Lists; +import com.xiaomi.mone.file.common.FileInfo; +import com.xiaomi.mone.file.common.FileInfoCache; +import com.xiaomi.mone.file.common.FileUtils; +import lombok.Getter; +import lombok.Setter; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.security.MessageDigest; +import java.util.concurrent.TimeUnit; + +/** + * @author goodjava@qq.com + */ +@Slf4j +public class LogFile2 implements ILogFile { + + @Getter + private String file; + + @Getter + private Object fileKey; + + @Getter + private MoneRandomAccessFile raf; + + @Setter + private ReadListener listener; + + @Setter + private volatile boolean stop; + + @Setter + private volatile boolean reOpen; + + @Setter + private volatile boolean reFresh; + + @Getter + private int beforePointerHashCode; + + @Setter + private long pointer; + + //行号 + private long lineNumber; + + //每次读取时文件的最大偏移量 + private long maxPointer; + + private String md5; + + private static final int LINE_MAX_LENGTH = 50000; + + public LogFile2() { + + } + + public LogFile2(String file, ReadListener listener) { + this.file = file; + File f = new File(this.file); + this.fileKey = FileUtils.fileKey(f); + this.md5 = md5(file); + this.listener = listener; + this.pointer = readPointer(); + } + + public LogFile2(String file) { + this.file = file; + File f = new File(this.file); + this.fileKey = FileUtils.fileKey(f); + this.md5 = md5(file); + this.pointer = readPointer(); + } + + + public LogFile2(String file, ReadListener listener, long pointer, long lineNumber) { + this.file = file; + this.md5 = md5(file); + this.listener = listener; + this.pointer = pointer; + this.lineNumber = lineNumber; + } + + private void open() { + try { + //4kb + this.raf = new MoneRandomAccessFile(file, "r", 1024 * 4); + reOpen = false; + reFresh = false; + } catch (FileNotFoundException e) { + log.error("open file FileNotFoundException", e); + } catch (IOException e) { + log.error("open file IOException", e); + } + } + + public void readLine() throws IOException { + while (true) { + open(); + //兼容文件切换时,缓存的pointer + try { + log.info("open file:{},pointer:{}", file, this.pointer); + if (pointer > raf.length()) { + pointer = 0; + lineNumber = 0; + } + } catch (Exception e) { + log.error("file.length() IOException, file:{}", this.file, e); + } + raf.seek(pointer); + + while (true) { + listener.setPointer(this); + if (this.pointer == -1) { + pointer = 0; + this.lineNumber = 0; + log.info("empty break"); + break; + } + String line = raf.getNextLine(); + if (null != line && lineNumber == 0 && pointer == 0) { + String hashLine = line.length() > 100 ? line.substring(0, 100) : line; + beforePointerHashCode = hashLine.hashCode(); + } + //大行文件先临时截断 + line = lineCutOff(line); + + if (reFresh) { + break; + } + + if (reOpen) { + pointer = 0; + lineNumber = 0; + break; + } + + if (stop) { + break; + } + + //文件内容被切割,重头开始采集内容 + if (contentHasCutting(line)) { + reOpen = true; + pointer = 0; + lineNumber = 0; + log.warn("file:{} content have been cut, goto reOpen file", file); + break; + } + + if (listener.isBreak(line)) { + stop = true; + break; + } + + if (listener.isContinue(line)) { + continue; + } + + + try { + pointer = raf.getFilePointer(); + maxPointer = raf.length(); + } catch (IOException e) { + log.error("file.length() IOException, file:{}", this.file, e); + } + + ReadResult readResult = new ReadResult(); + readResult.setLines(Lists.newArrayList(line)); + readResult.setPointer(pointer); + readResult.setFileMaxPointer(maxPointer); + readResult.setLineNumber(++lineNumber); + ReadEvent event = new ReadEvent(readResult); + + listener.onEvent(event); + } + raf.close(); + if (stop) { + log.info("stop:{}", this.file); + FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build()); + break; + } + } + } + + @Override + public void initLogFile(String file, ReadListener listener, long pointer, long lineNumber) { + this.file = file; + this.md5 = md5(file); + this.listener = listener; + this.pointer = pointer; + this.lineNumber = lineNumber; + } + + private String lineCutOff(String line) { + if (null != line) { + //todo 大行文件先临时截断 + if (line.length() > LINE_MAX_LENGTH) { + line = line.substring(0, LINE_MAX_LENGTH); + } + } + + return line; + } + + private boolean contentHasCutting(String line) throws IOException { + if (null != line) { + return false; + } + + long currentFileMaxPointer; + try { + currentFileMaxPointer = raf.length(); + if (currentFileMaxPointer == 0L) { + raf.getFD().sync(); + TimeUnit.MILLISECONDS.sleep(30); + currentFileMaxPointer = raf.length(); + } + } catch (IOException e) { + log.error("get fileMaxPointer IOException", e); + return false; + } catch (InterruptedException e) { + log.error("get fileMaxPointer InterruptedException", e); + return false; + } + + return false; + } + + + public void shutdown() { + try { + this.stop = true; + FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build()); + } catch (Throwable ex) { + log.error(ex.getMessage()); + } + } + + + public long readPointer() { + try { + FileInfo fi = FileInfoCache.ins().get(this.fileKey.toString()); + if (null != fi) { + return fi.getPointer(); + } + } catch (Throwable e) { + log.error(e.getMessage()); + } + return 0; + } + + + @SneakyThrows + public String md5(String msg) { + MessageDigest md = MessageDigest.getInstance("MD5"); + md.update(msg.getBytes()); + byte[] digest = md.digest(); + StringBuilder sb = new StringBuilder(2 * digest.length); + for (byte b : digest) { + sb.append(String.format("%02x", b & 0xff)); + } + return sb.toString().toUpperCase(); + } + + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java index 1504f5939..2f5d10ee9 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadListener.java @@ -22,9 +22,16 @@ */ public interface ReadListener { - void onEvent(ReadEvent event); boolean isContinue(String line); + default boolean isBreak(String line) { + return false; + } + + default void setPointer(Object obj) { + + } + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfo.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfo.java new file mode 100644 index 000000000..98ac9bd68 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfo.java @@ -0,0 +1,16 @@ +package com.xiaomi.mone.file.common; + +import lombok.Builder; +import lombok.Data; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 11:51 + */ +@Data +@Builder +public class FileInfo { + + private long pointer; + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java new file mode 100644 index 000000000..e4b4256a4 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java @@ -0,0 +1,64 @@ +package com.xiaomi.mone.file.common; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import lombok.SneakyThrows; + +import java.io.File; +import java.lang.reflect.Type; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 11:50 + */ +public class FileInfoCache { + + + private Cache cache = CacheBuilder.newBuilder().maximumSize(10000).build(); + + private Gson gson = new Gson(); + + private static final String filePath = "/tmp/.ozhera_pointer"; + + + private static final class LazyHolder { + private static final FileInfoCache ins = new FileInfoCache(); + } + + + public static final FileInfoCache ins() { + return LazyHolder.ins; + } + + public void put(String key, FileInfo val) { + cache.put(key, val); + } + + + public FileInfo get(String key) { + return cache.getIfPresent(key); + } + + @SneakyThrows + public void shutdown() { + String str = gson.toJson(cache.asMap()); + Files.write(Paths.get(filePath), str.getBytes()); + } + + @SneakyThrows + public void load() { + if (new File(filePath).exists()) { + String str = new String(Files.readAllBytes(Paths.get(filePath))); + Type typeOfT = new TypeToken>() { + }.getType(); + Map map = gson.fromJson(str, typeOfT); + map.forEach((k, v) -> cache.put(k, v)); + } + } + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java new file mode 100644 index 000000000..25d50daf9 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java @@ -0,0 +1,24 @@ +package com.xiaomi.mone.file.common; + +import lombok.SneakyThrows; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.BasicFileAttributes; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 10:42 + */ +public abstract class FileUtils { + + @SneakyThrows + public static Object fileKey(File file) { + BasicFileAttributeView basicview = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class); + BasicFileAttributes attr = basicview.readAttributes(); + return attr.fileKey(); + } + + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/Pair.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/Pair.java new file mode 100644 index 000000000..4c759cb31 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/Pair.java @@ -0,0 +1,26 @@ +package com.xiaomi.mone.file.common; + +import lombok.Data; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 10:50 + */ +@Data +public class Pair { + + private K key; + + private V value; + + public Pair(K key, V value) { + this.key = key; + this.value = value; + } + + public static Pair of(K key, V value) { + return new Pair<>(key, value); + } + + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/SafeRun.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/SafeRun.java new file mode 100644 index 000000000..a6d6d709e --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/SafeRun.java @@ -0,0 +1,28 @@ +package com.xiaomi.mone.file.common; + +import lombok.extern.slf4j.Slf4j; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 13:40 + */ +@Slf4j +public class SafeRun { + + + public static interface ExRunnable { + + void run() throws Throwable; + + } + + + public static void run(ExRunnable runnable) { + try { + runnable.run(); + } catch (Throwable ex) { + log.error(ex.getMessage(), ex); + } + } + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java new file mode 100644 index 000000000..860e6537c --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java @@ -0,0 +1,11 @@ +package com.xiaomi.mone.file.event; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 14:38 + */ +public interface EventListener { + + void onEvent(FileEvent event); + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventType.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventType.java new file mode 100644 index 000000000..5e8aa3be0 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventType.java @@ -0,0 +1,16 @@ +package com.xiaomi.mone.file.event; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 14:37 + */ +public enum EventType { + + init, + create, + delete, + modify, + rename, + empty, + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/FileEvent.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/FileEvent.java new file mode 100644 index 000000000..7e07a3df6 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/FileEvent.java @@ -0,0 +1,23 @@ +package com.xiaomi.mone.file.event; + +import lombok.Builder; +import lombok.Data; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 14:37 + */ +@Data +@Builder +public class FileEvent { + + private EventType type; + + private String fileName; + + private Object fileKey; + + private long pointer; + + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java new file mode 100644 index 000000000..87105adc0 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java @@ -0,0 +1,63 @@ +package com.xiaomi.mone.file.listener; + +import com.xiaomi.mone.file.LogFile2; +import com.xiaomi.mone.file.common.SafeRun; +import com.xiaomi.mone.file.event.EventListener; +import com.xiaomi.mone.file.event.EventType; +import com.xiaomi.mone.file.event.FileEvent; +import com.xiaomi.mone.file.ozhera.HeraFileMonitor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 09:49 + */ +@Slf4j +public class DefaultMonitorListener implements EventListener { + + private HeraFileMonitor monitor; + + private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); + + public DefaultMonitorListener(HeraFileMonitor monitor) { + this.monitor = monitor; + } + + @Override + public void onEvent(FileEvent event) { + if (event.getType().equals(EventType.init)) { + log.info("log file:{}", event.getFileName()); + LogFile2 logFile = new LogFile2(event.getFileName()); + pool.submit(() -> { + logFile.setListener(new OzHeraReadListener(monitor, logFile)); + SafeRun.run(() -> logFile.readLine()); + }); + } + + if (event.getType().equals(EventType.rename)) { + log.info("rename:{} {}", event.getFileKey(), event.getFileName()); + monitor.getMap().remove(event.getFileKey()); + } + + if (event.getType().equals(EventType.delete)) { + log.info("delete:{}", event.getFileName()); + } + + if (event.getType().equals(EventType.empty)) { + log.info("empty:{}", event.getFileName()); + monitor.getMap().get(event.getFileKey()).getPointer().set(-1); + } + + if (event.getType().equals(EventType.create)) { + log.info("create:{}", event.getFileName()); + LogFile2 logFile = new LogFile2(event.getFileName()); + pool.submit(() -> { + logFile.setListener(new OzHeraReadListener(monitor, logFile)); + SafeRun.run(() -> logFile.readLine()); + }); + } + } +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java new file mode 100644 index 000000000..71b59a521 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java @@ -0,0 +1,65 @@ +package com.xiaomi.mone.file.listener; + +import com.xiaomi.mone.file.LogFile2; +import com.xiaomi.mone.file.ReadEvent; +import com.xiaomi.mone.file.ReadListener; +import com.xiaomi.mone.file.common.SafeRun; +import com.xiaomi.mone.file.ozhera.HeraFile; +import com.xiaomi.mone.file.ozhera.HeraFileMonitor; +import lombok.extern.slf4j.Slf4j; + +import java.util.concurrent.TimeUnit; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 16:08 + */ +@Slf4j +public class OzHeraReadListener implements ReadListener { + + private HeraFileMonitor monitor; + + private LogFile2 logFile; + + public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile) { + this.monitor = monitor; + this.logFile = logFile; + } + + @Override + public void onEvent(ReadEvent event) { + System.out.println(event.getReadResult().getLines()); + } + + @Override + public boolean isContinue(String line) { + if (null == line) { + SafeRun.run(() -> TimeUnit.MILLISECONDS.sleep(300)); + return true; + } + return false; + } + + @Override + public boolean isBreak(String line) { + if (null == line) { + HeraFile f = monitor.getMap().get(logFile.getFileKey()); + if (null == f || f.getState().get() == 1) { + return true; + } + } + return false; + } + + @Override + public void setPointer(Object obj) { + if (obj instanceof LogFile2) { + LogFile2 lf = (LogFile2) obj; + HeraFile f = monitor.getMap().get(logFile.getFileKey()); + if (null != f && f.getPointer().get() == -1) { + lf.setPointer(-1); + f.getPointer().set(0); + } + } + } +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java new file mode 100644 index 000000000..8acb1a989 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFile.java @@ -0,0 +1,33 @@ +package com.xiaomi.mone.file.ozhera; + +import lombok.Builder; +import lombok.Data; + +import java.io.File; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 15:56 + */ +@Data +@Builder +public class HeraFile { + + private File file; + + private Object fileKey; + + private String fileName; + + @Builder.Default + private AtomicInteger state = new AtomicInteger(0); + + + @Builder.Default + private AtomicLong pointer = new AtomicLong(); + + @Builder.Default + private AtomicLong utime = new AtomicLong(System.currentTimeMillis()); +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java new file mode 100644 index 000000000..4cbd8ca6a --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -0,0 +1,161 @@ +package com.xiaomi.mone.file.ozhera; + +import com.google.common.collect.Lists; +import com.xiaomi.mone.file.common.FileInfo; +import com.xiaomi.mone.file.common.FileInfoCache; +import com.xiaomi.mone.file.common.FileUtils; +import com.xiaomi.mone.file.common.Pair; +import com.xiaomi.mone.file.event.EventListener; +import com.xiaomi.mone.file.event.EventType; +import com.xiaomi.mone.file.event.FileEvent; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.io.IOException; +import java.nio.file.*; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 09:55 + */ +@Slf4j +public class HeraFileMonitor { + + + @Getter + private ConcurrentHashMap map = new ConcurrentHashMap<>(); + + private ConcurrentHashMap fileMap = new ConcurrentHashMap<>(); + + @Setter + private EventListener listener; + + + public HeraFileMonitor() { + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { + try { + List> remList = Lists.newArrayList(); + long now = System.currentTimeMillis(); + fileMap.values().forEach(it -> { + if (now - it.getUtime().get() >= TimeUnit.SECONDS.toMillis(5)) { + remList.add(Pair.of(it.getFileName(), it.getFileKey())); + } + }); + remList.forEach(it -> { + log.info("remove file:{}", it.getKey()); + fileMap.remove(it.getKey()); + map.remove(it.getValue()); + }); + } catch (Throwable ex) { + log.error(ex.getMessage(), ex); + } + }, 5, 10, TimeUnit.SECONDS); + } + + public HeraFileMonitor(EventListener listener) { + this(); + this.listener = listener; + } + + public void reg(String path) throws IOException, InterruptedException { + Path directory = Paths.get(path); + File f = directory.toFile(); + + Arrays.stream(f.listFiles()).forEach(it -> initFile(it)); + + WatchService watchService = FileSystems.getDefault().newWatchService(); + directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE); + while (true) { + WatchKey key = watchService.take(); + for (WatchEvent event : key.pollEvents()) { + Path modifiedFile = (Path) event.context(); + if (modifiedFile.getFileName().toString().startsWith(".")) { + continue; + } + String filePath = path + "" + modifiedFile.getFileName(); + log.info(event.kind() + " " + filePath); + HeraFile hfile = fileMap.get(filePath); + + if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { + if (null == hfile) { + hfile = initFile(new File(filePath)); + } + modify(hfile); + } + + if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) { + fileMap.remove(filePath); + if (null != hfile) { + map.remove(hfile.getFileKey()); + listener.onEvent(FileEvent.builder().type(EventType.delete).fileName(filePath).fileKey(hfile.getFileKey()).build()); + } + } + + if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) { + File file = new File(path + "" + modifiedFile.getFileName()); + Object k = FileUtils.fileKey(file); + if (map.containsKey(k)) { + log.info("change name " + map.get(k) + "--->" + file); + listener.onEvent(FileEvent.builder().fileKey(k).type(EventType.rename).build()); + } else { + listener.onEvent(FileEvent.builder().type(EventType.create).fileName(file.getPath()).build()); + } + HeraFile hf = HeraFile.builder().file(file).fileKey(k).fileName(filePath).build(); + map.putIfAbsent(k, hf); + fileMap.put(filePath, hf); + } + } + key.reset(); + } + } + + private ReentrantLock lock = new ReentrantLock(); + + private HeraFile initFile(File it) { + if (it.isFile()) { + String name = it.getName(); + if (name.startsWith(".")) { + return null; + } + Object fileKey = FileUtils.fileKey(it); + lock.lock(); + try { + if (map.containsKey(fileKey)) { + return map.get(fileKey); + } + HeraFile hf = HeraFile.builder().file(it).fileKey(fileKey).fileName(it.getPath()).build(); + FileInfo fi = FileInfoCache.ins().get(fileKey.toString()); + long pointer = 0L; + if (null != fi) { + pointer = fi.getPointer(); + } + map.put(hf.getFileKey(), hf); + fileMap.put(hf.getFileName(), hf); + this.listener.onEvent(FileEvent.builder().pointer(pointer).type(EventType.init).fileName(hf.getFileName()).build()); + return hf; + } finally { + lock.unlock(); + } + } + return null; + } + + + private void modify(HeraFile hfile) { + hfile.getUtime().set(System.currentTimeMillis()); + if (hfile.getFile().length() == 0) { + listener.onEvent(FileEvent.builder().type(EventType.empty).fileName(hfile.getFileName()).fileKey(hfile.getFileKey()).build()); + } else { + listener.onEvent(FileEvent.builder().type(EventType.modify).build()); + } + } + +} diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileState.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileState.java new file mode 100644 index 000000000..4c2e91094 --- /dev/null +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileState.java @@ -0,0 +1,12 @@ +package com.xiaomi.mone.file.ozhera; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 15:56 + */ +public enum HeraFileState { + + read, + exit + +} diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java new file mode 100644 index 000000000..c4ef8ab95 --- /dev/null +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java @@ -0,0 +1,25 @@ +package com.xiaomi.mone.file; + +import com.xiaomi.mone.file.event.EventListener; +import com.xiaomi.mone.file.event.FileEvent; +import com.xiaomi.mone.file.ozhera.HeraFileMonitor; +import lombok.extern.slf4j.Slf4j; +import org.junit.Test; + +import java.io.IOException; + +/** + * @author goodjava@qq.com + * @date 2023/9/25 09:56 + */ +@Slf4j +public class FileMonitorUtilsTest { + + + @Test + public void test1() throws IOException, InterruptedException { + new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/"); + System.in.read(); + } + +} diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index a424ee1cd..86fe91436 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -16,16 +16,21 @@ package com.xiaomi.mone.file; +import com.xiaomi.mone.file.common.FileInfoCache; +import com.xiaomi.mone.file.listener.DefaultMonitorListener; +import com.xiaomi.mone.file.ozhera.HeraFileMonitor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import org.junit.Test; import java.io.IOException; -import java.util.List; import java.util.concurrent.TimeUnit; /** * @Author goodjava@qq.com * @Date 2021/7/8 14:42 */ +@Slf4j public class LogFileTest { @@ -54,6 +59,22 @@ public boolean isContinue(String line) { System.in.read(); } + + @SneakyThrows + @Test + public void testLogFileMonitor() { + FileInfoCache.ins().load(); + Runtime.getRuntime().addShutdownHook(new Thread(()->{ + log.info("shutdown"); + FileInfoCache.ins().shutdown(); + })); + HeraFileMonitor monitor = new HeraFileMonitor(); + monitor.setListener(new DefaultMonitorListener(monitor)); + monitor.reg("/tmp/e/"); + log.info("reg finish"); + System.in.read(); + } + @Test public void testLogWS() throws IOException { LogFileWS log = new LogFileWS("D:\\t", new ReadListener() { diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/MapTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/MapTest.java new file mode 100644 index 000000000..6c38ebc52 --- /dev/null +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/MapTest.java @@ -0,0 +1,22 @@ +package com.xiaomi.mone.file; + +import org.junit.Test; + +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author goodjava@qq.com + * @date 2023/9/26 10:00 + */ +public class MapTest { + + + @Test + public void testMap() { + ConcurrentHashMap m = new ConcurrentHashMap<>(); + String v = m.computeIfAbsent("a", k -> "1"); + System.out.println(v); + System.out.println(m.computeIfAbsent("a", k -> "2")); + } + +}