diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java index 21511a96b..b0a7d7b56 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/GroupServiceImpl.java @@ -78,6 +78,7 @@ public List listGroupsFromKafka(ClusterPhy clusterPhy) throws AdminOpera } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers()); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId())); adminClient = KSPartialKafkaAdminClient.create(props); KSListGroupsResult listConsumerGroupsResult = adminClient.listConsumerGroups( @@ -178,6 +179,7 @@ public KSGroupDescription getGroupDescriptionFromKafka(ClusterPhy clusterPhy, St } props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterPhy.getBootstrapServers()); + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("KSPartialAdminClient||clusterPhyId=%d", clusterPhy.getId())); adminClient = KSPartialKafkaAdminClient.create(props); diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java index 20447798c..fccf35dd4 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminClient.java @@ -12,6 +12,7 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -76,10 +77,12 @@ private void closeKafkaAdminClient(Long clusterPhyId) { LOGGER.info("close kafka AdminClient starting, clusterPhyId:{}", clusterPhyId); - boolean allSuccess = this.closeAdminClientList(adminClientList); + boolean allSuccess = this.closeAdminClientList(clusterPhyId, adminClientList); if (allSuccess) { LOGGER.info("close kafka AdminClient success, clusterPhyId:{}", clusterPhyId); + } else { + LOGGER.error("close kafka AdminClient exist failed and can ignore this error, clusterPhyId:{}", clusterPhyId); } } catch (Exception e) { LOGGER.error("close kafka AdminClient failed, clusterPhyId:{}", clusterPhyId, e); @@ -116,6 +119,7 @@ private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapSe adminClientList = new ArrayList<>(); for (int i = 0; i < clientCnt; ++i) { + props.put(AdminClientConfig.CLIENT_ID_CONFIG, String.format("ApacheAdminClient||clusterPhyId=%d||Cnt=%d", clusterPhyId, i)); adminClientList.add(AdminClient.create(props)); } @@ -125,7 +129,7 @@ private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapSe } catch (Exception e) { LOGGER.error("create kafka AdminClient failed, clusterPhyId:{} props:{}", clusterPhyId, props, e); - this.closeAdminClientList(adminClientList); + this.closeAdminClientList(clusterPhyId, adminClientList); } finally { modifyClientMapLock.unlock(); } @@ -133,7 +137,7 @@ private AdminClient createKafkaAdminClient(Long clusterPhyId, String bootstrapSe return KAFKA_ADMIN_CLIENT_MAP.get(clusterPhyId).get((int)(System.currentTimeMillis() % clientCnt)); } - private boolean closeAdminClientList(List adminClientList) { + private boolean closeAdminClientList(Long clusterPhyId, List adminClientList) { if (adminClientList == null) { return true; } @@ -141,9 +145,11 @@ private boolean closeAdminClientList(List adminClientList) { boolean allSuccess = true; for (AdminClient adminClient: adminClientList) { try { - adminClient.close(); + // 关闭客户端,超时时间为30秒 + adminClient.close(Duration.ofSeconds(30)); } catch (Exception e) { // ignore + LOGGER.error("close kafka AdminClient exist failed, clusterPhyId:{}", clusterPhyId, e); allSuccess = false; } }