Skip to content

Commit

Permalink
refactor: update file common (#893)
Browse files Browse the repository at this point in the history
* refactor: update file common

* refactor: update pom compile
  • Loading branch information
wtt40122 authored Oct 25, 2024
1 parent 9544cd2 commit 5deb08a
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 65 deletions.
6 changes: 6 additions & 0 deletions jcommon/codegen/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,11 @@
<artifactId>mockito-inline</artifactId>
<version>3.9.0</version>
</dependency>

<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
</project>
2 changes: 1 addition & 1 deletion jcommon/es/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.7-jdk8-SNAPSHOT</version>
<version>1.8-jdk8-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ public interface ILogFile {

int LINE_MAX_LENGTH = 1100000;

void readLine() throws IOException;
void readLine() throws Exception;

void setStop(boolean stop);

Expand Down
17 changes: 14 additions & 3 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.xiaomi.mone.file;

import com.google.common.collect.Lists;
import com.xiaomi.mone.file.common.FileUtils;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -90,12 +92,13 @@ private void open() {
}
}

public void readLine() throws IOException {
@Override
public void readLine() throws Exception {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
log.info("open file:{},pointer:{}", file, raf.getFilePointer());
log.info("open file:{},pointer:{},fileKey:{}", file, pointer, FileUtils.fileKey(new File(file)));
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
Expand All @@ -104,9 +107,11 @@ public void readLine() throws IOException {
log.error("file.length() IOException, file:{}", this.file, e);
}
raf.seek(pointer);
log.info("start readLine file:{},pointer:{}", file, pointer);

while (true) {
String line = raf.getNextLine();

if (null != line && lineNumber == 0 && pointer == 0) {
String hashLine = line.length() > 100 ? line.substring(0, 100) : line;
beforePointerHashCode = hashLine.hashCode();
Expand All @@ -115,16 +120,19 @@ public void readLine() throws IOException {
line = lineCutOff(line);

if (reFresh) {
log.info("readline reFresh:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}

if (reOpen) {
log.info("readline reOpen:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
pointer = 0;
lineNumber = 0;
break;
}

if (stop) {
log.info("readline stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}

Expand All @@ -133,11 +141,12 @@ public void readLine() throws IOException {
reOpen = true;
pointer = 0;
lineNumber = 0;
log.warn("file:{} content have been cut, goto reOpen file", file);
log.info("readline file:{} content have been cut, goto reOpen file,pointer:{},lineNumber:{},fileKey:{}", file, pointer, lineNumber, FileUtils.fileKey(new File(file)));
break;
}

if (listener.isContinue(line)) {
log.debug("readline isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
continue;
}

Expand All @@ -159,6 +168,7 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("read file stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, FileUtils.fileKey(new File(file)));
break;
}
}
Expand Down Expand Up @@ -218,6 +228,7 @@ private boolean contentHasCutting(String line) throws IOException {
//针对大文件,排除掉局部内容删除的情况,更准确识别内容整体切割的场景(误判重复采集成本较高)
long mPointer = maxPointer > 70000 ? maxPointer - 700 : maxPointer;
if (currentFileMaxPointer < mPointer) {
maxPointer = currentFileMaxPointer;
return true;
}

Expand Down
19 changes: 13 additions & 6 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.MessageDigest;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -193,6 +194,9 @@ public void readLine() throws IOException {
listener.setReadTime();

listener.onEvent(event);
if (pointer % 100000 == 0 || pointer == 1) {
log.info("file readResult:{}", readResult);
}
}
raf.close();
if (stop) {
Expand All @@ -201,6 +205,7 @@ public void readLine() throws IOException {
break;
}
}
log.info("read file:{},finished,,pointer:{},lineNumber:{},fileKey:{}", file, this.pointer, this.lineNumber, this.fileKey);
}

@Override
Expand Down Expand Up @@ -238,13 +243,13 @@ private boolean contentHasCutting(String line) throws IOException {
return false;
}

long currentFileMaxPointer;
long currentFileMaxLength;
try {
currentFileMaxPointer = raf.length();
if (currentFileMaxPointer == 0L) {
currentFileMaxLength = raf.length();
if (currentFileMaxLength == 0L) {
raf.getFD().sync();
TimeUnit.MILLISECONDS.sleep(30);
currentFileMaxPointer = raf.length();
currentFileMaxLength = raf.length();
}
} catch (IOException e) {
log.error("get fileMaxPointer IOException", e);
Expand Down Expand Up @@ -276,12 +281,14 @@ public void shutdown() {
public long readPointer() {
try {
FileInfo fi = FileInfoCache.ins().get(this.fileKey.toString());
if (null != fi) {
log.info("readPointer:{},file:{},fileKey:{}", fi, this.file, this.fileKey);
if (null != fi && Objects.equals(this.file, fi.getFileName())) {
return fi.getPointer();
}
} catch (Throwable e) {
log.error(e.getMessage());
log.error("readPointer error,file:{},fileKey:{}", file, fileKey, e);
}
log.warn("readPointer from 0,file:{},fileKey:{}", file, fileKey);
return 0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ public class MoneRandomAccessFile extends RandomAccessFile {
* @param filename The path of the file to open.
* @param mode Specifies the mode to use ("r", "rw", etc.) See the
* BufferedLineReader documentation for more information.
* @param bufsize The buffer size (in bytes) to use.
* @param bufSize The buffer size (in bytes) to use.
* @throws IOException
*/
public MoneRandomAccessFile(String filename, String mode, int bufsize)
public MoneRandomAccessFile(String filename, String mode, int bufSize)
throws IOException {
super(filename, mode);
invalidate();
BUF_SIZE = bufsize;
BUF_SIZE = bufSize;
buffer = new byte[BUF_SIZE];
}

Expand Down Expand Up @@ -157,9 +157,9 @@ public long getFilePointer() throws IOException {
*/
@Override
public void seek(long pos) throws IOException {
int n = (int) (real_pos - pos);
long n = real_pos - pos;
if (n >= 0 && n <= buf_end) {
buf_pos = buf_end - n;
buf_pos = (int) (buf_end - n);
} else {
super.seek(pos);
invalidate();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.xiaomi.mone.file.common;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
Expand All @@ -15,6 +13,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
Expand All @@ -24,7 +23,7 @@
@Slf4j
public class FileInfoCache {

private Cache<String, FileInfo> cache = CacheBuilder.newBuilder().maximumSize(50000).build();
private ConcurrentHashMap<String, FileInfo> cache = new ConcurrentHashMap<>();

private Gson gson = new GsonBuilder().setLenient().create();

Expand All @@ -47,23 +46,25 @@ public void put(String key, FileInfo val) {


public FileInfo get(String key) {
return cache.getIfPresent(key);
return cache.get(key);
}

public void remove(String key) {
cache.invalidate(key);
cache.remove(key);
}

public ConcurrentMap<String, FileInfo> caches() {
return cache.asMap();
return cache;
}

@SneakyThrows
public void shutdown() {
String str = gson.toJson(cache.asMap());
log.info("cache shutdown size:{}", cache.size());
String str = gson.toJson(cache);
FileWriter writer = new FileWriter(filePath, false);
writer.append(str);
writer.flush();
writer.close();
}

@SneakyThrows
Expand All @@ -75,6 +76,7 @@ public void load() {
}.getType();
Map<String, FileInfo> map = gson.fromJson(str, typeOfT);
map.forEach((k, v) -> cache.put(k, v));
log.info("cache load size:{}", cache.size());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ public interface EventListener {
default void remove(Object fileKey) {
}

default void stop() {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ public void remove(Object fileKey) {
readListenerMap.remove(fileKey);
}

@Override
public void stop() {
List<ReadListener> readListenerList = getReadListenerList();
for (ReadListener readListener : readListenerList) {
OzHeraReadListener ozHeraReadListener = ((OzHeraReadListener) readListener);
ozHeraReadListener.getLogFile().shutdown();
}
}

public List<ReadListener> getReadListenerList() {
return this.readListenerMap.values().stream().toList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.xiaomi.mone.file.common.SafeRun;
import com.xiaomi.mone.file.ozhera.HeraFile;
import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
Expand All @@ -20,6 +21,7 @@ public class OzHeraReadListener implements ReadListener {

private HeraFileMonitor monitor;

@Getter
private LogFile2 logFile;

private Consumer<ReadEvent> consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ public HeraFileMonitor(long removeTime) {
});

remList.forEach(it -> {
log.info("remove file:{}", it.getKey());
log.info("remove file:{},fileKey:{}", it.getKey(), it.getValue());
fileMap.remove(it.getKey());
map.remove(it.getValue());
listener.remove(it.getValue());
});
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
log.error("remove file error", ex);
}
}, 5, 10, TimeUnit.SECONDS);
}
Expand All @@ -89,7 +89,7 @@ public void reg(String path, Predicate<String> predicate) throws IOException, In

WatchService watchService = FileSystems.getDefault().newWatchService();
directory.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_CREATE);
while (true) {
while (!stop) {
try {

WatchKey key = watchService.take();
Expand Down Expand Up @@ -168,6 +168,7 @@ private HeraFile initFile(File it) {
log.info("initFile fileName:{},fileKey:{}", name, fileKey);
map.put(hf.getFileKey(), hf);
fileMap.put(hf.getFileName(), hf);
log.info("initFile hf:{},map size:{},fileMap size:{}", hf, map.size(), fileMap.size());
this.listener.onEvent(FileEvent.builder()
.pointer(pointer)
.type(EventType.init)
Expand Down Expand Up @@ -196,4 +197,9 @@ private void modify(HeraFile hfile) {
}
}

public void stop() {
this.stop = true;
listener.stop();
}

}
Loading

0 comments on commit 5deb08a

Please sign in to comment.