Skip to content

Commit

Permalink
update file montior (#745)
Browse files Browse the repository at this point in the history
* update file montior

* update file montior pattern
  • Loading branch information
wtt40122 authored Oct 23, 2023
1 parent 5af96d4 commit 72911f6
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void readLine() throws IOException {
readResult.setLines(Lists.newArrayList(line));
readResult.setPointer(pointer);
readResult.setFileMaxPointer(maxPointer);
readResult.setFilePathName(file);
readResult.setLineNumber(++lineNumber);
ReadEvent event = new ReadEvent(readResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class ReadResult {

private List<String> lines;

private String filePathName;

private long pointer;

private Long fileMaxPointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Setter;
import lombok.SneakyThrows;

import java.io.File;
Expand All @@ -23,8 +24,8 @@ public class FileInfoCache {

private Gson gson = new Gson();

private static final String filePath = "/tmp/.ozhera_pointer";

@Setter
private String filePath = "/tmp/.ozhera_pointer";

private static final class LazyHolder {
private static final FileInfoCache ins = new FileInfoCache();
Expand Down Expand Up @@ -61,4 +62,9 @@ public void load() {
}
}

@SneakyThrows
public void load(String filePath) {
this.filePath = filePath;
this.load();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ public abstract class FileUtils {

@SneakyThrows
public static Object fileKey(File file) {
BasicFileAttributeView basicview = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class);
BasicFileAttributes attr = basicview.readAttributes();
return attr.fileKey();
BasicFileAttributeView basicView = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class);
BasicFileAttributes attr = basicView.readAttributes();
if (null != attr && null != attr.fileKey()) {
return attr.fileKey();
}
return file.getPath();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaomi.mone.file.listener;

import com.xiaomi.mone.file.LogFile2;
import com.xiaomi.mone.file.ReadEvent;
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.EventType;
Expand All @@ -10,6 +11,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/**
* @author [email protected]
Expand All @@ -20,10 +22,13 @@ public class DefaultMonitorListener implements EventListener {

private HeraFileMonitor monitor;

private Consumer<ReadEvent> consumer;

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

public DefaultMonitorListener(HeraFileMonitor monitor) {
public DefaultMonitorListener(HeraFileMonitor monitor, Consumer<ReadEvent> consumer) {
this.monitor = monitor;
this.consumer = consumer;
}

@Override
Expand All @@ -32,7 +37,7 @@ public void onEvent(FileEvent event) {
log.info("log file:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile));
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
SafeRun.run(() -> logFile.readLine());
});
}
Expand All @@ -55,7 +60,7 @@ public void onEvent(FileEvent event) {
log.info("create:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile));
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
SafeRun.run(() -> logFile.readLine());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* @author [email protected]
Expand All @@ -21,14 +22,17 @@ public class OzHeraReadListener implements ReadListener {

private LogFile2 logFile;

public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile) {
private Consumer<ReadEvent> consumer;

public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile, Consumer<ReadEvent> consumer) {
this.monitor = monitor;
this.logFile = logFile;
this.consumer = consumer;
}

@Override
public void onEvent(ReadEvent event) {
System.out.println(event.getReadResult().getLines());
consumer.accept(event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import java.nio.file.*;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* @author [email protected]
Expand Down Expand Up @@ -65,23 +67,23 @@ public HeraFileMonitor(EventListener listener) {
this.listener = listener;
}

public void reg(String path) throws IOException, InterruptedException {
public void reg(String path, Predicate<String> predicate) throws IOException, InterruptedException {
Path directory = Paths.get(path);
File f = directory.toFile();

Arrays.stream(f.listFiles()).forEach(it -> initFile(it));
Arrays.stream(Objects.requireNonNull(f.listFiles())).filter(it -> predicate.test(it.getPath())).forEach(this::initFile);

WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
while (true) {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
Path modifiedFile = (Path) event.context();
if (modifiedFile.getFileName().toString().startsWith(".")) {
String filePath = String.format("%s%s", path, modifiedFile.getFileName().toString());
if (!predicate.test(filePath) || modifiedFile.getFileName().toString().startsWith(".")) {
continue;
}
String filePath = path + "" + modifiedFile.getFileName();
log.info(event.kind() + " " + filePath);
log.debug("epoll result,path:{}", event.kind() + filePath);
HeraFile hfile = fileMap.get(filePath);

if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaomi.mone.file;

import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.FileEvent;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
Expand All @@ -18,7 +16,7 @@ public class FileMonitorUtilsTest {

@Test
public void test1() throws IOException, InterruptedException {
new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/");
new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/", it -> true);
System.in.read();
}

Expand Down
18 changes: 14 additions & 4 deletions jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
* @Author [email protected]
Expand Down Expand Up @@ -63,14 +64,23 @@ public boolean isContinue(String line) {
@SneakyThrows
@Test
public void testLogFileMonitor() {
FileInfoCache.ins().load();
Runtime.getRuntime().addShutdownHook(new Thread(()->{
// FileInfoCache.ins().load();
FileInfoCache.ins().load("/home/work/log/log-agent/milog/memory/.ozhera_pointer");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("shutdown");
FileInfoCache.ins().shutdown();
}));
HeraFileMonitor monitor = new HeraFileMonitor();
monitor.setListener(new DefaultMonitorListener(monitor));
monitor.reg("/tmp/e/");
monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> {
System.out.println(readEvent.getReadResult().getLines());
}));
String fileName = "/home/work/log/test/provider/server.log.*";
Pattern pattern = Pattern.compile(fileName);
monitor.reg("/home/work/log/test/provider/", it -> {
boolean matches = pattern.matcher(it).matches();
log.info("file:{},matches:{}", it, matches);
return matches;
});
log.info("reg finish");
System.in.read();
}
Expand Down

0 comments on commit 72911f6

Please sign in to comment.