diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index cb039ff02..83e12ca20 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -176,6 +176,7 @@ public void readLine() throws IOException { readResult.setLines(Lists.newArrayList(line)); readResult.setPointer(pointer); readResult.setFileMaxPointer(maxPointer); + readResult.setFilePathName(file); readResult.setLineNumber(++lineNumber); ReadEvent event = new ReadEvent(readResult); diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadResult.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadResult.java index 922142560..7586cf08c 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadResult.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ReadResult.java @@ -12,6 +12,8 @@ public class ReadResult { private List lines; + private String filePathName; + private long pointer; private Long fileMaxPointer; diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java index e4b4256a4..c9b395ad9 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java @@ -4,6 +4,7 @@ import com.google.common.cache.CacheBuilder; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import lombok.Setter; import lombok.SneakyThrows; import java.io.File; @@ -23,8 +24,8 @@ public class FileInfoCache { private Gson gson = new Gson(); - private static final String filePath = "/tmp/.ozhera_pointer"; - + @Setter + private String filePath = "/tmp/.ozhera_pointer"; private static final class LazyHolder { private static final FileInfoCache ins = new FileInfoCache(); @@ -61,4 +62,9 @@ public void load() { } } + @SneakyThrows + public void load(String filePath) { + this.filePath = filePath; + this.load(); + } } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java index 25d50daf9..7aca0c07c 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileUtils.java @@ -15,9 +15,12 @@ public abstract class FileUtils { @SneakyThrows public static Object fileKey(File file) { - BasicFileAttributeView basicview = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class); - BasicFileAttributes attr = basicview.readAttributes(); - return attr.fileKey(); + BasicFileAttributeView basicView = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class); + BasicFileAttributes attr = basicView.readAttributes(); + if (null != attr && null != attr.fileKey()) { + return attr.fileKey(); + } + return file.getPath(); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java index 87105adc0..5592c651f 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java @@ -1,6 +1,7 @@ package com.xiaomi.mone.file.listener; import com.xiaomi.mone.file.LogFile2; +import com.xiaomi.mone.file.ReadEvent; import com.xiaomi.mone.file.common.SafeRun; import com.xiaomi.mone.file.event.EventListener; import com.xiaomi.mone.file.event.EventType; @@ -10,6 +11,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.function.Consumer; /** * @author goodjava@qq.com @@ -20,10 +22,13 @@ public class DefaultMonitorListener implements EventListener { private HeraFileMonitor monitor; + private Consumer consumer; + private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor(); - public DefaultMonitorListener(HeraFileMonitor monitor) { + public DefaultMonitorListener(HeraFileMonitor monitor, Consumer consumer) { this.monitor = monitor; + this.consumer = consumer; } @Override @@ -32,7 +37,7 @@ public void onEvent(FileEvent event) { log.info("log file:{}", event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName()); pool.submit(() -> { - logFile.setListener(new OzHeraReadListener(monitor, logFile)); + logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer)); SafeRun.run(() -> logFile.readLine()); }); } @@ -55,7 +60,7 @@ public void onEvent(FileEvent event) { log.info("create:{}", event.getFileName()); LogFile2 logFile = new LogFile2(event.getFileName()); pool.submit(() -> { - logFile.setListener(new OzHeraReadListener(monitor, logFile)); + logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer)); SafeRun.run(() -> logFile.readLine()); }); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java index 71b59a521..1e8eca36a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java @@ -9,6 +9,7 @@ import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * @author goodjava@qq.com @@ -21,14 +22,17 @@ public class OzHeraReadListener implements ReadListener { private LogFile2 logFile; - public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile) { + private Consumer consumer; + + public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile, Consumer consumer) { this.monitor = monitor; this.logFile = logFile; + this.consumer = consumer; } @Override public void onEvent(ReadEvent event) { - System.out.println(event.getReadResult().getLines()); + consumer.accept(event); } @Override diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java index 4cbd8ca6a..11787323f 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java @@ -17,10 +17,12 @@ import java.nio.file.*; import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; /** * @author goodjava@qq.com @@ -65,11 +67,11 @@ public HeraFileMonitor(EventListener listener) { this.listener = listener; } - public void reg(String path) throws IOException, InterruptedException { + public void reg(String path, Predicate predicate) throws IOException, InterruptedException { Path directory = Paths.get(path); File f = directory.toFile(); - Arrays.stream(f.listFiles()).forEach(it -> initFile(it)); + Arrays.stream(Objects.requireNonNull(f.listFiles())).filter(it -> predicate.test(it.getPath())).forEach(this::initFile); WatchService watchService = FileSystems.getDefault().newWatchService(); directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE); @@ -77,11 +79,11 @@ public void reg(String path) throws IOException, InterruptedException { WatchKey key = watchService.take(); for (WatchEvent event : key.pollEvents()) { Path modifiedFile = (Path) event.context(); - if (modifiedFile.getFileName().toString().startsWith(".")) { + String filePath = String.format("%s%s", path, modifiedFile.getFileName().toString()); + if (!predicate.test(filePath) || modifiedFile.getFileName().toString().startsWith(".")) { continue; } - String filePath = path + "" + modifiedFile.getFileName(); - log.info(event.kind() + " " + filePath); + log.debug("epoll result,path:{}", event.kind() + filePath); HeraFile hfile = fileMap.get(filePath); if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) { diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java index c4ef8ab95..b9d409b59 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/FileMonitorUtilsTest.java @@ -1,7 +1,5 @@ package com.xiaomi.mone.file; -import com.xiaomi.mone.file.event.EventListener; -import com.xiaomi.mone.file.event.FileEvent; import com.xiaomi.mone.file.ozhera.HeraFileMonitor; import lombok.extern.slf4j.Slf4j; import org.junit.Test; @@ -18,7 +16,7 @@ public class FileMonitorUtilsTest { @Test public void test1() throws IOException, InterruptedException { - new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/"); + new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/", it -> true); System.in.read(); } diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 86fe91436..b0882d720 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; /** * @Author goodjava@qq.com @@ -63,14 +64,23 @@ public boolean isContinue(String line) { @SneakyThrows @Test public void testLogFileMonitor() { - FileInfoCache.ins().load(); - Runtime.getRuntime().addShutdownHook(new Thread(()->{ +// FileInfoCache.ins().load(); + FileInfoCache.ins().load("/home/work/log/log-agent/milog/memory/.ozhera_pointer"); + Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("shutdown"); FileInfoCache.ins().shutdown(); })); HeraFileMonitor monitor = new HeraFileMonitor(); - monitor.setListener(new DefaultMonitorListener(monitor)); - monitor.reg("/tmp/e/"); + monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> { + System.out.println(readEvent.getReadResult().getLines()); + })); + String fileName = "/home/work/log/test/provider/server.log.*"; + Pattern pattern = Pattern.compile(fileName); + monitor.reg("/home/work/log/test/provider/", it -> { + boolean matches = pattern.matcher(it).matches(); + log.info("file:{},matches:{}", it, matches); + return matches; + }); log.info("reg finish"); System.in.read(); }