Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update file common #893

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions jcommon/codegen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,11 @@
<artifactId>mockito-inline</artifactId>
<version>3.9.0</version>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</project>
2 changes: 1 addition & 1 deletion jcommon/es/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.7-jdk8-SNAPSHOT</version>
<version>1.8-jdk8-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface ILogFile {

int LINE_MAX_LENGTH = 1100000;

void readLine() throws IOException;
void readLine() throws Exception;

void setStop(boolean stop);

Expand Down
17 changes: 14 additions & 3 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.xiaomi.mone.file;

import com.google.common.collect.Lists;
import com.xiaomi.mone.file.common.FileUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -90,12 +92,13 @@ private void open() {
}
}

public void readLine() throws IOException {
@Override
public void readLine() throws Exception {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
log.info("open file:{},pointer:{}", file, raf.getFilePointer());
log.info("open file:{},pointer:{},fileKey:{}", file, pointer, FileUtils.fileKey(new File(file)));
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
Expand All @@ -104,9 +107,11 @@ public void readLine() throws IOException {
log.error("file.length() IOException, file:{}", this.file, e);
}
raf.seek(pointer);
log.info("start readLine file:{},pointer:{}", file, pointer);

while (true) {
String line = raf.getNextLine();

if (null != line && lineNumber == 0 && pointer == 0) {
String hashLine = line.length() > 100 ? line.substring(0, 100) : line;
beforePointerHashCode = hashLine.hashCode();
Expand All @@ -115,16 +120,19 @@ public void readLine() throws IOException {
line = lineCutOff(line);

if (reFresh) {
log.info("readline reFresh:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}

if (reOpen) {
log.info("readline reOpen:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
pointer = 0;
lineNumber = 0;
break;
}

if (stop) {
log.info("readline stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}

Expand All @@ -133,11 +141,12 @@ public void readLine() throws IOException {
reOpen = true;
pointer = 0;
lineNumber = 0;
log.warn("file:{} content have been cut, goto reOpen file", file);
log.info("readline file:{} content have been cut, goto reOpen file,pointer:{},lineNumber:{},fileKey:{}", file, pointer, lineNumber, FileUtils.fileKey(new File(file)));
break;
}

if (listener.isContinue(line)) {
log.debug("readline isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
continue;
}

Expand All @@ -159,6 +168,7 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("read file stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}
}
Expand Down Expand Up @@ -218,6 +228,7 @@ private boolean contentHasCutting(String line) throws IOException {
//针对大文件,排除掉局部内容删除的情况,更准确识别内容整体切割的场景(误判重复采集成本较高)
long mPointer = maxPointer > 70000 ? maxPointer - 700 : maxPointer;
if (currentFileMaxPointer < mPointer) {
maxPointer = currentFileMaxPointer;
return true;
}

Expand Down
19 changes: 13 additions & 6 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -193,6 +194,9 @@ public void readLine() throws IOException {
listener.setReadTime();

listener.onEvent(event);
if (pointer % 100000 == 0 || pointer == 1) {
log.info("file readResult:{}", readResult);
}
}
raf.close();
if (stop) {
Expand All @@ -201,6 +205,7 @@ public void readLine() throws IOException {
break;
}
}
log.info("read file:{},finished,,pointer:{},lineNumber:{},fileKey:{}", file, this.pointer, this.lineNumber, this.fileKey);
}

@Override
Expand Down Expand Up @@ -238,13 +243,13 @@ private boolean contentHasCutting(String line) throws IOException {
return false;
}

long currentFileMaxPointer;
long currentFileMaxLength;
try {
currentFileMaxPointer = raf.length();
if (currentFileMaxPointer == 0L) {
currentFileMaxLength = raf.length();
if (currentFileMaxLength == 0L) {
raf.getFD().sync();
TimeUnit.MILLISECONDS.sleep(30);
currentFileMaxPointer = raf.length();
currentFileMaxLength = raf.length();
}
} catch (IOException e) {
log.error("get fileMaxPointer IOException", e);
Expand Down Expand Up @@ -276,12 +281,14 @@ public void shutdown() {
public long readPointer() {
try {
FileInfo fi = FileInfoCache.ins().get(this.fileKey.toString());
if (null != fi) {
log.info("readPointer:{},file:{},fileKey:{}", fi, this.file, this.fileKey);
if (null != fi && Objects.equals(this.file, fi.getFileName())) {
return fi.getPointer();
}
} catch (Throwable e) {
log.error(e.getMessage());
log.error("readPointer error,file:{},fileKey:{}", file, fileKey, e);
}
log.warn("readPointer from 0,file:{},fileKey:{}", file, fileKey);
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public class MoneRandomAccessFile extends RandomAccessFile {
* @param filename The path of the file to open.
* @param mode Specifies the mode to use ("r", "rw", etc.) See the
* BufferedLineReader documentation for more information.
* @param bufsize The buffer size (in bytes) to use.
* @param bufSize The buffer size (in bytes) to use.
* @throws IOException
*/
public MoneRandomAccessFile(String filename, String mode, int bufsize)
public MoneRandomAccessFile(String filename, String mode, int bufSize)
throws IOException {
super(filename, mode);
invalidate();
BUF_SIZE = bufsize;
BUF_SIZE = bufSize;
buffer = new byte[BUF_SIZE];
}

Expand Down Expand Up @@ -157,9 +157,9 @@ public long getFilePointer() throws IOException {
*/
@Override
public void seek(long pos) throws IOException {
int n = (int) (real_pos - pos);
long n = real_pos - pos;
if (n >= 0 && n <= buf_end) {
buf_pos = buf_end - n;
buf_pos = (int) (buf_end - n);
} else {
super.seek(pos);
invalidate();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaomi.mone.file.common;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand All @@ -15,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -24,7 +23,7 @@
@Slf4j
public class FileInfoCache {

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(50000).build();
private ConcurrentHashMap<String, FileInfo> cache = new ConcurrentHashMap<>();

private Gson gson = new GsonBuilder().setLenient().create();

Expand All @@ -47,23 +46,25 @@ public void put(String key, FileInfo val) {


public FileInfo get(String key) {
return cache.getIfPresent(key);
return cache.get(key);
}

public void remove(String key) {
cache.invalidate(key);
cache.remove(key);
}

public ConcurrentMap<String, FileInfo> caches() {
return cache.asMap();
return cache;
}

@SneakyThrows
public void shutdown() {
String str = gson.toJson(cache.asMap());
log.info("cache shutdown size:{}", cache.size());
String str = gson.toJson(cache);
FileWriter writer = new FileWriter(filePath, false);
writer.append(str);
writer.flush();
writer.close();
}

@SneakyThrows
Expand All @@ -75,6 +76,7 @@ public void load() {
}.getType();
Map<String, FileInfo> map = gson.fromJson(str, typeOfT);
map.forEach((k, v) -> cache.put(k, v));
log.info("cache load size:{}", cache.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ public interface EventListener {
default void remove(Object fileKey) {
}

default void stop() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void remove(Object fileKey) {
readListenerMap.remove(fileKey);
}

@Override
public void stop() {
List<ReadListener> readListenerList = getReadListenerList();
for (ReadListener readListener : readListenerList) {
OzHeraReadListener ozHeraReadListener = ((OzHeraReadListener) readListener);
ozHeraReadListener.getLogFile().shutdown();
}
}

public List<ReadListener> getReadListenerList() {
return this.readListenerMap.values().stream().toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.ozhera.HeraFile;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
Expand All @@ -20,6 +21,7 @@ public class OzHeraReadListener implements ReadListener {

private HeraFileMonitor monitor;

@Getter
private LogFile2 logFile;

private Consumer<ReadEvent> consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public HeraFileMonitor(long removeTime) {
});

remList.forEach(it -> {
log.info("remove file:{}", it.getKey());
log.info("remove file:{},fileKey:{}", it.getKey(), it.getValue());
fileMap.remove(it.getKey());
map.remove(it.getValue());
listener.remove(it.getValue());
});
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
log.error("remove file error", ex);
}
}, 5, 10, TimeUnit.SECONDS);
}
Expand All @@ -89,7 +89,7 @@ public void reg(String path, Predicate<String> predicate) throws IOException, In

WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
while (true) {
while (!stop) {
try {

WatchKey key = watchService.take();
Expand Down Expand Up @@ -168,6 +168,7 @@ private HeraFile initFile(File it) {
log.info("initFile fileName:{},fileKey:{}", name, fileKey);
map.put(hf.getFileKey(), hf);
fileMap.put(hf.getFileName(), hf);
log.info("initFile hf:{},map size:{},fileMap size:{}", hf, map.size(), fileMap.size());
this.listener.onEvent(FileEvent.builder()
.pointer(pointer)
.type(EventType.init)
Expand Down Expand Up @@ -196,4 +197,9 @@ private void modify(HeraFile hfile) {
}
}

public void stop() {
this.stop = true;
listener.stop();
}

}
Loading
Loading