Skip to content

Commit

Permalink
refactor: update es and upgrade file (#848)
Browse files Browse the repository at this point in the history
  • Loading branch information
wtt40122 authored Jun 11, 2024
1 parent d27b7a6 commit 3f91cb4
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 14 deletions.
6 changes: 3 additions & 3 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
public class EsClient {

private static Sniffer sniffer;
private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000;
private static final int SNIFF_INTERVAL_MILLIS = 60 * 1000 * 3;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 60 * 1000;
private static final int MAX_CONN_PER_ROUTE = 500;
private static final int MAX_CONN_TOTAL = 500;
private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000;
Expand Down Expand Up @@ -196,7 +196,7 @@ private void initializeSniffer() {
.setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS)
.setNodesSniffer(new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
TimeUnit.SECONDS.toMillis(60),
ElasticsearchNodesSniffer.Scheme.HTTP))
.build();
sniffOnFailureListener.setSniffer(sniffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ public void getClusterHealth() throws IOException {

@Test
public void queryIndexMetadataTest() throws IOException {
GetMappingsResponse metadata = client.queryIndexMapping("zgq_common_milog_staging_app_private_1");
System.out.println(String.format("result:%s", gson.toJson(metadata)));
GetMappingsResponse metadata = client.queryIndexMapping("test_scf_log_index");
// Map<String, MappingMetadata> mappings = metadata.mappings();
// System.out.println(String.format("result:%s", gson.toJson(metadata)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void bulkInsert() throws InterruptedException {

NacosConfig config = new NacosConfig();
config.setDataId("zzy_new");
config.init();
// config.init();

String ip = config.getConfig("es_ip");
String user = config.getConfig("es_user");
Expand Down Expand Up @@ -53,13 +53,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
int count = 0;
while (true) {
// processor.bulkInsert(indexName, data);
processor.bulkUpsert(indexName, "YpzPE4UBt3Uy5NFQ1V5e", data);
processor.bulkInsert(indexName, data);
count++;
if (count == n) {
break;
}
}
Thread.sleep(10000l);
// Thread.sleep(10000l);
System.in.read();
}catch (Exception e){
e.printStackTrace();
}
Expand Down
7 changes: 7 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,12 @@ public interface ILogFile {

void initLogFile(String file, ReadListener listener, long pointer, long lineNumber);

/**
* It only needs to be called when an exception occurs and can only be called externally.
*/
void setExceptionFinish();

boolean getExceptionFinish();


}
12 changes: 12 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class LogFile implements ILogFile {
@Setter
private volatile boolean reFresh;

private volatile boolean exceptionFinish;

@Getter
private int beforePointerHashCode;

Expand Down Expand Up @@ -171,6 +173,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l
this.lineNumber = lineNumber;
}

@Override
public void setExceptionFinish() {
exceptionFinish = true;
}

@Override
public boolean getExceptionFinish() {
return exceptionFinish;
}

private String lineCutOff(String line) {
if (null != line) {
//todo 大行文件先临时截断
Expand Down
20 changes: 17 additions & 3 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public class LogFile2 implements ILogFile {
@Setter
private volatile boolean reFresh;

private volatile boolean exceptionFinish;

@Getter
private int beforePointerHashCode;

Expand Down Expand Up @@ -110,20 +112,21 @@ private void open() {
}
}

@Override
public void readLine() throws IOException {
while (true) {
open();
//兼容文件切换时,缓存的pointer
try {
log.info("open file:{},pointer:{}", file, this.pointer);
log.info("open file:{},pointer:{},lineNumber:{},", file, this.pointer, this.lineNumber);
if (pointer > raf.length()) {
pointer = 0;
lineNumber = 0;
}
} catch (Exception e) {
log.error("file.length() IOException, file:{}", this.file, e);
}
log.info("rel open file:{},pointer:{}", file, this.pointer);
log.info("rel open file:{},pointer:{},lineNumber:{}", file, this.pointer, this.lineNumber);
raf.seek(pointer);

while (true) {
Expand Down Expand Up @@ -166,6 +169,7 @@ public void readLine() throws IOException {
}

if (listener.isBreak(line)) {
log.info("isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey);
stop = true;
break;
}
Expand Down Expand Up @@ -193,7 +197,7 @@ public void readLine() throws IOException {
}
raf.close();
if (stop) {
log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey);
log.info("stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey);
FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build());
break;
}
Expand All @@ -209,6 +213,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l
this.lineNumber = lineNumber;
}

@Override
public void setExceptionFinish() {
exceptionFinish = true;
}

@Override
public boolean getExceptionFinish() {
return exceptionFinish;
}

private String lineCutOff(String line) {
if (null != line) {
//todo 大行文件先临时截断
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,12 @@ public void testLogFileMonitor() {
monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> {
System.out.println(readEvent.getReadResult().getLines());
}));
String fileName = "/home/work/log/test/provider/server.log.*";
String fileName = "/home/work/log/file.log.*";
Pattern pattern = Pattern.compile(fileName);
monitor.reg("/home/work/log/test/provider/", it -> {
monitor.reg("/home/work/log", it -> {
boolean matches = pattern.matcher(it).matches();
log.info("file:{},matches:{}", it, matches);
return matches;
return true;
});
log.info("reg finish");
System.in.read();
Expand Down

0 comments on commit 3f91cb4

Please sign in to comment.