From 37db60a63427ca0fbaf0ac1a20dc000f4f54dec7 Mon Sep 17 00:00:00 2001 From: wtt <30461027+wtt40122@users.noreply.github.com> Date: Tue, 20 Feb 2024 16:36:15 +0800 Subject: [PATCH] refactor: log code optimization update (#290) --- .../mone/log/agent/channel/ChannelEngine.java | 41 ++-- .../log/manager/common/utils/ManagerUtil.java | 6 + .../xiaomi/mone/log/manager/domain/Tpc.java | 6 +- .../service/impl/LogSpaceServiceImpl.java | 2 +- .../impl/MilogLogSearchSaveServiceImpl.java | 5 +- .../mone/log/stream/plugin/es/EsPlugin.java | 185 ++++++++---------- .../xiaomi/mone/log/stream/EsPluginTest.java | 41 ---- 7 files changed, 117 insertions(+), 169 deletions(-) delete mode 100644 ozhera-log/log-stream/src/test/java/com/xiaomi/mone/log/stream/EsPluginTest.java diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java index d2bdb611b..fa9d84cbb 100644 --- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java +++ b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java @@ -110,13 +110,15 @@ public void init() { fileMonitorListener = new DefaultFileMonitorListener(); log.info("query channelDefineList:{}", gson.toJson(channelDefineList)); - channelServiceList = channelDefineList.stream().map(channelDefine -> { - ChannelService channelService = this.channelServiceTrans(channelDefine); - if (null == channelService) { - failedChannelId.add(channelDefine.getChannelId()); - } - return channelService; - }).filter(Objects::nonNull).collect(Collectors.toList()); + channelServiceList = channelDefineList.stream() + .filter(channelDefine -> filterCollStart(channelDefine.getAppName())) + .map(channelDefine -> { + ChannelService channelService = this.channelServiceTrans(channelDefine); + if (null == channelService) { + failedChannelId.add(channelDefine.getChannelId()); + } + return channelService; + }).filter(Objects::nonNull).collect(Collectors.toList()); // Delete failed channel deleteFailedChannel(failedChannelId, this.channelDefineList, this.channelServiceList); channelServiceList = new CopyOnWriteArrayList<>(channelServiceList); @@ -533,13 +535,16 @@ private List intersection(List origin, List definesIncrement) { List failedChannelId = Lists.newArrayList(); - List channelServices = definesIncrement.stream().filter(Objects::nonNull).map(channelDefine -> { - ChannelService channelService = channelServiceTrans(channelDefine); - if (null == channelService) { - failedChannelId.add(channelDefine.getChannelId()); - } - return channelService; - }).filter(Objects::nonNull).collect(Collectors.toList()); + List channelServices = definesIncrement.stream() + .filter(Objects::nonNull) + .filter(channelDefine -> filterCollStart(channelDefine.getAppName())) + .map(channelDefine -> { + ChannelService channelService = channelServiceTrans(channelDefine); + if (null == channelService) { + failedChannelId.add(channelDefine.getChannelId()); + } + return channelService; + }).filter(Objects::nonNull).collect(Collectors.toList()); deleteFailedChannel(failedChannelId, definesIncrement, channelServices); List successChannelIds = channelStart(channelServices); if (CollectionUtils.isNotEmpty(successChannelIds)) { @@ -549,6 +554,14 @@ public void initIncrement(List definesIncrement) { log.info("[add config] after current channelDefineList:{},channelServiceList:{}", gson.toJson(this.channelDefineList), gson.toJson(gson.toJson(channelServiceList.stream().map(ChannelService::instanceId).collect(Collectors.toList())))); } + private boolean filterCollStart(String appName) { + String serviceName = System.getenv("K8S_SERVICE"); + if (StringUtils.isNotEmpty(serviceName) && StringUtils.isNotEmpty(appName)) { + return serviceName.contains(appName); + } + return true; + } + /** * Send collection progress. diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/common/utils/ManagerUtil.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/common/utils/ManagerUtil.java index 7e5bc69ad..d97909117 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/common/utils/ManagerUtil.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/common/utils/ManagerUtil.java @@ -31,6 +31,8 @@ public class ManagerUtil { private static final String TAIL_KEY = "tail"; + private static final String DEFAULT_SERVER_TYPE = "open"; + private ManagerUtil() { } @@ -91,6 +93,10 @@ public static void getConfigFromNanos() { * @return */ public static String getPhysicsDirectory(String logPath) { + String serverType = Config.ins().get("server.type", DEFAULT_SERVER_TYPE); + if (StringUtils.equals(DEFAULT_SERVER_TYPE, serverType)) { + return StringUtils.EMPTY; + } String[] splitPath = StringUtils.split(logPath, "/"); if (splitPath.length > 2) { return splitPath[splitPath.length - 2].trim(); diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/domain/Tpc.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/domain/Tpc.java index e8f1c5a99..1f125226d 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/domain/Tpc.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/domain/Tpc.java @@ -51,13 +51,13 @@ public class Tpc { @Resource private MilogSpaceDao milogSpaceDao; - @Reference(interfaceClass = NodeFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 10000) + @Reference(interfaceClass = NodeFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 15000) private NodeFacade tpcService; - @Reference(interfaceClass = NodeUserFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 10000) + @Reference(interfaceClass = NodeUserFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 15000) private NodeUserFacade tpcUserService; - @Reference(interfaceClass = UserOrgFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 10000) + @Reference(interfaceClass = UserOrgFacade.class, group = "$tpc_dubbo_group", check = false, version = "1.0", timeout = 15000) private UserOrgFacade userOrgFacade; @Value("${tpc_node_code}") diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/LogSpaceServiceImpl.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/LogSpaceServiceImpl.java index b0cf3aebc..ebdc98fb4 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/LogSpaceServiceImpl.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/LogSpaceServiceImpl.java @@ -182,7 +182,7 @@ public Result>> getMilogSpaces(Long tenantId) { List nodeVos = new ArrayList<>(); while (true) { - com.xiaomi.youpin.infra.rpc.Result> tpcRes = spaceAuthService.getUserPermSpace("", pageNum, Integer.MAX_VALUE); + com.xiaomi.youpin.infra.rpc.Result> tpcRes = spaceAuthService.getUserPermSpace("", pageNum, 100); if (tpcRes.getCode() != 0) { return Result.fail(CommonError.UNAUTHORIZED); diff --git a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/MilogLogSearchSaveServiceImpl.java b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/MilogLogSearchSaveServiceImpl.java index 815866319..4a36da842 100644 --- a/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/MilogLogSearchSaveServiceImpl.java +++ b/ozhera-log/log-manager/src/main/java/com/xiaomi/mone/log/manager/service/impl/MilogLogSearchSaveServiceImpl.java @@ -44,6 +44,7 @@ import javax.annotation.Resource; import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; /** @@ -199,12 +200,12 @@ public Result defavourite(Integer sort, Long id) { } public Result> storeTree() { - List dtoList = new ArrayList<>(); + List dtoList = new CopyOnWriteArrayList<>(); int pageNum = 1; List spaceDTOList = new ArrayList<>(); while (true) { - com.xiaomi.youpin.infra.rpc.Result> userPermSpace = spaceAuthService.getUserPermSpace("", pageNum, Integer.MAX_VALUE); + com.xiaomi.youpin.infra.rpc.Result> userPermSpace = spaceAuthService.getUserPermSpace("", pageNum, 100); if (userPermSpace.getCode() != 0) { return Result.fail(CommonError.UNAUTHORIZED); diff --git a/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java b/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java index d9b0b8db1..afb0d6f83 100644 --- a/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java +++ b/ozhera-log/log-stream/src/main/java/com/xiaomi/mone/log/stream/plugin/es/EsPlugin.java @@ -26,17 +26,15 @@ import com.xiaomi.youpin.docean.plugin.es.EsProcessorConf; import com.xiaomi.youpin.docean.plugin.es.EsService; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.tuple.MutablePair; -import org.apache.commons.lang3.tuple.Pair; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; -import java.util.*; +import java.util.List; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; @@ -49,11 +47,10 @@ public class EsPlugin { private static ConcurrentHashMap esServiceMap = new ConcurrentHashMap<>(); - private static ConcurrentHashMap>> esProcessorMap = new ConcurrentHashMap<>(); + private static ConcurrentHashMap esProcessorMap = new ConcurrentHashMap<>(); - public static final int SINGLE_MESSAGE_BYTES_MAXIMAL = 10 * 1024 * 1024; - private static int DEFAULT_PROCESSOR_COUNT = 3; + public static final int SINGLE_MESSAGE_BYTES_MAXIMAL = 10 * 1024 * 1024; private static ReentrantLock esLock = new ReentrantLock(); @@ -67,7 +64,6 @@ public static boolean InitEsConfig() { config.setFlushInterval(Integer.parseInt(ins.get("es.flush_interval", ""))); config.setRetryNumber(Integer.parseInt(ins.get("es.retry_num", "3"))); config.setRetryInterval(Integer.parseInt(ins.get("es.retry_interval", "3"))); - DEFAULT_PROCESSOR_COUNT = Integer.parseInt(ins.get("es.processor_count", String.valueOf(DEFAULT_PROCESSOR_COUNT))); log.info("[EsPlugin.getEsProcessor] init es config:{}", config); } catch (Exception e) { log.error("[EsPlugin.InitEsConfig] init es config err:", e); @@ -81,23 +77,11 @@ public static EsProcessor getEsProcessor(StorageInfo esInfo, Consumer onFailedConsumer) { - - List> esProcessorList = esProcessorMap.get(cacheKey(esInfo)); - if (CollectionUtils.isEmpty(esProcessorList)) { - esLock.lock(); - try { + esLock.lock(); + try { + EsProcessor esProcessor = esProcessorMap.get(cacheKey(esInfo)); + if (esProcessor == null) { EsService esService = esServiceMap.get(cacheKey(esInfo)); if (esService == null) { if (StringUtils.isNotBlank(esInfo.getUser()) && StringUtils.isNotBlank(esInfo.getPwd())) { @@ -109,96 +93,81 @@ public static EsProcessor getEsProcessor(StorageInfo esInfo, EsConfig config, Co } esServiceMap.put(cacheKey(esInfo), esService); } - esProcessorList = new ArrayList<>(); - for (int i = 0; i < DEFAULT_PROCESSOR_COUNT; i++) { - EsProcessor esProcessor = buildEsProcessor(esInfo, config, onFailedConsumer, esService); - esProcessorList.add(MutablePair.of(esProcessor, 0)); - } - esProcessorMap.put(cacheKey(esInfo), esProcessorList); - } finally { - esLock.unlock(); - } - } - return getEsProcessorAverage(esProcessorMap.get(cacheKey(esInfo))); - } - - private static EsProcessor getEsProcessorAverage(List> esProcessorList) { - Collections.sort(esProcessorList, Comparator.comparingInt(Pair::getValue)); - Pair esProcessorIntegerPair = esProcessorList.get(0); - esProcessorIntegerPair.setValue(esProcessorIntegerPair.getValue() + 1); - return esProcessorIntegerPair.getKey(); - } - - private static EsProcessor buildEsProcessor(StorageInfo esInfo, EsConfig config, Consumer onFailedConsumer, EsService esService) { - EsProcessor esProcessor = esService.getEsProcessor(new EsProcessorConf(config.getBulkActions(), config.getByteSize(), config.getConcurrentRequest(), config.getFlushInterval(), - config.getRetryNumber(), config.getRetryInterval(), new BulkProcessor.Listener() { - @Override - public void beforeBulk(long executionId, BulkRequest request) { + esProcessor = esService.getEsProcessor(new EsProcessorConf(config.getBulkActions(), config.getByteSize(), config.getConcurrentRequest(), config.getFlushInterval(), + config.getRetryNumber(), config.getRetryInterval(), new BulkProcessor.Listener() { + @Override + public void beforeBulk(long executionId, BulkRequest request) { // log.info("before send to es,desc:{}", request.getDescription()); - } - - @Override - public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { - log.debug("success send to es,desc:{}", request.getDescription()); - AtomicInteger count = new AtomicInteger(); - response.spliterator().forEachRemaining(x -> { - if (x.isFailed()) { - BulkItemResponse.Failure failure = x.getFailure(); - String msg = String.format( - "Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s" - , x.getIndex() - , x.getType() - , x.getId() - , x.getItemId() - , x.getOpType().getLowercase() - , x.getVersion() - , failure.getCause().getMessage() - ); - log.error("Bulk executionId:[{}] has error messages:\t{}", executionId, msg); - count.incrementAndGet(); } - }); - log.debug("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", executionId, request.numberOfActions(), count.intValue()); - } - @Override - public void afterBulk(long executionId, BulkRequest request, Throwable failure) { - log.error(String.format("fail send %s message to es,desc:%s,es addr:%s", request.numberOfActions(), request.getDescription(), esInfo.getAddr()), new RuntimeException(failure)); - Class clazz = failure.getClass(); - log.error("Bulk [{}] finished with [{}] requests of error:{}, {}, {}:-[{}]", executionId - , request.numberOfActions() - , clazz.getName() - , clazz.getSimpleName() - , clazz.getTypeName() - , clazz.getCanonicalName() - , failure.getMessage()); - MqMessageDTO MqMessageDTO = new MqMessageDTO(); - MqMessageDTO.setEsInfo(esInfo); - List compensateMqDTOS = Lists.newArrayList(); - request.requests().stream().filter(x -> x instanceof IndexRequest) - .forEach(x -> { - Map source = ((IndexRequest) x).sourceAsMap(); - log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source)); - MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO(); - compensateMqDTO.setMsg(JSON.toJSONString(source)); - compensateMqDTO.setEsIndex(x.index()); - compensateMqDTOS.add(compensateMqDTO); + @Override + public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { + log.debug("success send to es,desc:{}", request.getDescription()); + AtomicInteger count = new AtomicInteger(); + response.spliterator().forEachRemaining(x -> { + if (x.isFailed()) { + BulkItemResponse.Failure failure = x.getFailure(); + String msg = String.format( + "Index:[%s], type:[%s], id:[%s], itemId:[%s], opt:[%s], version:[%s], errMsg:%s" + , x.getIndex() + , x.getType() + , x.getId() + , x.getItemId() + , x.getOpType().getLowercase() + , x.getVersion() + , failure.getCause().getMessage() + ); + log.error("Bulk executionId:[{}] has error messages:\t{}", executionId, msg); + count.incrementAndGet(); + } }); - //The message is sent to mq for consumption - the data cannot be larger than 10M, otherwise it cannot be written, divided into 2 parts - int length = JSON.toJSONString(compensateMqDTOS).getBytes().length; - if (length > SINGLE_MESSAGE_BYTES_MAXIMAL) { - List> splitList = ListUtil.partition(compensateMqDTOS, 2); - for (List mqDTOS : splitList) { - MqMessageDTO.setCompensateMqDTOS(mqDTOS); - onFailedConsumer.accept(MqMessageDTO); + log.debug("Finished handling bulk commit executionId:[{}] for {} requests with {} errors", executionId, request.numberOfActions(), count.intValue()); } - } else { - MqMessageDTO.setCompensateMqDTOS(compensateMqDTOS); - onFailedConsumer.accept(MqMessageDTO); - } + + @Override + public void afterBulk(long executionId, BulkRequest request, Throwable failure) { + log.error(String.format("fail send %s message to es,desc:%s,es addr:%s", request.numberOfActions(), request.getDescription(), esInfo.getAddr()), new RuntimeException(failure)); + Class clazz = failure.getClass(); + log.error("Bulk [{}] finished with [{}] requests of error:{}, {}, {}:-[{}]", executionId + , request.numberOfActions() + , clazz.getName() + , clazz.getSimpleName() + , clazz.getTypeName() + , clazz.getCanonicalName() + , failure.getMessage()); + MqMessageDTO MqMessageDTO = new MqMessageDTO(); + MqMessageDTO.setEsInfo(esInfo); + List compensateMqDTOS = Lists.newArrayList(); + request.requests().stream().filter(x -> x instanceof IndexRequest) + .forEach(x -> { + Map source = ((IndexRequest) x).sourceAsMap(); + log.error("Failure to handle index:[{}], type:[{}],id:[{}] data:[{}]", x.index(), x.type(), x.id(), JSON.toJSONString(source)); + MqMessageDTO.CompensateMqDTO compensateMqDTO = new MqMessageDTO.CompensateMqDTO(); + compensateMqDTO.setMsg(JSON.toJSONString(source)); + compensateMqDTO.setEsIndex(x.index()); + compensateMqDTOS.add(compensateMqDTO); + }); + //The message is sent to mq for consumption - the data cannot be larger than 10M, otherwise it cannot be written, divided into 2 parts + int length = JSON.toJSONString(compensateMqDTOS).getBytes().length; + if (length > SINGLE_MESSAGE_BYTES_MAXIMAL) { + List> splitList = ListUtil.partition(compensateMqDTOS, 2); + for (List mqDTOS : splitList) { + MqMessageDTO.setCompensateMqDTOS(mqDTOS); + onFailedConsumer.accept(MqMessageDTO); + } + } else { + MqMessageDTO.setCompensateMqDTOS(compensateMqDTOS); + onFailedConsumer.accept(MqMessageDTO); + } + } + })); + esProcessorMap.put(cacheKey(esInfo), esProcessor); + return esProcessor; } - })); - return esProcessor; + return esProcessor; + } finally { + esLock.unlock(); + } } private static String cacheKey(StorageInfo esInfo) { diff --git a/ozhera-log/log-stream/src/test/java/com/xiaomi/mone/log/stream/EsPluginTest.java b/ozhera-log/log-stream/src/test/java/com/xiaomi/mone/log/stream/EsPluginTest.java deleted file mode 100644 index c91caf092..000000000 --- a/ozhera-log/log-stream/src/test/java/com/xiaomi/mone/log/stream/EsPluginTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package com.xiaomi.mone.log.stream; - -import com.xiaomi.mone.es.EsProcessor; -import com.xiaomi.mone.log.model.StorageInfo; -import com.xiaomi.mone.log.stream.plugin.es.EsPlugin; -import com.xiaomi.youpin.docean.Ioc; -import lombok.extern.slf4j.Slf4j; -import org.junit.Before; -import org.junit.Test; - -import static com.xiaomi.mone.log.stream.common.util.StreamUtils.getConfigFromNacos; - -/** - * @author wtt - * @version 1.0 - * @description - * @date 2024/1/30 20:09 - */ -@Slf4j -public class EsPluginTest { - - @Before - public void init() { - getConfigFromNacos(); - Ioc.ins().init("com.xiaomi.mone.log.stream", "com.xiaomi.youpin.docean"); - } - - @Test - public void getEsProcessorTest() { - Long id = 1L; - String addr = "127.0.0.1:80"; - String user = "user"; - String pwd = "pwd"; - StorageInfo esInfo = new StorageInfo(id, addr, user, pwd); - EsProcessor esProcessor = EsPlugin.getEsProcessor(esInfo, null); - - EsProcessor esProcessor1 = EsPlugin.getEsProcessor(esInfo, null); - esProcessor1 = EsPlugin.getEsProcessor(esInfo, null); - log.info("result:{}", esProcessor); - } -}