Skip to content

Commit

Permalink
refactor: file monitor update (#764)
Browse files Browse the repository at this point in the history
* feat: update file listening create file bug

* refactor: update file monitor
  • Loading branch information
wtt40122 authored Dec 13, 2023
1 parent a7c815f commit f684d9f
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 30 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.xiaomi.youpin.docean.test.demo.mydemo;

import com.xiaomi.youpin.docean.anno.Component;

/**
* @author wtt
* @version 1.0
* @description
* @date 2023/11/20 10:19
*/
@Component
public class DefaultDemoInterface implements DemoInterface {
@Override
public String hi() {
return "test";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class DemoCall {
@Value("$val")
private String val;

@Resource
private DemoInterface demoInterface;


//This is just an interface, but if it only has one implementation class, then Ioc will automatically find this unique implementation class.
@Resource
Expand All @@ -31,6 +34,7 @@ public class DemoCall {


public String hi() {
System.out.println(demoInterface.hi());
return demo.hi();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.xiaomi.youpin.docean.test.demo.mydemo;

/**
* @author [email protected]
* @date 2023/11/18 14:50
*/
public interface DemoInterface {

String hi();

}
14 changes: 9 additions & 5 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class LogFile2 implements ILogFile {
private int beforePointerHashCode;

@Setter
private long pointer;
private volatile long pointer;

//行号
private long lineNumber;
Expand Down Expand Up @@ -174,7 +174,6 @@ public void readLine() throws IOException {
continue;
}


try {
pointer = raf.getFilePointer();
maxPointer = raf.length();
Expand All @@ -194,8 +193,8 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("stop:{}", this.file);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
break;
}
}
Expand Down Expand Up @@ -245,11 +244,16 @@ private boolean contentHasCutting(String line) throws IOException {
return false;
}

public void saveProgress() {
if (!stop) {
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
}
}

public void shutdown() {
try {
this.stop = true;
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
} catch (Throwable ex) {
log.error(ex.getMessage());
}
Expand Down
39 changes: 29 additions & 10 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -43,6 +44,8 @@ public class MLog {
*/
private Pattern customLinePattern;

private ReentrantLock lock = new ReentrantLock();

/**
* 匹配 20xx or [20xx
*/
Expand All @@ -52,26 +55,29 @@ public class MLog {
/**
* 最多聚合200行错误栈,避免queue无限增长
*/
private static final int MAX_MERGE_LINE = 200;
private byte[] lock = new byte[0];
private static final int MAX_MERGE_LINE = 400;

@Deprecated
public List<String> append(String msg) {
appendTime = Instant.now().toEpochMilli();
List<String> res = new ArrayList<>();
boolean isNew = isNew(msg);
if (isNew) {
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
res.add(e);
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
} else {
synchronized (lock) {
try {
lock.lock();
//最大错误栈行数判断
if (msgQueue.size() >= MAX_MERGE_LINE) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
Expand All @@ -80,6 +86,8 @@ public List<String> append(String msg) {
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
}

Expand All @@ -91,23 +99,30 @@ public String append2(String msg) {
String res = null;
boolean isNew = isNew(msg);
if (isNew) {
synchronized (lock) {

try {
lock.lock();
if (msgQueue.size() > 0) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
} else {
synchronized (lock) {
try {
lock.lock();
//最大错误栈行数判断
if (msgQueue.size() >= MAX_MERGE_LINE) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
}

Expand All @@ -122,26 +137,30 @@ public String append2(String msg) {
@Deprecated
public List<String> takeRemainMsg() {
List<String> res = new ArrayList<>();
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
res.add(e);
msgQueue.clear();
}
} finally {
lock.unlock();
}

return res;
}

public String takeRemainMsg2() {
String res = null;
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
} finally {
lock.unlock();
}

return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ default void setPointer(Object obj) {

}

default void saveProgress() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public class FileInfo {

private long pointer;

private String fileName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,40 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileWriter;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author [email protected]
* @date 2023/9/26 11:50
*/
@Slf4j
public class FileInfoCache {

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(50000).build();

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(10000).build();

private Gson gson = new Gson();
private Gson gson = new GsonBuilder().setLenient().create();

@Setter
private String filePath = "/tmp/.ozhera_pointer";

private volatile boolean loaded = false;

private static final class LazyHolder {
private static final FileInfoCache ins = new FileInfoCache();
}


public static final FileInfoCache ins() {
return LazyHolder.ins;
}
Expand All @@ -45,15 +50,26 @@ public FileInfo get(String key) {
return cache.getIfPresent(key);
}

public void remove(String key) {
cache.invalidate(key);
}

public ConcurrentMap<String, FileInfo> caches() {
return cache.asMap();
}

@SneakyThrows
public void shutdown() {
String str = gson.toJson(cache.asMap());
Files.write(Paths.get(filePath), str.getBytes());
FileWriter writer = new FileWriter(filePath, false);
writer.append(str);
writer.flush();
}

@SneakyThrows
public void load() {
if (new File(filePath).exists()) {
if (!loaded && new File(filePath).exists()) {
loaded = true;
String str = new String(Files.readAllBytes(Paths.get(filePath)));
Type typeOfT = new TypeToken<Map<String, FileInfo>>() {
}.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import com.xiaomi.mone.file.LogFile2;
import com.xiaomi.mone.file.ReadEvent;
import com.xiaomi.mone.file.ReadListener;
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.EventType;
import com.xiaomi.mone.file.event.FileEvent;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
Expand All @@ -24,6 +28,9 @@ public class DefaultMonitorListener implements EventListener {

private Consumer<ReadEvent> consumer;

@Getter
private List<ReadListener> readListenerList = new CopyOnWriteArrayList<>();

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

public DefaultMonitorListener(HeraFileMonitor monitor, Consumer<ReadEvent> consumer) {
Expand All @@ -36,8 +43,10 @@ public void onEvent(FileEvent event) {
if (event.getType().equals(EventType.init)) {
log.info("log file:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer);
readListenerList.add(ozHeraReadListener);
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
logFile.setListener(ozHeraReadListener);
SafeRun.run(logFile::readLine);
});
}
Expand All @@ -61,8 +70,10 @@ public void onEvent(FileEvent event) {

// LogFile2 logFile = new LogFile2(event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName(), 0, 0);
OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer);
readListenerList.add(ozHeraReadListener);
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
logFile.setListener(ozHeraReadListener);
SafeRun.run(logFile::readLine);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public boolean isContinue(String line) {
return false;
}

@Override
public void saveProgress() {
logFile.saveProgress();
}

@Override
public boolean isBreak(String line) {
if (null == line) {
Expand Down
Loading

0 comments on commit f684d9f

Please sign in to comment.