Skip to content

Commit

Permalink
fix: Fixed the issue of reading a large number of files OOM and upgra…
Browse files Browse the repository at this point in the history
…ded the ES version
  • Loading branch information
wtt40122 committed Aug 14, 2024
1 parent 82ea6e4 commit 410775f
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 184 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public void test1() {
RequestPayload payload = RequestPayload.builder().maxTokens(4000).anthropicVersion("vertex-2023-10-16").messages(Lists.newArrayList(Message.builder().role("user")
.content(content)
.build())).build();
ResponsePayload r = c.call(c.token(), payload);
ResponsePayload r = c.call(c.token(""), payload);
System.out.println(r.getContent().get(0).getText());
}
}
162 changes: 73 additions & 89 deletions jcommon/es/pom.xml
Original file line number Diff line number Diff line change
@@ -1,91 +1,75 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>run.mone</groupId>
<artifactId>jcommon</artifactId>
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.5-jdk21-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.10.0</version>
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
<exclusion>
<artifactId>httpcore-nio</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>7.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.12</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.13</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.13</version>
</dependency>
<dependency>
<groupId>run.mone</groupId>
<artifactId>nacos</artifactId>
<version>1.4-v1-jdk20-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>21</source>
<target>21</target>
<verbose>true</verbose>
<encoding>UTF-8</encoding>
<compilerArguments>
<sourcepath>${project.basedir}/src/main/java</sourcepath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<attach>true</attach>
</configuration>
</plugin>
</plugins>
</build>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>run.mone</groupId>
<artifactId>jcommon</artifactId>
<version>1.6.0-jdk21-SNAPSHOT</version>
</parent>
<artifactId>es</artifactId>
<version>1.7-jdk8-SNAPSHOT</version>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<es.version>7.17.21</es.version>
</properties>

