Skip to content

Commit

Permalink
feat: add init es sniffer
Browse files Browse the repository at this point in the history
  • Loading branch information
wtt40122 committed Mar 18, 2024
1 parent de024cb commit ba440e4
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 137 deletions.
244 changes: 107 additions & 137 deletions jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.action.ActionListener;
Expand All @@ -26,12 +25,14 @@
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.*;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.*;
import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer;
import org.elasticsearch.client.sniff.NodesSniffer;
import org.elasticsearch.client.sniff.SniffOnFailureListener;
import org.elasticsearch.client.sniff.Sniffer;
import org.elasticsearch.common.unit.TimeValue;
Expand All @@ -56,182 +57,151 @@
*/
public class EsClient {

private static RestClientBuilder restClientBuilder;
private static Sniffer sniffer;
private static final int TIME_OUT = 10 * 60 * 1000;
private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000;
private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000;
private static final int MAX_CONN_PER_ROUTE = 500;
private static final int MAX_CONN_TOTAL = 500;
private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000;
private static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000 * 1000;
private static final int CONNECT_TIMEOUT_MS = 5000 * 1000;
private static final long KEEP_ALIVE_DURATION_MS = TimeUnit.MINUTES.toMillis(2);

private RestHighLevelClient client;

private RestClient restClient;

// public EsClient(String esAddr, String user, String pwd) {
// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd));
// RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.valueOf(esAddr.split(":")[1]), "http"))
// .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
// @Override
// public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// }
// }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
// // 该方法接收一个RequestConfig.Builder对象,对该对象进行修改后然后返回。
// @Override
// public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) {
// return builder.setConnectTimeout(5000 * 1000) // 连接超时(默认为1秒)
// .setSocketTimeout(6000 * 1000);// 套接字超时(默认为30秒)//更改客户端的超时限制默认30秒现在改为100*1000分钟
// }
// });// 调整最大重试超时时间(默认为30秒).setMaxRetryTimeoutMillis(60000)
// this.client = new RestHighLevelClient(builder);
//
// }
public static boolean startedSniffer = true;

private SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();

public EsClient(String esAddr, String token, String catalog, String database) {
Header[] defaultHeaders = new Header[]{
validateParams(esAddr, token, catalog, database);

Header[] defaultHeaders = createDefaultHeaders(token, catalog, database);

RestClientBuilder builder = createRestClientBuilder(esAddr, defaultHeaders);

initializeHighLevelClient(builder);

initializeSnifferIfNeeded();
}

private void validateParams(String esAddr, String token, String catalog, String database) {
if (esAddr == null || esAddr.isEmpty() || token == null || token.isEmpty() || catalog == null || catalog.isEmpty() || database == null || database.isEmpty()) {
throw new IllegalArgumentException("Invalid parameters provided");
}
}

private Header[] createDefaultHeaders(String token, String catalog, String database) {
return new Header[]{
new BasicHeader("Authorization", token),
new BasicHeader("catalog", catalog),
new BasicHeader("database", database)
};
}

private void initializeSnifferIfNeeded() {
if (startedSniffer) {
initializeSniffer();
}
}

private RestClientBuilder createRestClientBuilder(String esAddr, Header[] defaultHeaders) {
String[] esAddrParts = esAddr.split(":");
if (esAddrParts.length != 2) {
throw new IllegalArgumentException("Invalid Elasticsearch address");
}

String host = esAddrParts[0];
int port = Integer.parseInt(esAddrParts[1]);

RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.parseInt(esAddr.split(":")[1]), "http"))
return RestClient.builder(new HttpHost(host, port, "http"))
.setDefaultHeaders(defaultHeaders)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500)
.setMaxConnTotal(500)
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000)
.setConnectionRequestTimeout(5000 * 1000)
.setConnectTimeout(5000 * 1000)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
.setMaxConnTotal(MAX_CONN_TOTAL)
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build())
.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2))
.setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()));
this.client = new RestHighLevelClient(builder);
}


public EsClient(String esAddr, String user, String pwd) {
validateParams(esAddr, user, pwd);

List<HttpHost> hosts = createHttpHosts(esAddr);

Header[] headers = createHeaders(user, pwd);

RestClientBuilder clientBuilder = createRestClientBuilder(hosts, headers);

initializeHighLevelClient(clientBuilder);

initializeSnifferIfNeeded();
}

private void validateParams(String esAddr, String user, String pwd) {
if (esAddr == null || esAddr.isEmpty() || user == null || user.isEmpty() || pwd == null || pwd.isEmpty()) {
throw new IllegalArgumentException("Invalid parameters provided");
}
}

private List<HttpHost> createHttpHosts(String esAddr) {
String[] addrs = esAddr.split(",");
List<HttpHost> hosts = new ArrayList<>();
for (String addr : addrs) {
String[] hostAndPort = addr.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
HttpHost host = new HttpHost(hostAndPort[0], port);
hosts.add(host);
hosts.add(new HttpHost(host, port));
}
return hosts;
}

private Header[] createHeaders(String user, String pwd) {
String urlEncodePassword = new String(Base64.getUrlEncoder().encode(String.format("%s:%s", user, pwd).getBytes()));
String basicAuth = String.format("Basic %s", urlEncodePassword);
Header[] headers = new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")};
return new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")};
}

RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0]))
private RestClientBuilder createRestClientBuilder(List<HttpHost> hosts, Header[] headers) {
return RestClient.builder(hosts.toArray(new HttpHost[0]))
.setDefaultHeaders(headers)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500)
.setMaxConnTotal(500)
.setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000)
.setConnectionRequestTimeout(5000 * 1000)
.setConnectTimeout(5000 * 1000).build())
.setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2))
.setFailureListener(sniffOnFailureListener)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE)
.setMaxConnTotal(MAX_CONN_TOTAL)
.setDefaultRequestConfig(RequestConfig.custom()
.setSocketTimeout(SOCKET_TIMEOUT_MS)
.setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS)
.setConnectTimeout(CONNECT_TIMEOUT_MS)
.build())
.setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS)
.setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build()));
this.client = new RestHighLevelClient(clientBuilder);

}

public EsClient(List<String> restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException {
snifferNodeInit(restAddress, httpPort, username, password, timeOut, snifferIntervalMillis, snifferAfterFailDelayMillis);
}

public EsClient(List<String> restAddress, int httpPort, String username, String password) throws IOException {
snifferNodeInit(restAddress, httpPort, username, password, TIME_OUT, SNIFF_INTERVAL_MILLIS, SNIFF_AFTER_FAILURE_DELAY_MILLIS);
private void initializeHighLevelClient(RestClientBuilder clientBuilder) {
this.client = new RestHighLevelClient(clientBuilder);
this.restClient = client.getLowLevelClient();
}

private void snifferNodeInit(List<String> restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException {


HttpHost[] hosts = new HttpHost[restAddress.size()];
for (int index = 0; index < restAddress.size(); index++) {
hosts[index] = new HttpHost(restAddress.get(index), httpPort, "http");
}

RestClientBuilder.RequestConfigCallback requestConfigCallback = new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(
RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder
.setConnectTimeout(timeOut)
.setSocketTimeout(timeOut);
}
};

RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback() {
@Override
public HttpAsyncClientBuilder customizeHttpClient(
HttpAsyncClientBuilder httpClientBuilder) {
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(timeOut)
.setSocketTimeout(timeOut)
.setConnectionRequestTimeout(timeOut);
httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build());
return httpClientBuilder;
}
};

SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener();
if (username != null && password != null) {
String token = "Basic " + new String(Base64.getUrlEncoder().encode((username + ":" + password).getBytes()));
Header[] tokenHeader = new Header[]{new BasicHeader("Authorization", token)};
restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES)
.setFailureListener(sniffOnFailureListener)
.setHttpClientConfigCallback(httpClientConfigCallback)
.setRequestConfigCallback(requestConfigCallback)
.setDefaultHeaders(tokenHeader);
} else {
restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES)
.setFailureListener(sniffOnFailureListener)
.setRequestConfigCallback(requestConfigCallback)
.setHttpClientConfigCallback(httpClientConfigCallback);
}

client = new RestHighLevelClient(restClientBuilder);
restClient = client.getLowLevelClient();

NodesSniffer elasticsearchNodesSniffer = new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
ElasticsearchNodesSniffer.Scheme.HTTP);

// important
private void initializeSniffer() {
sniffer = Sniffer.builder(restClient)
.setSniffIntervalMillis(snifferIntervalMillis)
.setSniffAfterFailureDelayMillis(snifferAfterFailDelayMillis)
.setNodesSniffer(elasticsearchNodesSniffer)
.setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS)
.setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS)
.setNodesSniffer(new ElasticsearchNodesSniffer(
restClient,
TimeUnit.SECONDS.toMillis(5),
ElasticsearchNodesSniffer.Scheme.HTTP))
.build();
sniffOnFailureListener.setSniffer(sniffer);
}

// important
private NodeSelector SKIP_DEDICATED_NODES = new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext(); ) {
Node node = itr.next();
if (node.getRoles() == null) continue;
if ((node.getRoles().isMasterEligible()
&& false == node.getRoles().isData()
&& false == node.getRoles().isIngest())
||
(node.getAttributes().containsKey("node_type")
&& node.getAttributes().get("node_type").contains("client")
&& false == node.getRoles().isData())) {
itr.remove();
}
}
}

@Override
public String toString() {
return "SKIP_DEDICATED_NODES";
}
};

public SearchResponse search(SearchRequest searchRequest) throws IOException {
SearchResponse res = this.client.search(searchRequest, RequestOptions.DEFAULT);
return res;
Expand Down Expand Up @@ -407,7 +377,7 @@ public EsRet dateHistogram(String indexName, String field, String interval, long
.extendedBounds(new LongBounds(startTime, endTime));//统计范围
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10l));
searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10L));
searchRequest.source(searchSourceBuilder);
searchRequest.indices(indexName);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,5 +236,6 @@ public void existsTemplateTest() throws IOException {
IndexTemplatesExistRequest request = new IndexTemplatesExistRequest(templateName);
boolean res = client.existsTemplate(request);
System.out.println("result:" + res);
System.in.read();
}
}

0 comments on commit ba440e4

Please sign in to comment.