Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update file montior #745

Merged
merged 2 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ public void readLine() throws IOException {
readResult.setLines(Lists.newArrayList(line));
readResult.setPointer(pointer);
readResult.setFileMaxPointer(maxPointer);
readResult.setFilePathName(file);
readResult.setLineNumber(++lineNumber);
ReadEvent event = new ReadEvent(readResult);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public class ReadResult {

private List<String> lines;

private String filePathName;

private long pointer;

private Long fileMaxPointer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Setter;
import lombok.SneakyThrows;

import java.io.File;
Expand All @@ -23,8 +24,8 @@ public class FileInfoCache {

private Gson gson = new Gson();

private static final String filePath = "/tmp/.ozhera_pointer";

@Setter
private String filePath = "/tmp/.ozhera_pointer";

private static final class LazyHolder {
private static final FileInfoCache ins = new FileInfoCache();
Expand Down Expand Up @@ -61,4 +62,9 @@ public void load() {
}
}

@SneakyThrows
public void load(String filePath) {
this.filePath = filePath;
this.load();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@ public abstract class FileUtils {

@SneakyThrows
public static Object fileKey(File file) {
BasicFileAttributeView basicview = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class);
BasicFileAttributes attr = basicview.readAttributes();
return attr.fileKey();
BasicFileAttributeView basicView = Files.getFileAttributeView(file.toPath(), BasicFileAttributeView.class);
BasicFileAttributes attr = basicView.readAttributes();
if (null != attr && null != attr.fileKey()) {
return attr.fileKey();
}
return file.getPath();
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.xiaomi.mone.file.listener;

import com.xiaomi.mone.file.LogFile2;
import com.xiaomi.mone.file.ReadEvent;
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.EventType;
Expand All @@ -10,6 +11,7 @@

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/**
* @author [email protected]
Expand All @@ -20,10 +22,13 @@ public class DefaultMonitorListener implements EventListener {

private HeraFileMonitor monitor;

private Consumer<ReadEvent> consumer;

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

public DefaultMonitorListener(HeraFileMonitor monitor) {
public DefaultMonitorListener(HeraFileMonitor monitor, Consumer<ReadEvent> consumer) {
this.monitor = monitor;
this.consumer = consumer;
}

@Override
Expand All @@ -32,7 +37,7 @@ public void onEvent(FileEvent event) {
log.info("log file:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile));
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
SafeRun.run(() -> logFile.readLine());
});
}
Expand All @@ -55,7 +60,7 @@ public void onEvent(FileEvent event) {
log.info("create:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile));
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
SafeRun.run(() -> logFile.readLine());
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* @author [email protected]
Expand All @@ -21,14 +22,17 @@ public class OzHeraReadListener implements ReadListener {

private LogFile2 logFile;

public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile) {
private Consumer<ReadEvent> consumer;

public OzHeraReadListener(HeraFileMonitor monitor, LogFile2 logFile, Consumer<ReadEvent> consumer) {
this.monitor = monitor;
this.logFile = logFile;
this.consumer = consumer;
}

@Override
public void onEvent(ReadEvent event) {
System.out.println(event.getReadResult().getLines());
consumer.accept(event);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
import java.nio.file.*;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* @author [email protected]
Expand Down Expand Up @@ -65,23 +67,23 @@ public HeraFileMonitor(EventListener listener) {
this.listener = listener;
}

public void reg(String path) throws IOException, InterruptedException {
public void reg(String path, Predicate<String> predicate) throws IOException, InterruptedException {
Path directory = Paths.get(path);
File f = directory.toFile();

Arrays.stream(f.listFiles()).forEach(it -> initFile(it));
Arrays.stream(Objects.requireNonNull(f.listFiles())).filter(it -> predicate.test(it.getPath())).forEach(this::initFile);

WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
while (true) {
WatchKey key = watchService.take();
for (WatchEvent<?> event : key.pollEvents()) {
Path modifiedFile = (Path) event.context();
if (modifiedFile.getFileName().toString().startsWith(".")) {
String filePath = String.format("%s%s", path, modifiedFile.getFileName().toString());
if (!predicate.test(filePath) || modifiedFile.getFileName().toString().startsWith(".")) {
continue;
}
String filePath = path + "" + modifiedFile.getFileName();
log.info(event.kind() + " " + filePath);
log.debug("epoll result,path:{}", event.kind() + filePath);
HeraFile hfile = fileMap.get(filePath);

if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaomi.mone.file;

import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.FileEvent;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
Expand All @@ -18,7 +16,7 @@ public class FileMonitorUtilsTest {

@Test
public void test1() throws IOException, InterruptedException {
new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/");
new HeraFileMonitor(event -> log.info("{}", event)).reg("/tmp/e/", it -> true);
System.in.read();
}

Expand Down
18 changes: 14 additions & 4 deletions jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

/**
* @Author [email protected]
Expand Down Expand Up @@ -63,14 +64,23 @@ public boolean isContinue(String line) {
@SneakyThrows
@Test
public void testLogFileMonitor() {
FileInfoCache.ins().load();
Runtime.getRuntime().addShutdownHook(new Thread(()->{
// FileInfoCache.ins().load();
FileInfoCache.ins().load("/home/work/log/log-agent/milog/memory/.ozhera_pointer");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log.info("shutdown");
FileInfoCache.ins().shutdown();
}));
HeraFileMonitor monitor = new HeraFileMonitor();
monitor.setListener(new DefaultMonitorListener(monitor));
monitor.reg("/tmp/e/");
monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> {
System.out.println(readEvent.getReadResult().getLines());
}));
String fileName = "/home/work/log/test/provider/server.log.*";
Pattern pattern = Pattern.compile(fileName);
monitor.reg("/home/work/log/test/provider/", it -> {
boolean matches = pattern.matcher(it).matches();
log.info("file:{},matches:{}", it, matches);
return matches;
});
log.info("reg finish");
System.in.read();
}
Expand Down
Loading