diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index f557c4d83..e2b145d2f 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -58,8 +58,8 @@ public class EsClient { private static Sniffer sniffer; - private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000; - private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000; + private static final int SNIFF_INTERVAL_MILLIS = 60 * 1000 * 3; + private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 60 * 1000; private static final int MAX_CONN_PER_ROUTE = 500; private static final int MAX_CONN_TOTAL = 500; private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000; @@ -196,7 +196,7 @@ private void initializeSniffer() { .setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS) .setNodesSniffer(new ElasticsearchNodesSniffer( restClient, - TimeUnit.SECONDS.toMillis(5), + TimeUnit.SECONDS.toMillis(60), ElasticsearchNodesSniffer.Scheme.HTTP)) .build(); sniffOnFailureListener.setSniffer(sniffer); diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java index 97ef93055..bc9b5a24f 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsClientTest.java @@ -226,8 +226,9 @@ public void getClusterHealth() throws IOException { @Test public void queryIndexMetadataTest() throws IOException { - GetMappingsResponse metadata = client.queryIndexMapping("zgq_common_milog_staging_app_private_1"); - System.out.println(String.format("result:%s", gson.toJson(metadata))); + GetMappingsResponse metadata = client.queryIndexMapping("test_scf_log_index"); +// Map mappings = metadata.mappings(); +// System.out.println(String.format("result:%s", gson.toJson(metadata))); } @Test diff --git a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java index 7653a6d56..a6a7fb69b 100644 --- a/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java +++ b/jcommon/es/src/test/java/com/xiaomi/mone/es/test/EsProcessorClientTest.java @@ -22,7 +22,7 @@ public void bulkInsert() throws InterruptedException { NacosConfig config = new NacosConfig(); config.setDataId("zzy_new"); - config.init(); +// config.init(); String ip = config.getConfig("es_ip"); String user = config.getConfig("es_user"); @@ -53,13 +53,14 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) int count = 0; while (true) { // processor.bulkInsert(indexName, data); - processor.bulkUpsert(indexName, "YpzPE4UBt3Uy5NFQ1V5e", data); + processor.bulkInsert(indexName, data); count++; if (count == n) { break; } } - Thread.sleep(10000l); +// Thread.sleep(10000l); + System.in.read(); }catch (Exception e){ e.printStackTrace(); } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java index 0a898721e..f78f883e6 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java @@ -16,5 +16,12 @@ public interface ILogFile { void initLogFile(String file, ReadListener listener, long pointer, long lineNumber); + /** + * It only needs to be called when an exception occurs and can only be called externally. + */ + void setExceptionFinish(); + + boolean getExceptionFinish(); + } diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java index 5f8b82b61..71654948e 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile.java @@ -35,6 +35,8 @@ public class LogFile implements ILogFile { @Setter private volatile boolean reFresh; + private volatile boolean exceptionFinish; + @Getter private int beforePointerHashCode; @@ -171,6 +173,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l this.lineNumber = lineNumber; } + @Override + public void setExceptionFinish() { + exceptionFinish = true; + } + + @Override + public boolean getExceptionFinish() { + return exceptionFinish; + } + private String lineCutOff(String line) { if (null != line) { //todo 大行文件先临时截断 diff --git a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java index 27a921211..e77743c3a 100644 --- a/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java +++ b/jcommon/file/src/main/java/com/xiaomi/mone/file/LogFile2.java @@ -42,6 +42,8 @@ public class LogFile2 implements ILogFile { @Setter private volatile boolean reFresh; + private volatile boolean exceptionFinish; + @Getter private int beforePointerHashCode; @@ -110,12 +112,13 @@ private void open() { } } + @Override public void readLine() throws IOException { while (true) { open(); //兼容文件切换时,缓存的pointer try { - log.info("open file:{},pointer:{}", file, this.pointer); + log.info("open file:{},pointer:{},lineNumber:{},", file, this.pointer, this.lineNumber); if (pointer > raf.length()) { pointer = 0; lineNumber = 0; @@ -123,7 +126,7 @@ public void readLine() throws IOException { } catch (Exception e) { log.error("file.length() IOException, file:{}", this.file, e); } - log.info("rel open file:{},pointer:{}", file, this.pointer); + log.info("rel open file:{},pointer:{},lineNumber:{}", file, this.pointer, this.lineNumber); raf.seek(pointer); while (true) { @@ -166,6 +169,7 @@ public void readLine() throws IOException { } if (listener.isBreak(line)) { + log.info("isBreak:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey); stop = true; break; } @@ -193,7 +197,7 @@ public void readLine() throws IOException { } raf.close(); if (stop) { - log.info("stop:{},pointer:{},fileKey:{}", this.file, this.pointer, this.fileKey); + log.info("stop:{},pointer:{},lineNumber:{},fileKey:{}", this.file, this.pointer, this.lineNumber, this.fileKey); FileInfoCache.ins().put(this.fileKey.toString(), FileInfo.builder().pointer(this.pointer).fileName(this.file).build()); break; } @@ -209,6 +213,16 @@ public void initLogFile(String file, ReadListener listener, long pointer, long l this.lineNumber = lineNumber; } + @Override + public void setExceptionFinish() { + exceptionFinish = true; + } + + @Override + public boolean getExceptionFinish() { + return exceptionFinish; + } + private String lineCutOff(String line) { if (null != line) { //todo 大行文件先临时截断 diff --git a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java index 363f59624..8644d3e01 100644 --- a/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java +++ b/jcommon/file/src/test/java/com/xiaomi/mone/file/LogFileTest.java @@ -74,12 +74,12 @@ public void testLogFileMonitor() { monitor.setListener(new DefaultMonitorListener(monitor, readEvent -> { System.out.println(readEvent.getReadResult().getLines()); })); - String fileName = "/home/work/log/test/provider/server.log.*"; + String fileName = "/home/work/log/file.log.*"; Pattern pattern = Pattern.compile(fileName); - monitor.reg("/home/work/log/test/provider/", it -> { + monitor.reg("/home/work/log", it -> { boolean matches = pattern.matcher(it).matches(); log.info("file:{},matches:{}", it, matches); - return matches; + return true; }); log.info("reg finish"); System.in.read();