diff --git a/jcommon/codegen/pom.xml b/jcommon/codegen/pom.xml
index 01f03b86c..d44272f32 100644
--- a/jcommon/codegen/pom.xml
+++ b/jcommon/codegen/pom.xml
@@ -84,5 +84,11 @@
mockito-inline
3.9.0
+
+
+ javax.annotation
+ javax.annotation-api
+ 1.3.2
+
diff --git a/jcommon/es/pom.xml b/jcommon/es/pom.xml
index b5a376445..29969aa88 100644
--- a/jcommon/es/pom.xml
+++ b/jcommon/es/pom.xml
@@ -9,7 +9,7 @@
1.6.0-jdk21-SNAPSHOT
es
- 1.7-jdk8-SNAPSHOT
+ 1.8-jdk8-SNAPSHOT
UTF-8
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
index 3889f4f08..028ca5228 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
@@ -10,7 +10,7 @@ public interface ILogFile {
int LINE_MAX_LENGTH = 1100000;
- void readLine() throws IOException;
+ void readLine() throws Exception;
void setStop(boolean stop);
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
index ddc7878a7..24415976d 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
@@ -1,11 +1,13 @@
package com.xiaomi.mone.file;
import com.google.common.collect.Lists;
+import com.xiaomi.mone.file.common.FileUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
+import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
@@ -90,12 +92,13 @@ private void open() {
}
}
- public void readLine() throws IOException {
+ @Override
+ public void readLine() throws Exception {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
- log.info("open file:{},pointer:{}", file, raf.getFilePointer());
+ log.info("open file:{},pointer:{},fileKey:{}", file, pointer, FileUtils.fileKey(new File(file)));
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
@@ -104,9 +107,11 @@ public void readLine() throws IOException {
log.error("file.length() IOException, file:{}", this.file, e);
}
raf.seek(pointer);
+ log.info("start readLine file:{},pointer:{}", file, pointer);
while (true) {
String line = raf.getNextLine();
+
if (null != line && lineNumber == 0 && pointer == 0) {
String hashLine = line.length() > 100 ? line.substring(0, 100) : line;
beforePointerHashCode = hashLine.hashCode();
@@ -115,16 +120,19 @@ public void readLine() throws IOException {
line = lineCutOff(line);
if (reFresh) {
+ log.info("readline reFresh:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}
if (reOpen) {
+ log.info("readline reOpen:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
pointer = 0;
lineNumber = 0;
break;
}
if (stop) {
+ log.info("readline stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}
@@ -133,11 +141,12 @@ public void readLine() throws IOException {
reOpen = true;
pointer = 0;
lineNumber = 0;
- log.warn("file:{} content have been cut, goto reOpen file", file);
+ log.info("readline file:{} content have been cut, goto reOpen file,pointer:{},lineNumber:{},fileKey:{}", file, pointer, lineNumber, FileUtils.fileKey(new File(file)));
break;
}
if (listener.isContinue(line)) {
+ log.debug("readline isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
continue;
}
@@ -159,6 +168,7 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
+ log.info("read file stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}
}
@@ -218,6 +228,7 @@ private boolean contentHasCutting(String line) throws IOException {
//针对大文件,排除掉局部内容删除的情况,更准确识别内容整体切割的场景(误判重复采集成本较高)
long mPointer = maxPointer > 70000 ? maxPointer - 700 : maxPointer;
if (currentFileMaxPointer < mPointer) {
+ maxPointer = currentFileMaxPointer;
return true;
}
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
index 0381a698a..850391462 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
@@ -13,6 +13,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.MessageDigest;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@@ -193,6 +194,9 @@ public void readLine() throws IOException {
listener.setReadTime();
listener.onEvent(event);
+ if (pointer % 100000 == 0 || pointer == 1) {
+ log.info("file readResult:{}", readResult);
+ }
}
raf.close();
if (stop) {
@@ -201,6 +205,7 @@ public void readLine() throws IOException {
break;
}
}
+ log.info("read file:{},finished,,pointer:{},lineNumber:{},fileKey:{}", file, this.pointer, this.lineNumber, this.fileKey);
}
@Override
@@ -238,13 +243,13 @@ private boolean contentHasCutting(String line) throws IOException {
return false;
}
- long currentFileMaxPointer;
+ long currentFileMaxLength;
try {
- currentFileMaxPointer = raf.length();
- if (currentFileMaxPointer == 0L) {
+ currentFileMaxLength = raf.length();
+ if (currentFileMaxLength == 0L) {
raf.getFD().sync();
TimeUnit.MILLISECONDS.sleep(30);
- currentFileMaxPointer = raf.length();
+ currentFileMaxLength = raf.length();
}
} catch (IOException e) {
log.error("get fileMaxPointer IOException", e);
@@ -276,12 +281,14 @@ public void shutdown() {
public long readPointer() {
try {
FileInfo fi = FileInfoCache.ins().get(this.fileKey.toString());
- if (null != fi) {
+ log.info("readPointer:{},file:{},fileKey:{}", fi, this.file, this.fileKey);
+ if (null != fi && Objects.equals(this.file, fi.getFileName())) {
return fi.getPointer();
}
} catch (Throwable e) {
- log.error(e.getMessage());
+ log.error("readPointer error,file:{},fileKey:{}", file, fileKey, e);
}
+ log.warn("readPointer from 0,file:{},fileKey:{}", file, fileKey);
return 0;
}
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/MoneRandomAccessFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/MoneRandomAccessFile.java
index d7252150f..fadd32e5b 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/MoneRandomAccessFile.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/MoneRandomAccessFile.java
@@ -49,14 +49,14 @@ public class MoneRandomAccessFile extends RandomAccessFile {
* @param filename The path of the file to open.
* @param mode Specifies the mode to use ("r", "rw", etc.) See the
* BufferedLineReader documentation for more information.
- * @param bufsize The buffer size (in bytes) to use.
+ * @param bufSize The buffer size (in bytes) to use.
* @throws IOException
*/
- public MoneRandomAccessFile(String filename, String mode, int bufsize)
+ public MoneRandomAccessFile(String filename, String mode, int bufSize)
throws IOException {
super(filename, mode);
invalidate();
- BUF_SIZE = bufsize;
+ BUF_SIZE = bufSize;
buffer = new byte[BUF_SIZE];
}
@@ -157,9 +157,9 @@ public long getFilePointer() throws IOException {
*/
@Override
public void seek(long pos) throws IOException {
- int n = (int) (real_pos - pos);
+ long n = real_pos - pos;
if (n >= 0 && n <= buf_end) {
- buf_pos = buf_end - n;
+ buf_pos = (int) (buf_end - n);
} else {
super.seek(pos);
invalidate();
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java
index b065fb0db..52bdcf90f 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/common/FileInfoCache.java
@@ -1,7 +1,5 @@
package com.xiaomi.mone.file.common;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
@@ -15,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
@@ -24,7 +23,7 @@
@Slf4j
public class FileInfoCache {
- private Cache cache = CacheBuilder.newBuilder().maximumSize(50000).build();
+ private ConcurrentHashMap cache = new ConcurrentHashMap<>();
private Gson gson = new GsonBuilder().setLenient().create();
@@ -47,23 +46,25 @@ public void put(String key, FileInfo val) {
public FileInfo get(String key) {
- return cache.getIfPresent(key);
+ return cache.get(key);
}
public void remove(String key) {
- cache.invalidate(key);
+ cache.remove(key);
}
public ConcurrentMap caches() {
- return cache.asMap();
+ return cache;
}
@SneakyThrows
public void shutdown() {
- String str = gson.toJson(cache.asMap());
+ log.info("cache shutdown size:{}", cache.size());
+ String str = gson.toJson(cache);
FileWriter writer = new FileWriter(filePath, false);
writer.append(str);
writer.flush();
+ writer.close();
}
@SneakyThrows
@@ -75,6 +76,7 @@ public void load() {
}.getType();
Map map = gson.fromJson(str, typeOfT);
map.forEach((k, v) -> cache.put(k, v));
+ log.info("cache load size:{}", cache.size());
}
}
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java
index fb35d1cfc..979b09f4f 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/event/EventListener.java
@@ -11,4 +11,7 @@ public interface EventListener {
default void remove(Object fileKey) {
}
+ default void stop() {
+ }
+
}
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java
index f7a855994..b6cf9de98 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/DefaultMonitorListener.java
@@ -85,6 +85,15 @@ public void remove(Object fileKey) {
readListenerMap.remove(fileKey);
}
+ @Override
+ public void stop() {
+ List readListenerList = getReadListenerList();
+ for (ReadListener readListener : readListenerList) {
+ OzHeraReadListener ozHeraReadListener = ((OzHeraReadListener) readListener);
+ ozHeraReadListener.getLogFile().shutdown();
+ }
+ }
+
public List getReadListenerList() {
return this.readListenerMap.values().stream().toList();
}
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java
index 43fa64876..419acd620 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/listener/OzHeraReadListener.java
@@ -6,6 +6,7 @@
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.ozhera.HeraFile;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
+import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@@ -20,6 +21,7 @@ public class OzHeraReadListener implements ReadListener {
private HeraFileMonitor monitor;
+ @Getter
private LogFile2 logFile;
private Consumer consumer;
diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java
index 0f7c7d385..2a83ed856 100644
--- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java
+++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ozhera/HeraFileMonitor.java
@@ -59,13 +59,13 @@ public HeraFileMonitor(long removeTime) {
});
remList.forEach(it -> {
- log.info("remove file:{}", it.getKey());
+ log.info("remove file:{},fileKey:{}", it.getKey(), it.getValue());
fileMap.remove(it.getKey());
map.remove(it.getValue());
listener.remove(it.getValue());
});
} catch (Throwable ex) {
- log.error(ex.getMessage(), ex);
+ log.error("remove file error", ex);
}
}, 5, 10, TimeUnit.SECONDS);
}
@@ -89,7 +89,7 @@ public void reg(String path, Predicate predicate) throws IOException, In
WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
- while (true) {
+ while (!stop) {
try {
WatchKey key = watchService.take();
@@ -168,6 +168,7 @@ private HeraFile initFile(File it) {
log.info("initFile fileName:{},fileKey:{}", name, fileKey);
map.put(hf.getFileKey(), hf);
fileMap.put(hf.getFileName(), hf);
+ log.info("initFile hf:{},map size:{},fileMap size:{}", hf, map.size(), fileMap.size());
this.listener.onEvent(FileEvent.builder()
.pointer(pointer)
.type(EventType.init)
@@ -196,4 +197,9 @@ private void modify(HeraFile hfile) {
}
}
+ public void stop() {
+ this.stop = true;
+ listener.stop();
+ }
+
}
diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java
index e57859f58..82a7d69e8 100644
--- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java
+++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java
@@ -38,7 +38,7 @@ public class LogFileTest {
@Test
- public void testLog() throws IOException {
+ public void testLog() throws Exception {
LogFile log = new LogFile("/var/log/system.log", new ReadListener() {
@Override
public void onEvent(ReadEvent event) {
@@ -76,14 +76,14 @@ public void testLogFileMonitor() {
monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> {
System.out.println(readEvent.getReadResult().getLines());
}));
- String fileName = "/home/work/log/test/file*.txt";
+ String fileName = "/home/work/log/log/test/*.log";
Pattern pattern = Pattern.compile(fileName);
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(this::test, 1, 2, TimeUnit.SECONDS);
- monitor.reg("/home/work/log/test", it -> {
+ monitor.reg("/home/work/log/log/test", it -> {
boolean matches = pattern.matcher(it).matches();
log.info("file:{},matches:{}", it, true);
return true;
@@ -98,7 +98,7 @@ private void test() {
}
@Test
- public void testLogWS() throws IOException {
+ public void testLogWS() throws Exception {
LogFile log = new LogFile("D:\\test.log", new ReadListener() {
@Override
public void onEvent(ReadEvent event) {
@@ -132,14 +132,14 @@ public boolean isContinue(String line) {
@Test
- public void testLog2() throws IOException {
+ public void testLog2() throws Exception {
LogFile log = new LogFile("/tmp/zzytest/zzytest/server.log", new MyReadListener());
log.readLine();
}
@Test
- public void testReadFileCutting() throws IOException {
+ public void testReadFileCutting() throws Exception {
LogFile log = new LogFile("/home/work/log/hera-operator/server.log", new MyReadListener());
log.readLine();
System.in.read();
diff --git a/jcommon/pom.xml b/jcommon/pom.xml
index 317b7d665..c4f8c9c42 100644
--- a/jcommon/pom.xml
+++ b/jcommon/pom.xml
@@ -91,27 +91,14 @@
ai/neo4j
-
-
-
-
-
-
-
-
-
-
-
- central
- maven-release-virtual
- https://pkgs.d.xiaomi.net/artifactory/maven-release-virtual
+ ossrh
+ https://s01.oss.sonatype.org/service/local/staging/deploy/maven2/
- snapshots
- maven-snapshot-virtual
- https://pkgs.d.xiaomi.net/artifactory/maven-snapshot-virtual
+ ossrh
+ https://s01.oss.sonatype.org/content/repositories/snapshots
@@ -201,22 +188,22 @@
3.4.0
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+ org.apache.maven.plugins
+ maven-javadoc-plugin
+ 3.5.0
+
+ false
+
+
+
+ attach-javadocs
+
+ jar
+
+
+
+