diff --git a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java index 9aad4501d..f557c4d83 100644 --- a/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java +++ b/jcommon/es/src/main/java/com/xiaomi/mone/es/EsClient.java @@ -4,7 +4,6 @@ import org.apache.http.Header; import org.apache.http.HttpHost; import org.apache.http.client.config.RequestConfig; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.message.BasicHeader; import org.elasticsearch.action.ActionListener; @@ -26,12 +25,14 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.client.*; +import org.elasticsearch.client.RequestOptions; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientBuilder; +import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.client.indices.*; import org.elasticsearch.client.sniff.ElasticsearchNodesSniffer; -import org.elasticsearch.client.sniff.NodesSniffer; import org.elasticsearch.client.sniff.SniffOnFailureListener; import org.elasticsearch.client.sniff.Sniffer; import org.elasticsearch.common.unit.TimeValue; @@ -56,182 +57,151 @@ */ public class EsClient { - private static RestClientBuilder restClientBuilder; private static Sniffer sniffer; - private static final int TIME_OUT = 10 * 60 * 1000; private static final int SNIFF_INTERVAL_MILLIS = 30 * 1000; private static final int SNIFF_AFTER_FAILURE_DELAY_MILLIS = 30 * 1000; + private static final int MAX_CONN_PER_ROUTE = 500; + private static final int MAX_CONN_TOTAL = 500; + private static final int SOCKET_TIMEOUT_MS = 10 * 60 * 1000; + private static final int CONNECTION_REQUEST_TIMEOUT_MS = 5000 * 1000; + private static final int CONNECT_TIMEOUT_MS = 5000 * 1000; + private static final long KEEP_ALIVE_DURATION_MS = TimeUnit.MINUTES.toMillis(2); private RestHighLevelClient client; private RestClient restClient; -// public EsClient(String esAddr, String user, String pwd) { -// final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); -// credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, pwd)); -// RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.valueOf(esAddr.split(":")[1]), "http")) -// .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { -// @Override -// public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) { -// return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider); -// } -// }).setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() { -// // 该方法接收一个RequestConfig.Builder对象,对该对象进行修改后然后返回。 -// @Override -// public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder builder) { -// return builder.setConnectTimeout(5000 * 1000) // 连接超时(默认为1秒) -// .setSocketTimeout(6000 * 1000);// 套接字超时(默认为30秒)//更改客户端的超时限制默认30秒现在改为100*1000分钟 -// } -// });// 调整最大重试超时时间(默认为30秒).setMaxRetryTimeoutMillis(60000) -// this.client = new RestHighLevelClient(builder); -// -// } + public static boolean startedSniffer = true; + + private SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener(); public EsClient(String esAddr, String token, String catalog, String database) { - Header[] defaultHeaders = new Header[]{ + validateParams(esAddr, token, catalog, database); + + Header[] defaultHeaders = createDefaultHeaders(token, catalog, database); + + RestClientBuilder builder = createRestClientBuilder(esAddr, defaultHeaders); + + initializeHighLevelClient(builder); + + initializeSnifferIfNeeded(); + } + + private void validateParams(String esAddr, String token, String catalog, String database) { + if (esAddr == null || esAddr.isEmpty() || token == null || token.isEmpty() || catalog == null || catalog.isEmpty() || database == null || database.isEmpty()) { + throw new IllegalArgumentException("Invalid parameters provided"); + } + } + + private Header[] createDefaultHeaders(String token, String catalog, String database) { + return new Header[]{ new BasicHeader("Authorization", token), new BasicHeader("catalog", catalog), new BasicHeader("database", database) }; + } + + private void initializeSnifferIfNeeded() { + if (startedSniffer) { + initializeSniffer(); + } + } + + private RestClientBuilder createRestClientBuilder(String esAddr, Header[] defaultHeaders) { + String[] esAddrParts = esAddr.split(":"); + if (esAddrParts.length != 2) { + throw new IllegalArgumentException("Invalid Elasticsearch address"); + } + + String host = esAddrParts[0]; + int port = Integer.parseInt(esAddrParts[1]); - RestClientBuilder builder = RestClient.builder(new HttpHost(esAddr.split(":")[0], Integer.parseInt(esAddr.split(":")[1]), "http")) + return RestClient.builder(new HttpHost(host, port, "http")) .setDefaultHeaders(defaultHeaders) - .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500) - .setMaxConnTotal(500) - .setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000) - .setConnectionRequestTimeout(5000 * 1000) - .setConnectTimeout(5000 * 1000) + .setFailureListener(sniffOnFailureListener) + .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE) + .setMaxConnTotal(MAX_CONN_TOTAL) + .setDefaultRequestConfig(RequestConfig.custom() + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setConnectTimeout(CONNECT_TIMEOUT_MS) .build()) - .setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2)) + .setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())); - this.client = new RestHighLevelClient(builder); } + public EsClient(String esAddr, String user, String pwd) { + validateParams(esAddr, user, pwd); + + List hosts = createHttpHosts(esAddr); + + Header[] headers = createHeaders(user, pwd); + RestClientBuilder clientBuilder = createRestClientBuilder(hosts, headers); + + initializeHighLevelClient(clientBuilder); + + initializeSnifferIfNeeded(); + } + + private void validateParams(String esAddr, String user, String pwd) { + if (esAddr == null || esAddr.isEmpty() || user == null || user.isEmpty() || pwd == null || pwd.isEmpty()) { + throw new IllegalArgumentException("Invalid parameters provided"); + } + } + + private List createHttpHosts(String esAddr) { String[] addrs = esAddr.split(","); List hosts = new ArrayList<>(); for (String addr : addrs) { String[] hostAndPort = addr.split(":"); + String host = hostAndPort[0]; int port = Integer.parseInt(hostAndPort[1]); - HttpHost host = new HttpHost(hostAndPort[0], port); - hosts.add(host); + hosts.add(new HttpHost(host, port)); } + return hosts; + } + private Header[] createHeaders(String user, String pwd) { String urlEncodePassword = new String(Base64.getUrlEncoder().encode(String.format("%s:%s", user, pwd).getBytes())); String basicAuth = String.format("Basic %s", urlEncodePassword); - Header[] headers = new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")}; + return new Header[]{new BasicHeader("Authorization", basicAuth), new BasicHeader("Content-Type", "application/json")}; + } - RestClientBuilder clientBuilder = RestClient.builder(hosts.toArray(new HttpHost[0])) + private RestClientBuilder createRestClientBuilder(List hosts, Header[] headers) { + return RestClient.builder(hosts.toArray(new HttpHost[0])) .setDefaultHeaders(headers) - .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(500) - .setMaxConnTotal(500) - .setDefaultRequestConfig(RequestConfig.custom().setSocketTimeout(10 * 60 * 1000) - .setConnectionRequestTimeout(5000 * 1000) - .setConnectTimeout(5000 * 1000).build()) - .setKeepAliveStrategy((response, context) -> TimeUnit.MINUTES.toMillis(2)) + .setFailureListener(sniffOnFailureListener) + .setFailureListener(sniffOnFailureListener) + .setHttpClientConfigCallback(x -> x.setMaxConnPerRoute(MAX_CONN_PER_ROUTE) + .setMaxConnTotal(MAX_CONN_TOTAL) + .setDefaultRequestConfig(RequestConfig.custom() + .setSocketTimeout(SOCKET_TIMEOUT_MS) + .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MS) + .setConnectTimeout(CONNECT_TIMEOUT_MS) + .build()) + .setKeepAliveStrategy((response, context) -> KEEP_ALIVE_DURATION_MS) .setDefaultIOReactorConfig(IOReactorConfig.custom().setSoKeepAlive(true).build())); - this.client = new RestHighLevelClient(clientBuilder); - } - public EsClient(List restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException { - snifferNodeInit(restAddress, httpPort, username, password, timeOut, snifferIntervalMillis, snifferAfterFailDelayMillis); - } - - public EsClient(List restAddress, int httpPort, String username, String password) throws IOException { - snifferNodeInit(restAddress, httpPort, username, password, TIME_OUT, SNIFF_INTERVAL_MILLIS, SNIFF_AFTER_FAILURE_DELAY_MILLIS); + private void initializeHighLevelClient(RestClientBuilder clientBuilder) { + this.client = new RestHighLevelClient(clientBuilder); + this.restClient = client.getLowLevelClient(); } - private void snifferNodeInit(List restAddress, int httpPort, String username, String password, int timeOut, int snifferIntervalMillis, int snifferAfterFailDelayMillis) throws IOException { - - - HttpHost[] hosts = new HttpHost[restAddress.size()]; - for (int index = 0; index < restAddress.size(); index++) { - hosts[index] = new HttpHost(restAddress.get(index), httpPort, "http"); - } - - RestClientBuilder.RequestConfigCallback requestConfigCallback = new RestClientBuilder.RequestConfigCallback() { - @Override - public RequestConfig.Builder customizeRequestConfig( - RequestConfig.Builder requestConfigBuilder) { - return requestConfigBuilder - .setConnectTimeout(timeOut) - .setSocketTimeout(timeOut); - } - }; - - RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback = new RestClientBuilder.HttpClientConfigCallback() { - @Override - public HttpAsyncClientBuilder customizeHttpClient( - HttpAsyncClientBuilder httpClientBuilder) { - RequestConfig.Builder requestConfigBuilder = RequestConfig.custom() - .setConnectTimeout(timeOut) - .setSocketTimeout(timeOut) - .setConnectionRequestTimeout(timeOut); - httpClientBuilder.setDefaultRequestConfig(requestConfigBuilder.build()); - return httpClientBuilder; - } - }; - - SniffOnFailureListener sniffOnFailureListener = new SniffOnFailureListener(); - if (username != null && password != null) { - String token = "Basic " + new String(Base64.getUrlEncoder().encode((username + ":" + password).getBytes())); - Header[] tokenHeader = new Header[]{new BasicHeader("Authorization", token)}; - restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES) - .setFailureListener(sniffOnFailureListener) - .setHttpClientConfigCallback(httpClientConfigCallback) - .setRequestConfigCallback(requestConfigCallback) - .setDefaultHeaders(tokenHeader); - } else { - restClientBuilder = RestClient.builder(hosts).setNodeSelector(SKIP_DEDICATED_NODES) - .setFailureListener(sniffOnFailureListener) - .setRequestConfigCallback(requestConfigCallback) - .setHttpClientConfigCallback(httpClientConfigCallback); - } - - client = new RestHighLevelClient(restClientBuilder); - restClient = client.getLowLevelClient(); - - NodesSniffer elasticsearchNodesSniffer = new ElasticsearchNodesSniffer( - restClient, - TimeUnit.SECONDS.toMillis(5), - ElasticsearchNodesSniffer.Scheme.HTTP); - - // important + private void initializeSniffer() { sniffer = Sniffer.builder(restClient) - .setSniffIntervalMillis(snifferIntervalMillis) - .setSniffAfterFailureDelayMillis(snifferAfterFailDelayMillis) - .setNodesSniffer(elasticsearchNodesSniffer) + .setSniffIntervalMillis(SNIFF_INTERVAL_MILLIS) + .setSniffAfterFailureDelayMillis(SNIFF_AFTER_FAILURE_DELAY_MILLIS) + .setNodesSniffer(new ElasticsearchNodesSniffer( + restClient, + TimeUnit.SECONDS.toMillis(5), + ElasticsearchNodesSniffer.Scheme.HTTP)) .build(); sniffOnFailureListener.setSniffer(sniffer); } - // important - private NodeSelector SKIP_DEDICATED_NODES = new NodeSelector() { - @Override - public void select(Iterable nodes) { - for (Iterator itr = nodes.iterator(); itr.hasNext(); ) { - Node node = itr.next(); - if (node.getRoles() == null) continue; - if ((node.getRoles().isMasterEligible() - && false == node.getRoles().isData() - && false == node.getRoles().isIngest()) - || - (node.getAttributes().containsKey("node_type") - && node.getAttributes().get("node_type").contains("client") - && false == node.getRoles().isData())) { - itr.remove(); - } - } - } - - @Override - public String toString() { - return "SKIP_DEDICATED_NODES"; - } - }; - public SearchResponse search(SearchRequest searchRequest) throws IOException { SearchResponse res = this.client.search(searchRequest, RequestOptions.DEFAULT); return res; @@ -407,7 +377,7 @@ public EsRet dateHistogram(String indexName, String field, String interval, long .extendedBounds(new LongBounds(startTime, endTime));//统计范围 SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10l)); + searchSourceBuilder.query(builder).aggregation(aggregationBuilder).size(0).timeout(TimeValue.timeValueSeconds(10L)); searchRequest.source(searchSourceBuilder); searchRequest.indices(indexName); SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);