Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: file monitor update #764

Merged
merged 3 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.xiaomi.youpin.docean.test.demo.mydemo;

import com.xiaomi.youpin.docean.anno.Component;

/**
* @author wtt
* @version 1.0
* @description
* @date 2023/11/20 10:19
*/
@Component
public class DefaultDemoInterface implements DemoInterface {
@Override
public String hi() {
return "test";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ public class DemoCall {
@Value("$val")
private String val;

@Resource
private DemoInterface demoInterface;


//This is just an interface, but if it only has one implementation class, then Ioc will automatically find this unique implementation class.
@Resource
Expand All @@ -31,6 +34,7 @@ public class DemoCall {


public String hi() {
System.out.println(demoInterface.hi());
return demo.hi();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.xiaomi.youpin.docean.test.demo.mydemo;

/**
* @author [email protected]
* @date 2023/11/18 14:50
*/
public interface DemoInterface {

String hi();

}
14 changes: 9 additions & 5 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class LogFile2 implements ILogFile {
private int beforePointerHashCode;

@Setter
private long pointer;
private volatile long pointer;

//行号
private long lineNumber;
Expand Down Expand Up @@ -174,7 +174,6 @@ public void readLine() throws IOException {
continue;
}


try {
pointer = raf.getFilePointer();
maxPointer = raf.length();
Expand All @@ -194,8 +193,8 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("stop:{}", this.file);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
break;
}
}
Expand Down Expand Up @@ -245,11 +244,16 @@ private boolean contentHasCutting(String line) throws IOException {
return false;
}

public void saveProgress() {
if (!stop) {
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
}
}

public void shutdown() {
try {
this.stop = true;
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).build());
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
} catch (Throwable ex) {
log.error(ex.getMessage());
}
Expand Down
39 changes: 29 additions & 10 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/MLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

Expand All @@ -43,6 +44,8 @@ public class MLog {
*/
private Pattern customLinePattern;

private ReentrantLock lock = new ReentrantLock();

/**
* 匹配 20xx or [20xx
*/
Expand All @@ -52,26 +55,29 @@ public class MLog {
/**
* 最多聚合200行错误栈,避免queue无限增长
*/
private static final int MAX_MERGE_LINE = 200;
private byte[] lock = new byte[0];
private static final int MAX_MERGE_LINE = 400;

@Deprecated
public List<String> append(String msg) {
appendTime = Instant.now().toEpochMilli();
List<String> res = new ArrayList<>();
boolean isNew = isNew(msg);
if (isNew) {
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
res.add(e);
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
} else {
synchronized (lock) {
try {
lock.lock();
//最大错误栈行数判断
if (msgQueue.size() >= MAX_MERGE_LINE) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
Expand All @@ -80,6 +86,8 @@ public List<String> append(String msg) {
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
}

Expand All @@ -91,23 +99,30 @@ public String append2(String msg) {
String res = null;
boolean isNew = isNew(msg);
if (isNew) {
synchronized (lock) {

try {
lock.lock();
if (msgQueue.size() > 0) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
} else {
synchronized (lock) {
try {
lock.lock();
//最大错误栈行数判断
if (msgQueue.size() >= MAX_MERGE_LINE) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
//消息入队列
msgQueue.offer(msg);
} finally {
lock.unlock();
}
}

Expand All @@ -122,26 +137,30 @@ public String append2(String msg) {
@Deprecated
public List<String> takeRemainMsg() {
List<String> res = new ArrayList<>();
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
String e = msgQueue.stream().collect(Collectors.joining("\r\n"));
res.add(e);
msgQueue.clear();
}
} finally {
lock.unlock();
}

return res;
}

public String takeRemainMsg2() {
String res = null;
synchronized (lock) {
try {
lock.lock();
if (msgQueue.size() > 0) {
res = msgQueue.stream().collect(Collectors.joining("\r\n"));
msgQueue.clear();
}
} finally {
lock.unlock();
}

return res;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ default void setPointer(Object obj) {

}

default void saveProgress() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ public class FileInfo {

private long pointer;

private String fileName;

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,35 +3,40 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileWriter;
import java.lang.reflect.Type;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

/**
* @author [email protected]
* @date 2023/9/26 11:50
*/
@Slf4j
public class FileInfoCache {

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(50000).build();

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(10000).build();

private Gson gson = new Gson();
private Gson gson = new GsonBuilder().setLenient().create();

@Setter
private String filePath = "/tmp/.ozhera_pointer";

private volatile boolean loaded = false;

private static final class LazyHolder {
private static final FileInfoCache ins = new FileInfoCache();
}


public static final FileInfoCache ins() {
return LazyHolder.ins;
}
Expand All @@ -45,15 +50,26 @@ public FileInfo get(String key) {
return cache.getIfPresent(key);
}

public void remove(String key) {
cache.invalidate(key);
}

public ConcurrentMap<String, FileInfo> caches() {
return cache.asMap();
}

@SneakyThrows
public void shutdown() {
String str = gson.toJson(cache.asMap());
Files.write(Paths.get(filePath), str.getBytes());
FileWriter writer = new FileWriter(filePath, false);
writer.append(str);
writer.flush();
}

@SneakyThrows
public void load() {
if (new File(filePath).exists()) {
if (!loaded && new File(filePath).exists()) {
loaded = true;
String str = new String(Files.readAllBytes(Paths.get(filePath)));
Type typeOfT = new TypeToken<Map<String, FileInfo>>() {
}.getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import com.xiaomi.mone.file.LogFile2;
import com.xiaomi.mone.file.ReadEvent;
import com.xiaomi.mone.file.ReadListener;
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.event.EventListener;
import com.xiaomi.mone.file.event.EventType;
import com.xiaomi.mone.file.event.FileEvent;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
Expand All @@ -24,6 +28,9 @@ public class DefaultMonitorListener implements EventListener {

private Consumer<ReadEvent> consumer;

@Getter
private List<ReadListener> readListenerList = new CopyOnWriteArrayList<>();

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

public DefaultMonitorListener(HeraFileMonitor monitor, Consumer<ReadEvent> consumer) {
Expand All @@ -36,8 +43,10 @@ public void onEvent(FileEvent event) {
if (event.getType().equals(EventType.init)) {
log.info("log file:{}", event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName());
OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer);
readListenerList.add(ozHeraReadListener);
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
logFile.setListener(ozHeraReadListener);
SafeRun.run(logFile::readLine);
});
}
Expand All @@ -61,8 +70,10 @@ public void onEvent(FileEvent event) {

// LogFile2 logFile = new LogFile2(event.getFileName());
LogFile2 logFile = new LogFile2(event.getFileName(), 0, 0);
OzHeraReadListener ozHeraReadListener = new OzHeraReadListener(monitor, logFile, consumer);
readListenerList.add(ozHeraReadListener);
pool.submit(() -> {
logFile.setListener(new OzHeraReadListener(monitor, logFile, consumer));
logFile.setListener(ozHeraReadListener);
SafeRun.run(logFile::readLine);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public boolean isContinue(String line) {
return false;
}

@Override
public void saveProgress() {
logFile.saveProgress();
}

@Override
public boolean isBreak(String line) {
if (null == line) {
Expand Down
Loading
Loading