<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client-sniffer</artifactId>
<version>${es.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${es.version}</version>
</dependency>

<dependency>
<groupId>run.mone</groupId>
<artifactId>nacos</artifactId>
<version>1.4-v1-jdk20-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>8</source>
<target>8</target>
<verbose>true</verbose>
<encoding>UTF-8</encoding>
<compilerArguments>
<sourcepath>${project.basedir}/src/main/java</sourcepath>
</compilerArguments>
</configuration>
</plugin>
<plugin>
<artifactId>maven-source-plugin</artifactId>
<version>2.1</version>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<attach>true</attach>
</configuration>
</plugin>
</plugins>
</build>
</project>
13 changes: 6 additions & 7 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.*;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand All @@ -47,6 +43,7 @@
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.LongBounds;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xcontent.XContentType;

import java.io.IOException;
import java.util.*;
Expand Down Expand Up @@ -186,7 +183,9 @@ private RestClientBuilder createRestClientBuilder(List<HttpHost> hosts, Header[]
}

private void initializeHighLevelClient(RestClientBuilder clientBuilder) {
this.client = new RestHighLevelClient(clientBuilder);
this.client = new RestHighLevelClientBuilder(clientBuilder.build())
.setApiCompatibilityMode(true)
.build();
this.restClient = client.getLowLevelClient();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;

import java.util.Map;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.indices.GetMappingsResponse;
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,69 +1,79 @@
package com.xiaomi.mone.es.test;

import com.xiaomi.data.push.nacos.NacosConfig;
import com.xiaomi.mone.es.EsProcessor;
import com.xiaomi.mone.es.EsClient;
import com.xiaomi.mone.es.ProcessorConf;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.junit.Test;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class EsProcessorClientTest {

@Test
public void bulkInsert() throws InterruptedException {

NacosConfig config = new NacosConfig();
config.setDataId("zzy_new");
// config.init();

String ip = config.getConfig("es_ip");
String user = config.getConfig("es_user");
String pwd = config.getConfig("es_password");
ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, new EsClient(ip, user, pwd), new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
System.out.println("before insert" + request);
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems()));
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
System.out.println("success after,request:" + request + " failure:" + failure);
}
});
EsProcessor processor = new EsProcessor(conf);
try {
String indexName = "zgq_common_milog_staging_free_private_1-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date());
Map<String, Object> data = new HashMap<>();
data.put("timestamp", System.currentTimeMillis());
data.put("filename", "/home/work/log/log-manager/server.log1");
int n = 1;
int count = 0;
while (true) {
//package com.xiaomi.mone.es.test;
//
//import com.google.common.reflect.TypeToken;
//import com.google.gson.Gson;
//import com.xiaomi.data.push.nacos.NacosConfig;
//import com.xiaomi.mone.es.EsClient;
//import com.xiaomi.mone.es.EsProcessor;
//import com.xiaomi.mone.es.ProcessorConf;
//import org.elasticsearch.action.bulk.BulkProcessor;
//import org.elasticsearch.action.bulk.BulkRequest;
//import org.elasticsearch.action.bulk.BulkResponse;
//import org.junit.Test;
//
//import java.text.SimpleDateFormat;
//import java.time.Instant;
//import java.util.Arrays;
//import java.util.Date;
//import java.util.HashMap;
//import java.util.Map;
//
//public class EsProcessorClientTest {
//
// @Test
// public void bulkInsert() throws InterruptedException {
//
// String str = "{\"@timestamp\":\"2024-06-20T19:39:15.871+08:00\",\"@version\":\"1\",\"message\":\"hello world data test wtt~\",\"logger_name\":\"com.xiaomi.ai.Application\",\"thread_name\":\"http-nio-10010-exec-3\",\"level\":\"INFO\",\"level_value\":20000,\"LOG_NAME\":\"ai-workflow\",\"SENTRY_ENABLED\":\"false\",\"user_name\":\"wangjunfei3\",\"user_team\":\"ncl7150\",\"request_uri\":\"/hello\",\"trace_id\":\"9cf73bfe51e877a83806ac01b6630815\",\"trace_flags\":\"01\",\"span_id\":\"c13434f908acebb4\"}";
// Map<String, Object> data = new Gson().fromJson(str, new TypeToken<Map<String, Object>>() {
// }.getType());
// data.put("timeStamp", System.currentTimeMillis());
//
// NacosConfig config = new NacosConfig();
// config.setDataId("zzy_new");
//// config.init();
//
// String ip = "zjydw.api.es.srv:80";
// String user = config.getConfig("es_user");
// String pwd = config.getConfig("es_password");
// String token = "4244b7014a5c44fea63bea711c7697fe";
// String catalog = "es_zjy_log";
// String database = "default";
//
// EsClient esClient = new EsClient(ip, token, catalog, database);
// ProcessorConf conf = new ProcessorConf(100, 5, 1, 100, 3, 5, esClient, new BulkProcessor.Listener() {
// @Override
// public void beforeBulk(long executionId, BulkRequest request) {
// System.out.println("before insert" + request);
// }
//
// @Override
// public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
// System.out.println("success after,request:" + request.getDescription() + " resopnse:" + Arrays.toString(response.getItems()));
// }
//
// @Override
// public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// System.out.println("success after,request:" + request + " failure:" + failure);
// }
// });
// EsProcessor processor = new EsProcessor(conf);
// try {
// String indexName = "prod_hera_index_95956-" + new SimpleDateFormat("yyyy.MM.dd").format(new Date());
// int n = 1;
// int count = 0;
// while (true) {
//// processor.bulkInsert(indexName, data);
// processor.bulkInsert(indexName, data);
processor.bulkInsert(indexName, data);
count++;
if (count == n) {
break;
}
}
// Thread.sleep(10000l);
System.in.read();
}catch (Exception e){
e.printStackTrace();
}

}
}
// count++;
// if (count == n) {
// break;
// }
// }
//// Thread.sleep(10000l);
// System.in.read();
// } catch (Exception e) {
// e.printStackTrace();
// }
//
// }
//}
2 changes: 2 additions & 0 deletions jcommon/file/src/main/java/com/xiaomi/mone/file/ILogFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
*/
public interface ILogFile {

int LINE_MAX_LENGTH = 1100000;

void readLine() throws IOException;

void setStop(boolean stop);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class LogFile implements ILogFile {

private String md5;

private static final int LINE_MAX_LENGTH = 50000;
// private static final int LINE_MAX_LENGTH = 50000;

public LogFile() {

Expand Down
Loading

0 comments on commit 410775f

Please sign in to comment.