diff --git a/ozhera-log/log-agent-server/pom.xml b/ozhera-log/log-agent-server/pom.xml
index 773cace5d..565476889 100644
--- a/ozhera-log/log-agent-server/pom.xml
+++ b/ozhera-log/log-agent-server/pom.xml
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ org.apache.ozhera
ozhera-log
- run.mone
- 1.4.0-jdk21
+ 2.0.0-SNAPSHOT
4.0.0
@@ -153,7 +153,7 @@
false
- com.xiaomi.mone.log.server.LogAgentServerBootstrap
+ org.apache.ozhera.log.server.LogAgentServerBootstrap
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/LogAgentServerBootstrap.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/LogAgentServerBootstrap.java
deleted file mode 100644
index 00a82c7d4..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/LogAgentServerBootstrap.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server;
-
-import com.google.common.collect.Lists;
-import com.xiaomi.data.push.rpc.RpcCmd;
-import com.xiaomi.data.push.rpc.RpcServer;
-import com.xiaomi.data.push.rpc.common.Pair;
-import com.xiaomi.mone.log.common.Config;
-import com.xiaomi.mone.log.common.Constant;
-import com.xiaomi.mone.log.server.porcessor.AgentCollectProgressProcessor;
-import com.xiaomi.mone.log.server.porcessor.AgentConfigProcessor;
-import com.xiaomi.mone.log.server.porcessor.PingProcessor;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-
-import static com.xiaomi.mone.log.server.common.ServerConstant.SERVER_PORT;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2022/12/5 11:24
- */
-@Slf4j
-public class LogAgentServerBootstrap {
-
- public static void main(String[] args) throws IOException {
- String nacosAddr = Config.ins().get("nacosAddr", "");
- String serverName = Config.ins().get("serverName", "");
- log.info("nacos:{} name:{}", nacosAddr, serverName);
- RpcServer rpcServer = new RpcServer(nacosAddr, serverName);
- rpcServer.setListenPort(SERVER_PORT);
- //Register the processor
- rpcServer.setProcessorList(Lists.newArrayList(
- new Pair<>(RpcCmd.pingReq, new PingProcessor()),
- new Pair<>(Constant.RPCCMD_AGENT_CODE, new AgentCollectProgressProcessor()),
- new Pair<>(Constant.RPCCMD_AGENT_CONFIG_CODE, new AgentConfigProcessor())
- ));
- rpcServer.init();
- rpcServer.start();
-
- Ioc.ins().putBean(rpcServer);
- Ioc.ins().init("com.xiaomi.mone", "com.xiaomi.youpin");
- log.info("log server start finish");
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/ServerConstant.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/ServerConstant.java
deleted file mode 100644
index 8b6070e77..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/ServerConstant.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.common;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2023/10/9 19:34
- */
-public class ServerConstant {
- public static final Integer SERVER_PORT = 9899;
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/Version.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/Version.java
deleted file mode 100644
index 841e59f45..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/common/Version.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.common;
-
-import java.io.Serializable;
-
-/**
- * @author wangtao
- */
-public class Version implements Serializable {
-
-
- @Override
- public String toString() {
- return "log-agent-server:2022-12-05:0.0.2";
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentCollectProgressProcessor.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentCollectProgressProcessor.java
deleted file mode 100644
index bc74b354f..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentCollectProgressProcessor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.porcessor;
-
-import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
-import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import com.xiaomi.mone.log.api.model.vo.UpdateLogProcessCmd;
-import com.xiaomi.mone.log.common.Constant;
-import com.xiaomi.mone.log.server.common.Version;
-import com.xiaomi.mone.log.server.service.DefaultLogProcessCollector;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Component;
-import io.netty.channel.ChannelHandlerContext;
-import lombok.extern.slf4j.Slf4j;
-
-import javax.annotation.Resource;
-import java.nio.charset.StandardCharsets;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description The receiver that communicates with the agent ---- the acquisition progress
- * @date 2021/8/19 15:32
- */
-@Slf4j
-@Component
-public class AgentCollectProgressProcessor implements NettyRequestProcessor {
-
- @Resource
- DefaultLogProcessCollector processService;
-
- private static Version version = new Version();
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
- log.debug("received a message from the agent");
- RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CODE);
- String body = new String(request.getBody(), StandardCharsets.UTF_8);
- UpdateLogProcessCmd cmd = GSON.fromJson(body, UpdateLogProcessCmd.class);
- log.debug("a request from the client sent by the agent:{}", cmd.getIp());
- if (null == processService && Ioc.ins().containsBean(DefaultLogProcessCollector.class.getCanonicalName())) {
- processService = Ioc.ins().getBean(DefaultLogProcessCollector.class);
- }
- if (null != processService) {
- processService.collectLogProcess(cmd);
- }
- response.setBody(version.toString().getBytes());
- response.setBody(Constant.SUCCESS_MESSAGE.getBytes());
- return response;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentConfigProcessor.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentConfigProcessor.java
deleted file mode 100644
index 6b4e41d69..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/AgentConfigProcessor.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.xiaomi.mone.log.server.porcessor;
-
-import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
-import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
-import com.xiaomi.mone.log.api.service.AgentConfigService;
-import com.xiaomi.mone.log.common.Constant;
-import com.xiaomi.mone.log.server.service.DefaultAgentConfigAcquirer;
-import com.xiaomi.youpin.docean.Ioc;
-import io.netty.channel.ChannelHandlerContext;
-import lombok.extern.slf4j.Slf4j;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description The receiver that communicates with the agent ---- the agent starts to get the configuration
- * @date 2021/8/19 15:32
- */
-@Slf4j
-public class AgentConfigProcessor implements NettyRequestProcessor {
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
- RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CONFIG_CODE);
- String ip = new String(request.getBody());
- log.info("agent start get metadata config,agent ip:{}", ip);
-
- AgentConfigService agentConfigService = Ioc.ins().getBean(DefaultAgentConfigAcquirer.class);
-
- LogCollectMeta logCollectMeta = agentConfigService.getLogCollectMetaFromManager(ip);
- String responseInfo = GSON.toJson(logCollectMeta);
- log.info("agent start get metadata config info:{}", responseInfo);
- response.setBody(responseInfo.getBytes());
- return response;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/PingProcessor.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/PingProcessor.java
deleted file mode 100644
index 0edc8af9d..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/porcessor/PingProcessor.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.porcessor;
-
-import com.xiaomi.data.push.context.AgentContext;
-import com.xiaomi.data.push.rpc.RpcCmd;
-import com.xiaomi.data.push.rpc.common.RemotingHelper;
-import com.xiaomi.data.push.rpc.netty.AgentChannel;
-import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
-import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import com.xiaomi.mone.log.api.model.meta.AppLogMeta;
-import com.xiaomi.mone.log.api.model.vo.PingReq;
-import com.xiaomi.mone.log.server.common.Version;
-import com.xiaomi.mone.log.utils.NetUtil;
-import io.netty.channel.ChannelHandlerContext;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-import static com.xiaomi.mone.log.server.common.ServerConstant.SERVER_PORT;
-
-/**
- * @Author goodjava@qq.com
- * @Date 2021/6/24 11:38
- */
-@Slf4j
-public class PingProcessor implements NettyRequestProcessor {
-
- public static Map agentHeartTimeStampMap = new ConcurrentHashMap<>(1024);
-
- private static Version version = new Version();
-
- private DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss SSS");
-
- @Override
- public RemotingCommand processRequest(ChannelHandlerContext channelHandlerContext, RemotingCommand remotingCommand) {
- final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel());
- RemotingCommand response = RemotingCommand.createResponseCommand(RpcCmd.pingRes);
- String body = new String(remotingCommand.getBody());
- PingReq pr = GSON.fromJson(body, PingReq.class);
-
- AgentChannel ch = AgentContext.ins().map.get(remoteAddress);
- if (null != ch) {
- ch.setIp(pr.getIp());
- }
- String requestBody = String.format("%s->%s->%s:%s->%s", version.toString(), dateTimeFormatter.format(LocalDateTime.now()), NetUtil.getLocalIp(), SERVER_PORT, remoteAddress);
- response.setBody(requestBody.getBytes());
- if (null != pr && StringUtils.isNotBlank(pr.getIp())) {
- agentHeartTimeStampMap.put(pr.getIp(), Instant.now().toEpochMilli());
- }
-
- if (pr.getMessage().equals("load")) {
- AppLogMeta meta = new AppLogMeta();
- meta.setAppName("log-manager");
- meta.setAppId(ThreadLocalRandom.current().nextLong());
- response.setBody(GSON.toJson(meta).getBytes());
- }
-
- return response;
- }
-
- @Override
- public boolean rejectRequest() {
- return false;
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/ApplicationShutdownHook.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/ApplicationShutdownHook.java
deleted file mode 100644
index 6bf2d1b83..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/ApplicationShutdownHook.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.service;
-
-import com.xiaomi.data.push.rpc.RpcServer;
-import com.xiaomi.youpin.docean.anno.Component;
-import lombok.extern.slf4j.Slf4j;
-
-import javax.annotation.Resource;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2023/10/13 14:11
- */
-@Component
-@Slf4j
-public class ApplicationShutdownHook {
-
- @Resource
- private RpcServer rpcServer;
-
- public void init() {
- addRuntimeShutdownHook();
- }
-
- /**
- * addRuntimeShutdownHook server deregisterInstance
- */
- private void addRuntimeShutdownHook() {
- Runtime.getRuntime().addShutdownHook(new Thread(() -> rpcServer.shutdown()));
- }
-
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultAgentConfigAcquirer.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultAgentConfigAcquirer.java
deleted file mode 100644
index cde3e56aa..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultAgentConfigAcquirer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.xiaomi.mone.log.server.service;
-
-import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
-import com.xiaomi.mone.log.api.service.AgentConfigService;
-import com.xiaomi.youpin.docean.anno.Component;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Reference;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @author wtt
- * @version 1.0
- * @description Obtain configuration from dashboard through dubbo interface
- * @date 2022/12/6 14:30
- */
-@Component
-@Slf4j
-public class DefaultAgentConfigAcquirer implements AgentConfigService {
-
- @Reference(interfaceClass = AgentConfigService.class, group = "$dubbo.group", check = false, timeout = 10000)
- private AgentConfigService agentConfigService;
-
- @Override
- public LogCollectMeta getLogCollectMetaFromManager(String ip) {
- LogCollectMeta logCollectMeta = new LogCollectMeta();
- try {
- logCollectMeta = agentConfigService.getLogCollectMetaFromManager(ip);
- } catch (Exception e) {
- log.error("getLogCollectMetaFromManager error,ip:{}", ip, e);
- }
- return logCollectMeta;
- }
-}
-
-
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultLogProcessCollector.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultLogProcessCollector.java
deleted file mode 100644
index c6a98e953..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultLogProcessCollector.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.service;
-
-import cn.hutool.core.util.NumberUtil;
-import com.google.common.collect.Lists;
-import com.xiaomi.mone.log.api.model.vo.AgentLogProcessDTO;
-import com.xiaomi.mone.log.api.model.vo.TailLogProcessDTO;
-import com.xiaomi.mone.log.api.model.vo.UpdateLogProcessCmd;
-import com.xiaomi.mone.log.api.service.LogProcessCollector;
-import com.xiaomi.youpin.docean.anno.Component;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2022/12/6 14:32
- */
-@Slf4j
-@Component
-@Service(interfaceClass = LogProcessCollector.class, group = "$dubbo.group", timeout = 10000)
-public class DefaultLogProcessCollector implements LogProcessCollector {
-
- private final Map> tailProgressMap = new ConcurrentHashMap<>(256);
-
- private static final Integer MAX_INTERRUPT_TIME = 10;
-
- private static final Integer MAX_STATIC_INTERRUPT_TIME_HOUR = 4;
-
- private static final String PROCESS_SEPARATOR = "%";
-
- @Override
- public void collectLogProcess(UpdateLogProcessCmd cmd) {
- log.debug("[LogProcess.updateLogProcess] cmd:{} ", cmd);
- if (cmd == null || StringUtils.isEmpty(cmd.getIp())) {
- return;
- }
- tailProgressMap.put(cmd.getIp(), cmd.getCollectList());
- }
-
- @Override
- public List getTailLogProcess(Long tailId, String tailName, String targetIp) {
- if (null == tailId || StringUtils.isBlank(tailName)) {
- return new ArrayList<>();
- }
- List dtoList = tailProgressMap.values().stream()
- .flatMap(Collection::stream)
- .filter(collectDetail -> Objects.equals(tailId.toString(), collectDetail.getTailId()))
- .flatMap(collectDetail -> collectDetail.getFileProgressDetails().stream())
- .map(fileProgressDetail -> TailLogProcessDTO.builder()
- .tailName(tailName)
- .collectTime(fileProgressDetail.getCollectTime())
- .collectPercentage(fileProgressDetail.getCollectPercentage())
- .ip(fileProgressDetail.getConfigIp())
- .path(fileProgressDetail.getPattern())
- .fileRowNumber(fileProgressDetail.getFileRowNumber()).build())
- .filter(processDTO -> StringUtils.isNotBlank(processDTO.getIp()))
- .collect(Collectors.toList());
- if (StringUtils.isNotBlank(targetIp)) {
- dtoList = dtoList.stream().filter(processDTO -> Objects.equals(targetIp, processDTO.getIp())).collect(Collectors.toList());
- }
- List perOneIpProgressList = Lists.newArrayList();
- try {
- perOneIpProgressList = getTailLogProcessDTOS(dtoList, perOneIpProgressList);
- perOneIpProgressList = filterExpireTimePath(perOneIpProgressList);
- } catch (Exception e) {
- log.error("getTailLogProcess error,dtoList:{}", GSON.toJson(dtoList), e);
- }
- return perOneIpProgressList;
- }
-
- @Override
- public List getAgentLogProcess(String ip) {
- List dtoList = Lists.newArrayList();
- if (StringUtils.isEmpty(ip) || tailProgressMap.isEmpty()) {
- return dtoList;
- }
- List collect = tailProgressMap.values().stream()
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- collect.stream().forEach(collectDetail -> {
- try {
- String appName = collectDetail.getAppName();
- if (CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails())) {
- dtoList.addAll(collectDetail.getFileProgressDetails().stream()
- .filter(processDTO -> StringUtils.isNotBlank(processDTO.getConfigIp()))
- .filter(processDTO -> Objects.equals(ip, processDTO.getConfigIp()))
- .map(fileProgressDetail -> {
- AgentLogProcessDTO agentLogProcessDTO = new AgentLogProcessDTO();
- agentLogProcessDTO.setPath(fileProgressDetail.getPattern());
- agentLogProcessDTO.setFileRowNumber(fileProgressDetail.getFileRowNumber());
- agentLogProcessDTO.setPointer(fileProgressDetail.getPointer());
- agentLogProcessDTO.setFileMaxPointer(fileProgressDetail.getFileMaxPointer());
- agentLogProcessDTO.setAppName(appName);
- agentLogProcessDTO.setCollectPercentage(fileProgressDetail.getCollectPercentage());
- agentLogProcessDTO.setCollectTime(fileProgressDetail.getCollectTime());
- return agentLogProcessDTO;
- }).collect(Collectors.toList()));
- }
- } catch (Exception e) {
- log.error("getAgentLogProcess error,ip:{},CollectDetail:{}", ip, GSON.toJson(collectDetail), e);
- }
- });
- return dtoList;
- }
-
- @Override
- public List getColProcessImperfect(Double progressRation) {
- List resultList = Lists.newArrayList();
- if (null == progressRation || tailProgressMap.isEmpty()) {
- return resultList;
- }
- resultList = tailProgressMap.values().stream().flatMap(Collection::stream)
- .map(collectDetail -> {
- List fileProgressDetails = collectDetail.getFileProgressDetails();
- if (CollectionUtils.isNotEmpty(fileProgressDetails)) {
- List progressDetails = fileProgressDetails.stream()
- .filter(fileProgressDetail -> lessThenRation(fileProgressDetail.getCollectPercentage(), progressRation))
- .filter(tailLogProcessDTO -> null != tailLogProcessDTO.getCollectTime() &&
- Instant.now().toEpochMilli() - tailLogProcessDTO.getCollectTime() < TimeUnit.HOURS.toMillis(MAX_STATIC_INTERRUPT_TIME_HOUR))
- .collect(Collectors.toList());
- collectDetail.setFileProgressDetails(progressDetails);
- }
- return collectDetail;
- })
- .filter(collectDetail -> CollectionUtils.isNotEmpty(collectDetail.getFileProgressDetails()))
- .collect(Collectors.toList());
- return resultList;
- }
-
- @Override
- public List getFileProcessDetailByTail(Long tailId) {
- List resultList = new ArrayList<>();
- if (tailId == null) {
- return resultList;
- }
- try {
- for (List details : tailProgressMap.values()) {
- for (UpdateLogProcessCmd.CollectDetail detail : details) {
- if (String.valueOf(tailId).equals(detail.getTailId())) {
- resultList.addAll(detail.getFileProgressDetails());
- }
- }
- }
- } catch (Throwable t) {
- log.error("getFileProcessDetailByTail error : ", t);
- }
- return resultList;
- }
-
- @Override
- public List getAllCollectDetail(String ip) {
- return tailProgressMap.get(ip);
- }
-
- /**
- * @param source 89%
- * @param targetNum 0.98
- * @return
- */
- private boolean lessThenRation(String source, Double targetNum) {
- try {
- double sourceOrigin = Double.parseDouble(StringUtils.substringBefore(source, PROCESS_SEPARATOR));
- double sourceNum = NumberUtil.div(sourceOrigin, 100d);
- return Double.valueOf(sourceNum).compareTo(targetNum) < 0;
- } catch (Exception e) {
- log.error("lessThenRation error,source:{},target:{}", source, targetNum, e);
- }
- return true;
- }
-
-
- private List getTailLogProcessDTOS(List dtoList, List perOneIpProgressList) {
- if (CollectionUtils.isNotEmpty(dtoList)) {
- // Go to retrieve the latest one
- Map> collect = dtoList.stream()
- .collect(Collectors
- .groupingBy(processDTO ->
- String.format("%s-%s", processDTO.getIp(), processDTO.getPath()))
- );
- perOneIpProgressList = collect.keySet().stream().map(s -> {
- List tailLogProcessDTOS = collect.get(s);
- return tailLogProcessDTOS.stream()
- .sorted(Comparator.comparing(TailLogProcessDTO::getCollectTime).reversed())
- .findFirst().get();
- }).collect(Collectors.toList());
- return perOneIpProgressList;
- }
- return Lists.newArrayList();
- }
-
- private List filterExpireTimePath(List tailLogProcessDTOS) {
- return tailLogProcessDTOS.stream()
- .filter(processDTO -> Objects.nonNull(processDTO.getCollectTime()) &&
- Instant.now().toEpochMilli() - processDTO.getCollectTime() < TimeUnit.MINUTES.toMillis(MAX_INTERRUPT_TIME))
- .collect(Collectors.toList());
- }
-}
diff --git a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultPublishConfigService.java b/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultPublishConfigService.java
deleted file mode 100644
index 3c1ef52af..000000000
--- a/ozhera-log/log-agent-server/src/main/java/com/xiaomi/mone/log/server/service/DefaultPublishConfigService.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server.service;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.xiaomi.data.push.context.AgentContext;
-import com.xiaomi.data.push.rpc.RpcServer;
-import com.xiaomi.data.push.rpc.netty.AgentChannel;
-import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
-import com.xiaomi.mone.log.api.model.vo.LogCmd;
-import com.xiaomi.mone.log.api.service.PublishConfigService;
-import com.xiaomi.mone.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.plugin.dubbo.anno.Service;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.annotation.Resource;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-import static com.xiaomi.mone.log.common.Constant.SYMBOL_COLON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2022/12/6 17:48
- */
-@Slf4j
-@Service(interfaceClass = PublishConfigService.class, group = "$dubbo.group", timeout = 14000)
-public class DefaultPublishConfigService implements PublishConfigService {
-
- private static final AtomicInteger COUNT_INCR = new AtomicInteger(0);
- @Resource
- private RpcServer rpcServer;
-
- /**
- * dubbo interface, the timeout period cannot be too long
- *
- * @param agentIp
- * @param logCollectMeta
- */
- @Override
- public void sengConfigToAgent(String agentIp, LogCollectMeta logCollectMeta) {
- int count = 1;
- while (count < 4) {
- Map logAgentMap = getAgentChannelMap();
- String agentCurrentIp = queryCurrentDockerAgentIP(agentIp, logAgentMap);
- if (logAgentMap.containsKey(agentCurrentIp)) {
- String sendStr = GSON.toJson(logCollectMeta);
- if (CollectionUtils.isNotEmpty(logCollectMeta.getAppLogMetaList())) {
- RemotingCommand req = RemotingCommand.createRequestCommand(LogCmd.logReq);
- req.setBody(sendStr.getBytes());
- log.info("Send the configuration,agent ip:{},Configuration information:{}", agentCurrentIp, sendStr);
- Stopwatch started = Stopwatch.createStarted();
- RemotingCommand res = rpcServer.sendMessage(logAgentMap.get(agentCurrentIp), req, 10000);
- started.stop();
- String response = new String(res.getBody());
- log.info("The configuration is sent successfully---->{},duration:{}s,agentIp:{}", response, started.elapsed().getSeconds(), agentCurrentIp);
- if (Objects.equals(response, "ok")) {
- break;
- }
- }
- } else {
- log.info("The current agent IP is not connected,ip:{},configuration data:{}", agentIp, GSON.toJson(logCollectMeta));
- }
- //Retry policy - Retry 4 times, sleep 500 ms each time
- try {
- TimeUnit.MILLISECONDS.sleep(500L);
- } catch (final InterruptedException ignored) {
- }
- count++;
- }
- }
-
- @Override
- public List getAllAgentList() {
- List remoteAddress = Lists.newArrayList();
- List ipAddress = Lists.newArrayList();
- AgentContext.ins().map.entrySet().forEach(agentChannelEntry -> {
- String key = agentChannelEntry.getKey();
- remoteAddress.add(key);
- ipAddress.add(StringUtils.substringBefore(key, SYMBOL_COLON));
- }
- );
- if (COUNT_INCR.getAndIncrement() % 200 == 0) {
- log.info("The set of remote addresses of the connected agent machine is:{}", GSON.toJson(remoteAddress));
- }
- return remoteAddress;
- }
-
- private Map getAgentChannelMap() {
- Map logAgentMap = new HashMap<>();
- AgentContext.ins().map.forEach((k, v) -> logAgentMap.put(StringUtils.substringBefore(k, SYMBOL_COLON), v));
- return logAgentMap;
- }
-
- private String queryCurrentDockerAgentIP(String agentIp, Map logAgentMap) {
- if (Objects.equals(agentIp, NetUtil.getLocalIp())) {
- //for Docker handles the agent on the current machine
- final String tempIp = agentIp;
- List ipList = getAgentChannelMap().keySet()
- .stream().filter(ip -> ip.startsWith("172"))
- .collect(Collectors.toList());
- Optional optionalS = ipList.stream()
- .filter(ip -> Objects.equals(logAgentMap.get(ip).getIp(), tempIp))
- .findFirst();
- if (optionalS.isPresent()) {
- String correctIp = optionalS.get();
- log.info("origin ip:{},set agent ip:{}", agentIp, correctIp);
- agentIp = correctIp;
- }
- }
- return agentIp;
- }
-}
diff --git a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultAgentConfigAcquirerTest.java b/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultAgentConfigAcquirerTest.java
deleted file mode 100644
index 0affdef9e..000000000
--- a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultAgentConfigAcquirerTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-///*
-// * Copyright (C) 2020 Xiaomi Corporation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.xiaomi.mone.log.server;
-//
-//import com.google.gson.Gson;
-//import com.google.gson.GsonBuilder;
-//import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
-//import com.xiaomi.mone.log.server.service.DefaultAgentConfigAcquirer;
-//import com.xiaomi.youpin.docean.Ioc;
-//import lombok.extern.slf4j.Slf4j;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-///**
-// * @author wtt
-// * @version 1.0
-// * @description
-// * @date 2022/12/21 10:56
-// */
-//@Slf4j
-//public class DefaultAgentConfigAcquirerTest {
-//
-// DefaultAgentConfigAcquirer configAcquirer;
-// Gson gson;
-//
-// @Before
-// public void buildBean() {
-// Ioc.ins().init("com.xiaomi");
-// configAcquirer = Ioc.ins().getBean(DefaultAgentConfigAcquirer.class);
-// gson = new GsonBuilder().create();
-// }
-//
-// @Test
-// public void testGetConfig() {
-// String ip = "127.0.0.1:1";
-// LogCollectMeta logCollectMetaFromManager = configAcquirer.getLogCollectMetaFromManager(ip);
-// log.info("config:{}", gson.toJson(logCollectMetaFromManager));
-// }
-//}
diff --git a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultLogProcessCollectorTest.java b/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultLogProcessCollectorTest.java
deleted file mode 100644
index b0c35feef..000000000
--- a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DefaultLogProcessCollectorTest.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-///*
-// * Copyright (C) 2020 Xiaomi Corporation
-// *
-// * Licensed under the Apache License, Version 2.0 (the "License");
-// * you may not use this file except in compliance with the License.
-// * You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//package com.xiaomi.mone.log.server;
-//
-//import com.google.common.collect.Lists;
-//import com.google.gson.Gson;
-//import com.google.gson.GsonBuilder;
-//import com.xiaomi.mone.log.api.model.vo.UpdateLogProcessCmd;
-//import com.xiaomi.mone.log.server.service.DefaultLogProcessCollector;
-//import com.xiaomi.youpin.docean.Ioc;
-//import lombok.extern.slf4j.Slf4j;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-//import java.util.List;
-//
-///**
-// * @author wtt
-// * @version 1.0
-// * @description
-// * @date 2023/1/4 11:14
-// */
-//@Slf4j
-//public class DefaultLogProcessCollectorTest {
-//
-// DefaultLogProcessCollector processCollector;
-// Gson gson;
-//
-// @Before
-// public void buildBean() {
-// Ioc.ins().init("com.xiaomi");
-// processCollector = Ioc.ins().getBean(DefaultLogProcessCollector.class);
-// gson = new GsonBuilder().create();
-// }
-//
-// @Test
-// public void testCollectLogProcess() {
-// UpdateLogProcessCmd updateLogProcessCmd = new UpdateLogProcessCmd();
-// List collectList = Lists.newArrayList();
-// UpdateLogProcessCmd.CollectDetail collectDetail = new UpdateLogProcessCmd.CollectDetail();
-// List fileProgressDetails = Lists.newArrayList();
-// UpdateLogProcessCmd.FileProgressDetail progressDetail = new UpdateLogProcessCmd.FileProgressDetail();
-// progressDetail.setPattern("/home/work/log/test/server.log");
-// progressDetail.setCollectPercentage("98%");
-// fileProgressDetails.add(progressDetail);
-// collectDetail.setFileProgressDetails(fileProgressDetails);
-// collectList.add(collectDetail);
-// updateLogProcessCmd.setCollectList(collectList);
-// updateLogProcessCmd.setIp("127.0.0.1");
-// processCollector.collectLogProcess(updateLogProcessCmd);
-// List colProcessImperfect = processCollector.getColProcessImperfect(0.97);
-// log.info("result:{}", gson.toJson(colProcessImperfect));
-// }
-//
-// @Test
-// public void testGetColProcessImperfect() {
-// List colProcessImperfect = processCollector.getColProcessImperfect(0.98);
-// log.info("result:{}", gson.toJson(colProcessImperfect));
-// }
-//}
diff --git a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DubboTest.java b/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DubboTest.java
deleted file mode 100644
index 577f5e9dd..000000000
--- a/ozhera-log/log-agent-server/src/test/java/com/xiaomi/mone/log/server/DubboTest.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.server;
-
-import lombok.extern.slf4j.Slf4j;
-import org.junit.Test;
-
-/**
- * @author wtt
- * @version 1.0
- * @description dubbo generalization call
- * @date 2022/12/21 16:08
- */
-@Slf4j
-public class DubboTest {
-
- @Test
- public void test() {
-// ApplicationConfig application = new ApplicationConfig();
-// application.setName("api-gateway-test");
-// application.setQosEnable(false);
-//
-// RegistryConfig registry = new RegistryConfig();
-// registry.setAddress("nacos://127.0.0.1:80");
-// registry.setRegister(false);
-//
-// ReferenceConfig reference = new ReferenceConfig<>();
-// reference.setInterface("com.xiaomi.mone.log.api.service.PublishConfigService");
-// reference.setVersion("");
-// reference.setGroup("staging");
-// reference.setGeneric("true");
-//
-// reference.setApplication(application);
-// reference.setRegistry(registry);
-//
-//// DubboBootstrap bootstrap = DubboBootstrap.getInstance();
-//// bootstrap.application(application)
-//// .registry(registry)
-//// .reference(reference)
-//// .start();
-//
-// ReferenceConfigCache cache = ReferenceConfigCache.getCache();
-// GenericService genericService = cache.get(reference);
-//
-// Object result = genericService.$invoke("getAllAgentList", new String[]{}, new Object[]{});
-// log.info("result:{}", GSON.toJson(result));
- }
-}
diff --git a/ozhera-log/log-agent/pom.xml b/ozhera-log/log-agent/pom.xml
index ed87bb535..47044fbce 100644
--- a/ozhera-log/log-agent/pom.xml
+++ b/ozhera-log/log-agent/pom.xml
@@ -3,9 +3,9 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- run.mone
+ org.apache.ozhera
ozhera-log
- 1.4.0-jdk21
+ 2.0.0-SNAPSHOT
4.0.0
@@ -206,7 +206,7 @@
false
- com.xiaomi.mone.log.agent.bootstrap.MiLogAgentBootstrap
+ org.apache.ozhera.log.agent.bootstrap.MiLogAgentBootstrap
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/bootstrap/MiLogAgentBootstrap.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/bootstrap/MiLogAgentBootstrap.java
deleted file mode 100644
index df94bbc62..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/bootstrap/MiLogAgentBootstrap.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.bootstrap;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.xiaomi.data.push.bo.ClientInfo;
-import com.xiaomi.data.push.rpc.RpcClient;
-import com.xiaomi.mone.log.agent.common.Version;
-import com.xiaomi.mone.log.agent.rpc.task.PingTask;
-import com.xiaomi.mone.log.common.Config;
-import com.xiaomi.mone.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Aop;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-
-import static com.xiaomi.mone.log.utils.ConfigUtils.getConfigValue;
-import static com.xiaomi.mone.log.utils.ConfigUtils.getDataHashKey;
-
-/**
- * @Author goodjava@qq.com
- * @Date 2021/6/22 11:22
- */
-@Slf4j
-public class MiLogAgentBootstrap {
-
- public static void main(String[] args) throws IOException {
- String nacosAddr = getConfigValue("nacosAddr");
- String serviceName = getConfigValue("serviceName");
- log.info("nacosAddr:{},serviceName:{},version:{}", nacosAddr, serviceName, new Version());
- String appName = Config.ins().get("app_name", "milog_agent");
- ClientInfo clientInfo = new ClientInfo(
- String.format("%s_%d", appName, getDataHashKey(NetUtil.getLocalIp(), Integer.parseInt(Config.ins().get("app_max_index", "30")))),
- NetUtil.getLocalIp(),
- Integer.parseInt(Config.ins().get("port", "9799")),
- new Version() + ":" + serviceName + ":" + nacosAddr);
- final RpcClient client = new RpcClient(nacosAddr, serviceName);
- //Even without service information, use the old registration information (fault tolerance processing).
- client.setClearServerAddr(false);
- client.setReconnection(false);
- client.setClientInfo(clientInfo);
- client.start();
- client.setTasks(Lists.newArrayList(new PingTask(client)));
- client.init();
- client.waitStarted();
- log.info("create rpc client finish");
- Aop.ins().init(Maps.newLinkedHashMap());
- Ioc.ins().putBean(client).init("com.xiaomi.mone.log.agent", "com.xiaomi.youpin.docean");
- //Because the client life cycle is advanced, the processor needs to be re-registered here
- client.registerProcessor();
- System.in.read();
- }
-
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/AbstractChannelService.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/AbstractChannelService.java
deleted file mode 100644
index dcd947963..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/AbstractChannelService.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import com.xiaomi.mone.file.ReadResult;
-import com.xiaomi.mone.file.common.FileInfoCache;
-import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
-import com.xiaomi.mone.log.agent.common.ChannelUtil;
-import com.xiaomi.mone.log.agent.input.Input;
-import com.xiaomi.mone.log.api.enums.LogTypeEnum;
-import com.xiaomi.mone.log.api.model.meta.LogPattern;
-import com.xiaomi.mone.log.api.model.msg.LineMessage;
-import com.xiaomi.mone.log.utils.NetUtil;
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2023/6/20 16:26
- */
-@Slf4j
-public abstract class AbstractChannelService implements ChannelService {
-
- public String instanceId = UUID.randomUUID().toString();
-
- @Override
- public String instanceId() {
- return instanceId;
- }
-
- @Override
- public ChannelState state() {
- ChannelState channelState = new ChannelState();
-
- ChannelDefine channelDefine = getChannelDefine();
- ChannelMemory channelMemory = getChannelMemory();
-
- channelState.setTailId(channelDefine.getChannelId());
- channelState.setTailName(channelDefine.getTailName());
- channelState.setAppId(channelDefine.getAppId());
- channelState.setAppName(channelDefine.getAppName());
- channelState.setLogPattern(channelDefine.getInput().getLogPattern());
- channelState.setLogPatternCode(channelDefine.getInput().getPatternCode());
-
- List distinctIpList = channelDefine.getIpDirectoryRel()
- .stream()
- .map(LogPattern.IPRel::getIp)
- .distinct()
- .collect(Collectors.toList());
- channelState.setIpList(distinctIpList);
-
- channelState.setCollectTime(channelMemory.getCurrentTime());
-
- if (channelState.getStateProgressMap() == null) {
- channelState.setStateProgressMap(new HashMap<>(256));
- }
- channelMemory.getFileProgressMap().forEach((pattern, fileProcess) -> {
- if (null != fileProcess.getFinished() && fileProcess.getFinished()) {
- return;
- }
- ChannelState.StateProgress stateProgress = new ChannelState.StateProgress();
- stateProgress.setCurrentFile(pattern);
- stateProgress.setIp(getTailPodIp(pattern));
- stateProgress.setCurrentRowNum(fileProcess.getCurrentRowNum());
- stateProgress.setPointer(fileProcess.getPointer());
- stateProgress.setFileMaxPointer(fileProcess.getFileMaxPointer());
- stateProgress.setCtTime(fileProcess.getCtTime());
- channelState.getStateProgressMap().put(pattern, stateProgress);
- });
-
- channelState.setTotalSendCnt(getLogCounts());
- return channelState;
- }
-
- public abstract ChannelDefine getChannelDefine();
-
- public abstract ChannelMemory getChannelMemory();
-
- public abstract Map getExpireFileMap();
-
- public abstract void cancelFile(String file);
-
- public abstract Long getLogCounts();
-
- public LogTypeEnum getLogTypeEnum() {
- Input input = getChannelDefine().getInput();
- return LogTypeEnum.name2enum(input.getType());
- }
-
- /**
- * Query IP information based on the actual collection path.
- *
- * @param pattern
- * @return
- */
- protected String getTailPodIp(String pattern) {
- ChannelDefine channelDefine = getChannelDefine();
- List ipDirectoryRel = channelDefine.getIpDirectoryRel();
- LogPattern.IPRel actualIpRel = ipDirectoryRel.stream().filter(ipRel -> pattern.contains(ipRel.getKey())).findFirst().orElse(null);
- if (null != actualIpRel) {
- return actualIpRel.getIp();
- }
- return NetUtil.getLocalIp();
- }
-
- protected ChannelMemory initChannelMemory(Long channelId, Input input, List patterns, ChannelDefine channelDefine) {
- ChannelMemory channelMemory = new ChannelMemory();
- channelMemory.setChannelId(channelId);
- channelMemory.setInput(input);
- channelMemory.setFileProgressMap(buildFileProgressMap(patterns, channelDefine));
- channelMemory.setCurrentTime(System.currentTimeMillis());
- channelMemory.setVersion(ChannelMemory.DEFAULT_VERSION);
- return channelMemory;
- }
-
- private Map buildFileProgressMap(List patterns, ChannelDefine channelDefine) {
- Map fileProgressMap = new HashMap<>();
- for (String pattern : patterns) {
- ChannelMemory.FileProgress fileProgress = new ChannelMemory.FileProgress();
- fileProgress.setPointer(0L);
- fileProgress.setCurrentRowNum(0L);
- fileProgress.setUnixFileNode(ChannelUtil.buildUnixFileNode(pattern));
- fileProgress.setPodType(channelDefine.getPodType());
- fileProgressMap.put(pattern, fileProgress);
- }
- return fileProgressMap;
- }
-
- protected static void wildcardGraceShutdown(List directory, String matchExpress) {
- // Add a shutdown hook to gracefully shutdown FileInfoCache
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- log.info("wildcardGraceShutdown Shutdown,directory:{},express:{}", GSON.toJson(directory), matchExpress);
- FileInfoCache.ins().shutdown();
- }));
- }
-
- protected LineMessage createLineMessage(String lineMsg, AtomicReference readResult, String pattern, String patternCode, String ip, long ct) {
- LineMessage lineMessage = new LineMessage();
- lineMessage.setMsgBody(lineMsg);
- lineMessage.setPointer(readResult.get().getPointer());
- lineMessage.setLineNumber(readResult.get().getLineNumber());
- lineMessage.setFileName(pattern);
- lineMessage.setProperties(LineMessage.KEY_MQ_TOPIC_TAG, patternCode);
- lineMessage.setProperties(LineMessage.KEY_IP, ip);
- lineMessage.setProperties(LineMessage.KEY_COLLECT_TIMESTAMP, String.valueOf(ct));
-
- String logType = getChannelDefine().getInput().getType();
- LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
- if (logTypeEnum != null) {
- lineMessage.setProperties(LineMessage.KEY_MESSAGE_TYPE, logTypeEnum.getType().toString());
- }
-
- return lineMessage;
- }
-
- protected void updateChannelMemory(ChannelMemory channelMemory, String fileName, LogTypeEnum logTypeEnum,
- long ct, AtomicReference readResult) {
- ChannelMemory.FileProgress fileProgress = channelMemory.getFileProgressMap().get(fileName);
- ChannelDefine channelDefine = getChannelDefine();
- if (null == fileProgress) {
- fileProgress = new ChannelMemory.FileProgress();
- channelMemory.getFileProgressMap().put(fileName, fileProgress);
- channelMemory.getInput().setLogPattern(channelDefine.getInput().getLogPattern());
- channelMemory.getInput().setType(logTypeEnum.name());
- channelMemory.getInput().setLogSplitExpress(channelDefine.getInput().getLogSplitExpress());
- }
- fileProgress.setCurrentRowNum(readResult.get().getLineNumber());
- fileProgress.setPointer(readResult.get().getPointer());
- if (null != readResult.get().getFileMaxPointer()) {
- fileProgress.setFileMaxPointer(readResult.get().getFileMaxPointer());
- }
- fileProgress.setUnixFileNode(ChannelUtil.buildUnixFileNode(fileName));
- fileProgress.setPodType(channelDefine.getPodType());
- fileProgress.setCtTime(ct);
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelDefine.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelDefine.java
deleted file mode 100644
index 32df5b956..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelDefine.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import com.xiaomi.mone.log.agent.input.Input;
-import com.xiaomi.mone.log.agent.output.Output;
-import com.xiaomi.mone.log.api.enums.OperateEnum;
-import com.xiaomi.mone.log.api.model.meta.FilterConf;
-import com.xiaomi.mone.log.api.model.meta.LogPattern;
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * @author shanwb
- * @date 2021-07-20
- */
-@Data
-public class ChannelDefine implements Serializable {
-
- private Long channelId;
-
- private String tailName;
-
- private Long appId;
-
- private String appName;
-
- private Input input;
-
- private Output output;
-
- private OperateEnum operateEnum;
-
- private List ips;
- /**
- * Relationship between IP and directory
- */
- private List ipDirectoryRel;
-
- /**
- * filter and script configuration
- */
- private List filters;
-
- /**
- * Individual configuration data, default full configuration under this machine.
- */
- private Boolean singleMetaData;
-
- private String podType;
- /**
- * The log collection in the directory that needs to be deleted when a machine goes offline only has a value when a machine of a certain application goes offline.
- */
- private String delDirectory;
-
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java
deleted file mode 100644
index 8fc9f01ac..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelEngine.java
+++ /dev/null
@@ -1,621 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import cn.hutool.core.lang.Pair;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.xiaomi.data.push.common.SafeRun;
-import com.xiaomi.data.push.rpc.RpcClient;
-import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
-import com.xiaomi.mone.file.ILogFile;
-import com.xiaomi.mone.log.agent.channel.comparator.*;
-import com.xiaomi.mone.log.agent.channel.listener.DefaultFileMonitorListener;
-import com.xiaomi.mone.log.agent.channel.listener.FileMonitorListener;
-import com.xiaomi.mone.log.agent.channel.locator.ChannelDefineJsonLocator;
-import com.xiaomi.mone.log.agent.channel.locator.ChannelDefineLocator;
-import com.xiaomi.mone.log.agent.channel.locator.ChannelDefineRpcLocator;
-import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService;
-import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryServiceImpl;
-import com.xiaomi.mone.log.agent.common.ExecutorUtil;
-import com.xiaomi.mone.log.agent.export.MsgExporter;
-import com.xiaomi.mone.log.agent.factory.OutPutServiceFactory;
-import com.xiaomi.mone.log.agent.filter.FilterChain;
-import com.xiaomi.mone.log.agent.input.Input;
-import com.xiaomi.mone.log.agent.output.Output;
-import com.xiaomi.mone.log.api.enums.LogTypeEnum;
-import com.xiaomi.mone.log.api.enums.OperateEnum;
-import com.xiaomi.mone.log.api.model.vo.UpdateLogProcessCmd;
-import com.xiaomi.mone.log.common.Constant;
-import com.xiaomi.mone.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Ioc;
-import com.xiaomi.youpin.docean.anno.Lookup;
-import com.xiaomi.youpin.docean.anno.Service;
-import com.xiaomi.youpin.docean.plugin.config.Config;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-import static com.xiaomi.mone.log.common.PathUtils.PATH_WILDCARD;
-import static com.xiaomi.mone.log.common.PathUtils.SEPARATOR;
-
-/**
- * @author shanwb
- * @date 2021-07-20
- */
-@Service
-@Slf4j
-public class ChannelEngine {
- private AgentMemoryService agentMemoryService;
-
- private ChannelDefineLocator channelDefineLocator;
- /**
- * The configuration pulled in full when the service starts.
- */
- private List channelDefineList = Lists.newArrayList();
-
- private volatile List channelServiceList = Lists.newArrayList();
- /**
- * File listener
- */
- private FileMonitorListener fileMonitorListener;
-
- private String memoryBasePath;
-
- private Gson gson = GSON;
-
- @Getter
- private volatile boolean initComplete;
-
- @Lookup("$logFile")
- public ILogFile logFile() {
- return null;
- }
-
- public void init() {
- List failedChannelId = Lists.newArrayList();
- try {
- Config config = Ioc.ins().getBean(Config.class.getName());
- memoryBasePath = config.get("agent.memory.path", AgentMemoryService.DEFAULT_BASE_PATH);
- //talosProducerMap = new ConcurrentHashMap<>(512);
-
- channelDefineLocator = getChannelDefineLocator(config);
- channelDefineList = new CopyOnWriteArrayList<>(channelDefineLocator.getChannelDefine());
- log.info("current agent all config meta:{}", gson.toJson(channelDefineList));
- agentMemoryService = new AgentMemoryServiceImpl(memoryBasePath);
- fileMonitorListener = new DefaultFileMonitorListener();
-
- log.info("query channelDefineList:{}", gson.toJson(channelDefineList));
- channelServiceList = channelDefineList.stream()
- .filter(channelDefine -> filterCollStart(channelDefine.getAppName()))
- .map(channelDefine -> {
- ChannelService channelService = this.channelServiceTrans(channelDefine);
- if (null == channelService) {
- failedChannelId.add(channelDefine.getChannelId());
- }
- return channelService;
- }).filter(Objects::nonNull).collect(Collectors.toList());
- // Delete failed channel
- deleteFailedChannel(failedChannelId, this.channelDefineList, this.channelServiceList);
- channelServiceList = new CopyOnWriteArrayList<>(channelServiceList);
- // start channel
- channelStart(channelServiceList);
- //Shutdown - callback action
- graceShutdown();
- //Report channel progress once every 10 seconds
- exportChannelState();
- log.info("current channelDefineList:{},current channelServiceList:{}", gson.toJson(this.channelDefineList), gson.toJson(this.channelServiceList.stream().map(ChannelService::instanceId).collect(Collectors.toList())));
- monitorFilesClean();
- executorFileClean();
- } catch (Exception e) {
- log.error("ChannelEngine init exception", e);
- } finally {
- initComplete = true;
- }
- }
-
- /**
- * Thread pool cleaning, many wasted files don't need to keep wasting threads, they should be cleaned up directly.
- */
- private void executorFileClean() {
- ExecutorUtil.scheduleAtFixedRate(() -> {
- SafeRun.run(() -> {
- List>> serviceTimeList = Lists.newArrayList();
- for (ChannelService channelService : channelServiceList) {
- AbstractChannelService service = (AbstractChannelService) channelService;
- Map fileReadTime = service.getExpireFileMap();
- if (!fileReadTime.isEmpty()) {
- for (Map.Entry entry : fileReadTime.entrySet()) {
- serviceTimeList.add(Pair.of(service, Pair.of(entry.getKey(), entry.getValue())));
- }
- }
- }
- if (serviceTimeList.size() > 500) {
- serviceTimeList = serviceTimeList.stream().sorted(Comparator.comparing(o -> o.getValue().getValue())).collect(Collectors.toList());
- for (int i = 0; i < serviceTimeList.size(); i++) {
- if (i < 100) {
- serviceTimeList.get(i).getKey().cancelFile(serviceTimeList.get(i).getValue().getKey());
- }
- }
- }
- });
- }, 1, 10, TimeUnit.MINUTES);
- }
-
- /**
- * Clean up deleted file events
- */
- private void monitorFilesClean() {
- ExecutorUtil.scheduleAtFixedRate(() -> {
- for (ChannelService channelService : channelServiceList) {
- try {
- channelService.cleanCollectFiles();
- } catch (Exception e) {
- log.error("monitorFilesClean error", e);
- }
- }
- }, 1, 1, TimeUnit.MINUTES);
- }
-
- private ChannelDefineLocator getChannelDefineLocator(Config config) {
- String locatorType = config.get("agent.channel.locator", "rpc");
- log.warn("locatorType: {}", locatorType);
- switch (locatorType) {
- case "json":
- return new ChannelDefineJsonLocator();
- default:
- return new ChannelDefineRpcLocator();
- }
- }
-
- private void exportChannelState() {
- ExecutorUtil.scheduleAtFixedRate(() -> {
- SafeRun.run(() -> {
- List channelStateList = channelServiceList.stream().map(c -> c.state()).collect(Collectors.toList());
- // Send the collection progress
- sendCollectionProgress(channelStateList);
- });
- }, 10, 10, TimeUnit.SECONDS);
- }
-
- private List channelStart(List channelServiceList) {
- List failedChannelIds = Lists.newArrayList();
- List successChannelIds = Lists.newArrayList();
- for (ChannelService channelService : channelServiceList) {
- AbstractChannelService abstractChannelService = (AbstractChannelService) channelService;
- Long channelId = abstractChannelService.getChannelDefine().getChannelId();
- log.info("realChannelService,id:{}", channelId);
- try {
- channelService.start();
- fileMonitorListener.addChannelService(channelService);
- successChannelIds.add(channelId);
- } catch (RejectedExecutionException e) {
- log.error("The thread pool is full.id:{}", channelId, e);
- } catch (Exception e) {
- failedChannelIds.add(channelId);
- log.error("start channel exception,channelId:{}", channelId, e);
- }
- }
- deleteFailedChannel(failedChannelIds, this.channelDefineList, this.channelServiceList);
- return successChannelIds;
- }
-
- private void deleteFailedChannel(List failedChannelId, List defineList, List serviceList) {
- if (CollectionUtils.isNotEmpty(failedChannelId)) {
- //Processing is removed from the current queue
- for (Long delChannelId : failedChannelId) {
- defineList.removeIf(channelDefine -> Objects.equals(delChannelId, channelDefine.getChannelId()));
- serviceList.removeIf(channelService -> Objects.equals(delChannelId, ((AbstractChannelService) channelService).getChannelDefine().getChannelId()));
- }
- }
- }
-
- private void graceShutdown() {
- //Close operation
- Runtime.getRuntime().addShutdownHook(new Thread(() -> {
- log.info("shutdown hook begin!");
- for (ChannelService c : channelServiceList) {
- try {
- c.close();
- } catch (Exception e) {
- log.error("shutdown channel exception:{}", e);
- }
- }
- log.info("shutdown hook end!");
- }));
- }
-
- private ChannelService channelServiceTrans(ChannelDefine channelDefine) {
- try {
- preCheckChannelDefine(channelDefine);
- Output output = channelDefine.getOutput();
- MsgExporter exporter = exporterTrans(output);
- if (null == exporter) {
- throw new IllegalArgumentException("cant not trans to MsgExporter, output:" + gson.toJson(output));
- }
- FilterChain filterChain = new FilterChain();
- filterChain.loadFilterList(channelDefine.getFilters());
- filterChain.reset();
- if (null == agentMemoryService) {
- agentMemoryService = new AgentMemoryServiceImpl(com.xiaomi.mone.log.common.Config.ins().get("agent.memory.path", AgentMemoryService.DEFAULT_BASE_PATH));
- }
- ChannelService channelService;
- Input input = channelDefine.getInput();
- boolean matchWildcard = Arrays.stream(input.getLogPattern().split(",")).anyMatch(data -> StringUtils.substringAfterLast(data, SEPARATOR).contains(PATH_WILDCARD));
- if (matchWildcard) {
- channelService = new WildcardChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain, memoryBasePath);
- } else {
- channelService = new ChannelServiceImpl(exporter, agentMemoryService, channelDefine, filterChain);
- }
- return channelService;
- } catch (Throwable e) {
- log.error("channelServiceTrans exception, channelDefine:{}, exception:{}", gson.toJson(channelDefine), e);
- }
- return null;
- }
-
- private void preCheckChannelDefine(ChannelDefine channelDefine) {
- Preconditions.checkArgument(null != channelDefine, "channelDefine can not be null");
- Preconditions.checkArgument(null != channelDefine.getInput(), "channelDefine.input can not be null");
- Preconditions.checkArgument(null != channelDefine.getOutput(), "channelDefine.output can not be null");
- Preconditions.checkArgument(null != channelDefine.getChannelId(), "channelDefine.channelId can not be null");
- preCheckOutput(channelDefine.getOutput());
-
- Input input = channelDefine.getInput();
- String logPattern = input.getLogPattern();
- Preconditions.checkArgument(null != logPattern, "channelDefine.logPattern can not be null");
-
- }
-
- private void preCheckOutput(Output output) {
- Preconditions.checkArgument(StringUtils.isNotBlank(output.getOutputType()), "outputType can not be null");
- OutPutServiceFactory.getOutPutService(output.getServiceName()).preCheckOutput(output);
- }
-
- private MsgExporter exporterTrans(Output output) throws Exception {
- if (null == output) {
- return null;
- }
- return OutPutServiceFactory.getOutPutService(output.getServiceName()).exporterTrans(output);
- }
-
-
- /**
- * Refresh configuration (refresh existing configuration when incremental configuration and full configuration come)
- * There are deletion events, indicating that it is not a full configuration, and it goes directly to the stop event.
- *
- * @param channelDefines
- */
- public void refresh(List channelDefines) {
- log.info("[config change],changed data:{},origin data:{}", gson.toJson(channelDefines), gson.toJson(channelDefineList));
- try {
- if (CollectionUtils.isNotEmpty(channelDefines) && !CollectionUtils.isEqualCollection(channelDefines, channelDefineList)) {
- if (channelDefines.stream().allMatch(channelDefine -> null != channelDefine.getOperateEnum() &&
- channelDefine.getOperateEnum().getCode().equals(OperateEnum.STOP_OPERATE.getCode()))) {
- // Collect and delete files in the specified directory
- log.info("stopSpecialFileColl,config:{}", gson.toJson(channelDefines));
- delSpecialFileColl(channelDefines);
- return;
- }
-
- if (channelDefines.stream().allMatch(channelDefine -> null != channelDefine.getOperateEnum() &&
- channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode()))) {
- log.info("delSpecialFileColl,config:{}", gson.toJson(channelDefines));
- deleteConfig(channelDefines, false);
- return;
- }
-
- log.info("refresh,config:{}", gson.toJson(channelDefines));
- // add config
- addConfig(channelDefines, false);
- // update config
- updateConfig(channelDefines);
- /**
- * Single configuration processing without deletion.
- */
- if (channelDefines.size() == 1 && channelDefines.get(0).getSingleMetaData() != null && channelDefines.get(0).getSingleMetaData()) {
- return;
- }
- // delete config
- deleteConfig(channelDefines, false);
- }
- } catch (Exception e) {
- log.error("refresh error,[config change],changed data:{},origin data:{}", gson.toJson(channelDefines), gson.toJson(channelDefineList), e);
- }
- }
-
- /**
- * New configuration
- *
- * @param channelDefines
- */
- private void addConfig(List channelDefines, boolean directAdd) {
- try {
- // Newly added, initialize
- List channelDefinesDifference = differenceSet(channelDefines, channelDefineList);
- if (directAdd) {
- channelDefinesDifference = channelDefines;
- }
- if (directAdd || CollectionUtils.isNotEmpty(channelDefinesDifference)) {
- log.info("[add config]data:{}", gson.toJson(channelDefinesDifference));
- initIncrement(channelDefinesDifference);
- }
- } catch (Exception e) {
- log.error("addConfig error,source channelDefines:{},origin channelDefines:{},directAdd:{}", gson.toJson(channelDefines), gson.toJson(channelDefineList), directAdd, e);
- }
- }
-
- /**
- * Update configuration(
- * 1. Find the changed configuration
- * 2. Delete the original configuration
- * 3. Add the configuration again
- * )
- *
- * @param channelDefines
- */
- private void updateConfig(List channelDefines) {
- List channelDefinesIntersection = intersection(channelDefines, channelDefineList);
- if (CollectionUtils.isNotEmpty(channelDefinesIntersection)) {
- List changedDefines = Lists.newArrayList();
- log.info("have exist config:{}", GSON.toJson(channelDefineList));
- Iterator iterator = channelDefinesIntersection.iterator();
- while (iterator.hasNext()) {
- ChannelDefine newChannelDefine = iterator.next();
- // old channelDefine
- Long channelId = newChannelDefine.getChannelId();
- ChannelDefine oldChannelDefine = channelDefineList.stream().filter(channelDefine -> channelDefine.getChannelId().equals(channelId)).findFirst().orElse(null);
- if (null != oldChannelDefine) {
- // Comparator
- SimilarComparator appSimilarComparator = new AppSimilarComparator(oldChannelDefine.getAppId());
- SimilarComparator inputSimilarComparator = new InputSimilarComparator(oldChannelDefine.getInput());
- SimilarComparator outputSimilarComparator = new OutputSimilarComparator(oldChannelDefine.getOutput());
- FilterSimilarComparator filterSimilarComparator = new FilterSimilarComparator(oldChannelDefine.getFilters());
- if (appSimilarComparator.compare(newChannelDefine.getAppId()) && inputSimilarComparator.compare(newChannelDefine.getInput()) && outputSimilarComparator.compare(newChannelDefine.getOutput())) {
- if (!filterSimilarComparator.compare(newChannelDefine.getFilters())) {
- channelServiceList.stream().filter(channelService -> ((AbstractChannelService) channelService).getChannelDefine().getChannelId().equals(channelId)).findFirst().ifPresent(channelService -> channelService.filterRefresh(newChannelDefine.getFilters()));
- }
- } else {
- log.info("config changed,old:{},new:{}", gson.toJson(oldChannelDefine), gson.toJson(newChannelDefine));
- changedDefines.add(newChannelDefine);
- deleteConfig(Arrays.asList(newChannelDefine), true);
- addConfig(Arrays.asList(newChannelDefine), true);
- }
- }
- }
- if (CollectionUtils.isNotEmpty(changedDefines)) {
- log.info("[update config]data:{}", gson.toJson(changedDefines));
- }
- }
- }
-
- /**
- * Delete configuration
- *
- * @param channelDefines
- */
- private void deleteConfig(List channelDefines, boolean directDel) {
- // The entire file is collected and deleted.
- delTailFileColl(channelDefines, directDel);
- }
-
- private void delTailFileColl(List channelDefines, boolean directDel) {
- List channelDels = channelDefines.stream().filter(channelDefine -> null != channelDefine.getOperateEnum() && channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode()) && StringUtils.isEmpty(channelDefine.getDelDirectory())).collect(Collectors.toList());
- if (directDel) {
- channelDels = channelDefines;
- }
- try {
- if (directDel || CollectionUtils.isNotEmpty(channelDels)) {
- log.info("[delete config]data:{}", gson.toJson(channelDels));
- List channelIdDels = channelDels.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
- List tempChannelServiceList = Lists.newArrayList();
- channelServiceList.forEach(channelService -> {
- Long channelId = ((AbstractChannelService) channelService).getChannelDefine().getChannelId();
- if (channelIdDels.contains(channelId)) {
- log.info("[delete config]channelService:{}", channelId);
- channelService.close();
- fileMonitorListener.removeChannelService(channelService);
- tempChannelServiceList.add(channelService);
- this.channelDefineList.removeIf(channelDefine -> {
- if (channelDefine.getChannelId().equals(channelId)) {
- //delete mq
- Output output = channelDefine.getOutput();
- OutPutServiceFactory.getOutPutService(output.getServiceName()).removeMQ(output);
- return true;
- }
- return false;
- });
- }
- });
- if (CollectionUtils.isNotEmpty(tempChannelServiceList)) {
- channelServiceList.removeAll(tempChannelServiceList);
- }
- }
- } catch (Exception e) {
- log.error(String.format("delete config exception,config:%s", gson.toJson(channelDels)), e);
- }
- }
-
- /**
- * Delete log collection under a specific directory.
- *
- * @param channelDefines
- */
- private void delSpecialFileColl(List channelDefines) {
- //Find out the pods that need to be deleted when a machine goes offline
- List delSpecialFiles = channelDefines.stream().filter(channelDefine -> null != channelDefine.getOperateEnum() && channelDefine.getOperateEnum().getCode().equals(OperateEnum.DELETE_OPERATE.getCode()) && StringUtils.isNotEmpty(channelDefine.getDelDirectory())).collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(delSpecialFiles)) {
- try {
- for (ChannelService channelService : channelServiceList) {
- CompletableFuture.runAsync(() -> {
- AbstractChannelService abstractChannelService = (AbstractChannelService) channelService;
- Long channelId = abstractChannelService.getChannelDefine().getChannelId();
-
- List defineList = delSpecialFiles.stream().filter(channelDefine -> Objects.equals(channelDefine.getChannelId(), channelId)).collect(Collectors.toList());
-
- for (ChannelDefine channelDefine : defineList) {
- log.info("deleteConfig,deleteCollFile,channelDefine:{}", gson.toJson(channelDefine));
- channelService.deleteCollFile(channelDefine.getDelDirectory());
- }
- //Also need to delete opentelemetry logs.
- if (LogTypeEnum.OPENTELEMETRY == abstractChannelService.getLogTypeEnum()) {
- for (ChannelDefine channelDefine : delSpecialFiles) {
- log.info("deleteConfig OPENTELEMETRY,deleteCollFile,channelDefine:{}", gson.toJson(channelDefine));
- channelService.deleteCollFile(channelDefine.getDelDirectory());
- }
- }
- });
- }
- } catch (Exception e) {
- log.error("delSpecialFileColl error,delSpecialFiles:{}", gson.toJson(channelDefines), e);
- }
- }
- }
-
- /**
- * Difference
- *
- * @param origin New and old configuration
- * @param source Old configuration
- * @return
- */
- private List differenceSet(List origin, List source) {
- if (CollectionUtils.isEmpty(source)) {
- return origin;
- }
- List sourceIds = source.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
- return origin.stream().filter(channelDefine -> !sourceIds.contains(channelDefine.getChannelId()) && OperateEnum.DELETE_OPERATE != channelDefine.getOperateEnum()).collect(Collectors.toList());
- }
-
-
- /**
- * Intersection
- *
- * @param origin
- * @param source
- * @return
- */
- private List intersection(List origin, List source) {
- List sourceIds = Lists.newArrayList();
- if (CollectionUtils.isNotEmpty(source)) {
- sourceIds = source.stream().map(ChannelDefine::getChannelId).collect(Collectors.toList());
- }
- List finalSourceIds = sourceIds;
- return origin.stream().filter(channelDefine -> finalSourceIds.contains(channelDefine.getChannelId()) && OperateEnum.DELETE_OPERATE != channelDefine.getOperateEnum()).collect(Collectors.toList());
- }
-
- /**
- * New configuration initialization
- *
- * @param definesIncrement
- */
- public void initIncrement(List definesIncrement) {
- List failedChannelId = Lists.newArrayList();
- List channelServices = definesIncrement.stream()
- .filter(Objects::nonNull)
- .filter(channelDefine -> filterCollStart(channelDefine.getAppName()))
- .map(channelDefine -> {
- ChannelService channelService = channelServiceTrans(channelDefine);
- if (null == channelService) {
- failedChannelId.add(channelDefine.getChannelId());
- }
- return channelService;
- }).filter(Objects::nonNull).collect(Collectors.toList());
- deleteFailedChannel(failedChannelId, definesIncrement, channelServices);
- List successChannelIds = channelStart(channelServices);
- if (CollectionUtils.isNotEmpty(successChannelIds)) {
- this.channelServiceList.addAll(channelServices.stream().filter(channelService -> successChannelIds.contains(((AbstractChannelService) channelService).getChannelDefine().getChannelId())).collect(Collectors.toList()));
- this.channelDefineList.addAll(definesIncrement.stream().filter(channelDefine -> successChannelIds.contains(channelDefine.getChannelId())).collect(Collectors.toList()));
- }
- log.info("[add config] after current channelDefineList:{},channelServiceList:{}", gson.toJson(this.channelDefineList), gson.toJson(gson.toJson(channelServiceList.stream().map(ChannelService::instanceId).collect(Collectors.toList()))));
- }
-
- private boolean filterCollStart(String appName) {
- String serviceName = System.getenv("K8S_SERVICE");
- if (StringUtils.isNotEmpty(serviceName) && StringUtils.isNotEmpty(appName)) {
- return serviceName.contains(appName);
- }
- return true;
- }
-
-
- /**
- * Send collection progress.
- *
- * @param
- */
- private void sendCollectionProgress(List channelStateList) {
- if (CollectionUtils.isEmpty(channelStateList)) {
- return;
- }
- UpdateLogProcessCmd processCmd = assembleLogProcessData(channelStateList);
- RpcClient rpcClient = Ioc.ins().getBean(RpcClient.class);
- RemotingCommand req = RemotingCommand.createRequestCommand(Constant.RPCCMD_AGENT_CODE);
- req.setBody(GSON.toJson(processCmd).getBytes());
- rpcClient.sendToAllMessage(req);
- log.debug("send collect progress,data:{}", gson.toJson(processCmd));
- }
-
- private UpdateLogProcessCmd assembleLogProcessData(List channelStateList) {
- UpdateLogProcessCmd cmd = new UpdateLogProcessCmd();
- try {
- cmd.setIp(NetUtil.getLocalIp());
- List collects = Lists.newArrayList();
- List finalCollects = collects;
- channelStateList.forEach(channelState -> {
-
- UpdateLogProcessCmd.CollectDetail collectDetail = new UpdateLogProcessCmd.CollectDetail();
- collectDetail.setTailId(channelState.getTailId().toString());
- collectDetail.setAppId(channelState.getAppId());
- collectDetail.setTailName(channelState.getTailName());
- collectDetail.setAppName(channelState.getAppName());
- collectDetail.setIpList(channelState.getIpList());
- collectDetail.setPath(channelState.getLogPattern());
-
- List progressDetails = channelState.getStateProgressMap().entrySet().stream().map(entry -> UpdateLogProcessCmd.FileProgressDetail.builder().fileRowNumber(entry.getValue().getCurrentRowNum()).collectTime(entry.getValue().getCtTime()).pointer(entry.getValue().getPointer()).fileMaxPointer(entry.getValue().getFileMaxPointer()).collectPercentage(getPercent(entry.getValue().getPointer(), entry.getValue().getFileMaxPointer())).configIp(entry.getValue().getIp()).pattern(entry.getKey()).build()).collect(Collectors.toList());
- collectDetail.setFileProgressDetails(progressDetails);
- finalCollects.add(collectDetail);
- });
- //Progress deduplication
- collects = collects.stream().distinct().collect(Collectors.toList());
- cmd.setCollectList(collects);
- return cmd;
- } catch (Exception e) {
- log.error("send collect data progress wrap data error", e);
- }
- return cmd;
- }
-
- private String getPercent(Long pointer, Long maxPointer) {
- if (null == pointer || pointer == 0 || null == maxPointer || maxPointer == 0) {
- return 0 + "%";
- }
- NumberFormat numberFormat = NumberFormat.getInstance();
- numberFormat.setMaximumFractionDigits(2);
- return numberFormat.format(((float) pointer / (float) maxPointer) * 100) + "%";
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelService.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelService.java
deleted file mode 100644
index dfd93cf9b..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelService.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import com.xiaomi.mone.log.agent.channel.file.MonitorFile;
-import com.xiaomi.mone.log.agent.export.MsgExporter;
-import com.xiaomi.mone.log.api.model.meta.FilterConf;
-
-import java.util.List;
-
-/**
- * @author shanwb
- * @date 2021-07-19
- */
-public interface ChannelService extends Closeable {
- /**
- * Start channel task.
- */
- void start();
-
- /**
- * Dynamic refresh channel configuration
- *
- * @param channelDefine
- * @param msgExporter
- */
- void refresh(ChannelDefine channelDefine, MsgExporter msgExporter);
-
- /**
- * Stop specified file collection.
- *
- * @param filePrefixList
- */
- void stopFile(List filePrefixList);
-
- /**
- * Get the current latest status of Chanel.
- *
- * @return
- */
- ChannelState state();
-
- /**
- * channel instance id
- *
- * @return
- */
- String instanceId();
-
- /**
- * There have been changes in the filter configuration.
- *
- * @param confs
- */
- void filterRefresh(List confs);
-
- /**
- * Listening for changes and restarting file collection.
- *
- * @param filePath
- */
- void reOpen(String filePath);
-
- /**
- * List of files to be monitored
- *
- * @return
- */
- List getMonitorPathList();
-
- /**
- * File cleanup needed
- */
- void cleanCollectFiles();
-
- /**
- * Delete the file collection of a certain directory, applicable when using the demonset deployment method in k8s, when a certain node goes offline, it needs to delete its collection and release resource occupation.
- *
- * @param directory
- */
- void deleteCollFile(String directory);
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java
deleted file mode 100644
index b4771b92a..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelServiceImpl.java
+++ /dev/null
@@ -1,718 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import cn.hutool.core.io.FileUtil;
-import cn.hutool.core.lang.Pair;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-import com.xiaomi.data.push.common.SafeRun;
-import com.xiaomi.mone.file.*;
-import com.xiaomi.mone.log.agent.channel.file.InodeFileComparator;
-import com.xiaomi.mone.log.agent.channel.file.MonitorFile;
-import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService;
-import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
-import com.xiaomi.mone.log.agent.common.ChannelUtil;
-import com.xiaomi.mone.log.agent.common.ExecutorUtil;
-import com.xiaomi.mone.log.agent.export.MsgExporter;
-import com.xiaomi.mone.log.agent.filter.FilterChain;
-import com.xiaomi.mone.log.agent.input.Input;
-import com.xiaomi.mone.log.api.enums.K8sPodTypeEnum;
-import com.xiaomi.mone.log.api.enums.LogTypeEnum;
-import com.xiaomi.mone.log.api.model.meta.FilterConf;
-import com.xiaomi.mone.log.api.model.msg.LineMessage;
-import com.xiaomi.mone.log.common.Constant;
-import com.xiaomi.mone.log.common.PathUtils;
-import com.xiaomi.mone.log.utils.NetUtil;
-import com.xiaomi.youpin.docean.Ioc;
-import lombok.Getter;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.common.Constant.SYMBOL_COMMA;
-import static com.xiaomi.mone.log.common.PathUtils.PATH_WILDCARD;
-import static com.xiaomi.mone.log.common.PathUtils.SEPARATOR;
-
-/**
- * @author shanwb
- * @date 2021-07-20
- */
-@Slf4j
-public class ChannelServiceImpl extends AbstractChannelService {
-
- private AgentMemoryService memoryService;
-
- private MsgExporter msgExporter;
-
- private ChannelDefine channelDefine;
-
- private ChannelMemory channelMemory;
-
- @Getter
- private final ConcurrentHashMap logFileMap = new ConcurrentHashMap<>();
-
- @Getter
- private final ConcurrentHashMap futureMap = new ConcurrentHashMap<>();
-
- private Set delFileCollList = new CopyOnWriteArraySet<>();
-
- private final Map reOpenMap = new HashMap<>();
- private final Map fileReadMap = new ConcurrentHashMap<>();
-
- private final Map>> resultMap = new ConcurrentHashMap<>();
-
- private ScheduledFuture> lastFileLineScheduledFuture;
-
- private Gson gson = Constant.GSON;
-
- private List lineMessageList = new ArrayList<>();
-
- private ReentrantLock fileColLock = new ReentrantLock();
-
- private ReentrantLock fileReopenLock = new ReentrantLock();
-
- private volatile long lastSendTime = System.currentTimeMillis();
-
- private volatile long logCounts = 0;
-
- private ScheduledFuture> scheduledFuture;
-
- /**
- * collect once flag
- */
- private boolean collectOnce;
-
- private FilterChain chain;
-
- /**
- * The file path to monitor
- */
- private List monitorFileList;
-
- private LogTypeEnum logTypeEnum;
-
- private String logPattern;
-
- private String logSplitExpress;
-
- private String linePrefix;
-
- public ChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService memoryService, ChannelDefine channelDefine, FilterChain chain) {
- this.memoryService = memoryService;
- this.msgExporter = msgExporter;
- this.channelDefine = channelDefine;
- this.chain = chain;
- this.monitorFileList = Lists.newArrayList();
- }
-
- @Override
- public void refresh(ChannelDefine channelDefine, MsgExporter msgExporter) {
- this.channelDefine = channelDefine;
- if (null != msgExporter) {
- this.msgExporter.close();
- this.msgExporter = msgExporter;
- }
- }
-
- @Override
- public void stopFile(List filePrefixList) {
- Map fileProgressMap = channelMemory.getFileProgressMap();
- if (null == fileProgressMap) {
- fileProgressMap = new HashMap<>();
- }
-
- for (Iterator> it = logFileMap.entrySet().iterator(); it.hasNext(); ) {
- Map.Entry entry = it.next();
- String filePath = entry.getKey();
- for (String filePrefix : filePrefixList) {
- if (filePath.startsWith(filePrefix)) {
- entry.getValue().setStop(true);
- futureMap.get(filePath).cancel(false);
- log.warn("channel:{} stop file:{} success", channelDefine.getChannelId(), filePath);
- ChannelMemory.FileProgress fileProgress = fileProgressMap.get(filePath);
- //Refresh the memory record to prevent the agent from restarting and recollect the file.
- if (null != fileProgress) {
- fileProgress.setFinished(true);
- }
- it.remove();
- }
- }
- }
- }
-
- @Override
- public void start() {
- Long channelId = channelDefine.getChannelId();
- Input input = channelDefine.getInput();
-
- this.logPattern = input.getLogPattern();
- this.logSplitExpress = input.getLogSplitExpress();
- this.linePrefix = input.getLinePrefix();
-
- String logType = channelDefine.getInput().getType();
- logTypeEnum = LogTypeEnum.name2enum(logType);
- collectOnce = StringUtils.substringAfterLast(logPattern, SEPARATOR).contains(PATH_WILDCARD);
-
- List patterns = PathUtils.parseLevel5Directory(logPattern);
- if (CollectionUtils.isEmpty(patterns)) {
- log.info("config pattern:{},current files not exist", logPattern);
- }
- log.info("channel start, logPattern:{},fileList:{}, channelId:{}, instanceId:{}", logPattern, patterns, channelId, instanceId());
- // disassembly monitor file
- logMonitorPathDisassembled(logSplitExpress, patterns, logPattern);
-
- channelMemory = memoryService.getMemory(channelId);
- if (null == channelMemory) {
- channelMemory = initChannelMemory(channelId, input, patterns, channelDefine);
- }
- memoryService.cleanChannelMemoryContent(channelId, patterns);
-
- startCollectFile(channelId, input, patterns);
-
- startExportQueueDataThread();
- memoryService.refreshMemory(channelMemory);
- log.warn("channelId:{}, channelInstanceId:{} start success! channelDefine:{}", channelId, instanceId(), gson.toJson(this.channelDefine));
- }
-
- @Override
- public void cleanCollectFiles() {
- for (String path : delFileCollList) {
- delCollFile(path);
- }
- }
-
- @Override
- public void deleteCollFile(String directory) {
- log.info("channelId:{},deleteCollFile,directory:{}", channelDefine.getChannelId(), directory);
- for (Map.Entry logFileEntry : logFileMap.entrySet()) {
- if (logFileEntry.getKey().contains(directory)) {
- delFileCollList.add(logFileEntry.getKey());
- log.info("channelId:{},delFileCollList:{}", channelDefine.getChannelId(), gson.toJson(delFileCollList));
- }
- }
- }
-
- private void startExportQueueDataThread() {
- scheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
- // If the mq message is not sent for more than 10 seconds, it will be sent asynchronously.
- if (System.currentTimeMillis() - lastSendTime < 10 * 1000 || CollectionUtils.isEmpty(lineMessageList)) {
- return;
- }
- if (CollectionUtils.isNotEmpty(lineMessageList) && fileColLock.tryLock()) {
- try {
- this.doExport(lineMessageList);
- } finally {
- fileColLock.unlock();
- }
- }
- }, 10, 7, TimeUnit.SECONDS);
- }
-
- private void startCollectFile(Long channelId, Input input, List patterns) {
- for (int i = 0; i < patterns.size(); i++) {
- log.info("startCollectFile,total file:{},start:{},remain:{}", patterns.size(), i + 1, patterns.size() - (i + 1));
- readFile(input.getPatternCode(), getTailPodIp(patterns.get(i)), patterns.get(i), channelId);
- InodeFileComparator.addFile(patterns.get(i));
- }
- lastLineRemainSendSchedule(input.getPatternCode());
- }
-
-
- private void handleAllFileCollectMonitor(String patternCode, String newFilePath, Long channelId) {
- String ip = getTailPodIp(newFilePath);
-
- if (logFileMap.keySet().stream().anyMatch(key -> Objects.equals(newFilePath, key))) {
- log.info("collectOnce open file:{}", newFilePath);
- logFileMap.get(newFilePath).setReOpen(true);
- } else {
- readFile(patternCode, ip, newFilePath, channelId);
- }
- }
-
- /**
- * 1.logSplitExpress:/home/work/log/log-agent/server.log.* realFilePaths: ["/home/work/log/log-agent/server.log"]
- * 2.logSplitExpress:/home/work/log/log-agent/(server.log.*|error.log.*) realFilePaths: ["/home/work/log/log-agent/server.log","/home/work/log/log-agent/server.log"]
- * 2.logSplitExpress:/home/work/log/(log-agent|log-stream)/server.log.* realFilePaths: ["/home/work/log/log-agent/server.log","/home/work/log/log-stream/server.log"]
- * The real file does not exist, it should also listen
- *
- * @param logSplitExpress
- * @param realFilePaths
- */
- private void logMonitorPathDisassembled(String logSplitExpress, List realFilePaths, String configPath) {
- List cleanedPathList = Lists.newArrayList();
- if (StringUtils.isNotBlank(logSplitExpress)) {
- PathUtils.dismantlingStrWithSymbol(logSplitExpress, cleanedPathList);
- }
- if (LogTypeEnum.OPENTELEMETRY == logTypeEnum || realFilePaths.isEmpty()) {
- opentelemetryMonitor(configPath);
- return;
- }
- if (collectOnce) {
- collectOnceFileMonitor(configPath);
- return;
- }
- for (int i = 0; i < realFilePaths.size(); i++) {
- String perFilePathExpress;
- try {
- perFilePathExpress = cleanedPathList.get(i);
- /**
- * Compatible with the current file, it can be monitored when it is created.
- */
- perFilePathExpress = String.format("(%s|%s)", perFilePathExpress, String.format("%s.*", realFilePaths.get(i)));
- } catch (Exception e) {
- perFilePathExpress = String.format("%s.*", realFilePaths.get(i));
- }
- monitorFileList.add(MonitorFile.of(realFilePaths.get(i), perFilePathExpress, logTypeEnum, collectOnce));
- }
- }
-
- private void collectOnceFileMonitor(String configPath) {
- String singleTimeExpress = ChannelUtil.buildSingleTimeExpress(configPath);
- monitorFileList.add(MonitorFile.of(configPath, singleTimeExpress, logTypeEnum, collectOnce));
- }
-
- private void opentelemetryMonitor(String configPath) {
- List cleanedPathList = ChannelUtil.buildLogExpressList(configPath);
- monitorFileList.add(MonitorFile.of(configPath, cleanedPathList.get(0), logTypeEnum, collectOnce));
- }
-
- private ReadListener initFileReadListener(MLog mLog, String patternCode, String ip, String pattern) {
- AtomicReference readResult = new AtomicReference<>();
- ReadListener listener = new DefaultReadListener(event -> {
- readResult.set(event.getReadResult());
- if (null == readResult.get()) {
- log.info("empty data");
- return;
- }
- long ct = System.currentTimeMillis();
- readResult.get().getLines().stream().forEach(l -> {
- String logType = channelDefine.getInput().getType();
- LogTypeEnum logTypeEnum = LogTypeEnum.name2enum(logType);
- // Multi-line application log type and opentelemetry type are used to determine the exception stack
- if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum || LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
- l = mLog.append2(l);
- } else {
- // tail single line mode
- }
- if (null != l) {
- try {
- fileColLock.lock();
- wrapDataToSend(l, readResult, pattern, patternCode, ip, ct);
- } finally {
- fileColLock.unlock();
- }
- } else {
- log.debug("biz log channelId:{}, not new line:{}", channelDefine.getChannelId(), l);
- }
- });
-
- });
- resultMap.put(pattern, Pair.of(mLog, readResult));
- return listener;
- }
-
- private void lastLineRemainSendSchedule(String patternCode) {
- /**
- * Collect all data in the last row of data that has not been sent for more than 10 seconds.
- */
- lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> SafeRun.run(() -> {
- for (Map.Entry>> referenceEntry : resultMap.entrySet()) {
- MLog mLog = referenceEntry.getValue().getKey();
- String pattern = referenceEntry.getKey();
- Long appendTime = mLog.getAppendTime();
- if (null != appendTime && Instant.now().toEpochMilli() - appendTime > 10 * 1000) {
- if (fileColLock.tryLock()) {
- try {
- String remainMsg = mLog.takeRemainMsg2();
- if (null != remainMsg) {
- log.info("start send last line,pattern:{},patternCode:{},data:{}", pattern, patternCode, remainMsg);
- wrapDataToSend(remainMsg, referenceEntry.getValue().getValue(), pattern, patternCode, getTailPodIp(pattern), appendTime);
- }
- } finally {
- fileColLock.unlock();
- }
- }
- }
- }
- }), 30, 30, TimeUnit.SECONDS);
- }
-
- private void wrapDataToSend(String lineMsg, AtomicReference readResult, String pattern, String patternCode, String ip, long ct) {
- LineMessage lineMessage = createLineMessage(lineMsg, readResult, pattern, patternCode, ip, ct);
-
- updateChannelMemory(channelMemory, pattern, logTypeEnum, ct, readResult);
- lineMessageList.add(lineMessage);
-
- fileReadMap.put(pattern, ct);
- int batchSize = msgExporter.batchExportSize();
- if (lineMessageList.size() > batchSize) {
- List subList = lineMessageList.subList(0, batchSize);
- doExport(subList);
- }
- }
-
- private void readFile(String patternCode, String ip, String filePath, Long channelId) {
- MLog mLog = new MLog();
- if (StringUtils.isNotBlank(this.linePrefix)) {
- mLog.setCustomLinePattern(this.linePrefix);
- }
- String usedIp = StringUtils.isBlank(ip) ? NetUtil.getLocalIp() : ip;
-
- ReadListener listener = initFileReadListener(mLog, patternCode, usedIp, filePath);
- Map fileProgressMap = channelMemory.getFileProgressMap();
- printMapToJson(fileProgressMap, collectOnce);
-
- ILogFile logFile = getLogFile(filePath, listener, fileProgressMap);
- if (null == logFile) {
- log.warn("file:{} marked stop to collect", filePath);
- return;
- }
- //Determine whether the file exists
- if (FileUtil.exist(filePath)) {
- stopOldCurrentFileThread(filePath);
- log.info("start to collect file,channelId:{},fileName:{}", channelId, filePath);
- logFileMap.put(filePath, logFile);
- Future> future = ExecutorUtil.submit(() -> {
- try {
- log.info("thread {} {}", Thread.currentThread().isVirtual(), Thread.currentThread());
- logFile.readLine();
- } catch (Exception e) {
- logFile.setExceptionFinish();
- log.error("logFile read line err,channelId:{},localIp:{},file:{},patternCode:{}", channelId, usedIp, fileProgressMap, patternCode, e);
- }
- });
- futureMap.put(filePath, future);
- } else {
- log.info("file not exist,file:{}", filePath);
- }
- }
-
- private void stopOldCurrentFileThread(String filePath) {
- ILogFile logFile = logFileMap.get(filePath);
- if (null != logFile) {
- logFile.setStop(true);
- }
- Future future = futureMap.get(filePath);
- if (null != future) {
- future.cancel(false);
- }
- }
-
- private void printMapToJson(Map map, boolean collectOnce) {
- if (map == null || map.isEmpty()) {
- return;
- }
-
- Map snapshot;
- try {
- snapshot = new HashMap<>(map);
- } catch (ConcurrentModificationException e) {
- log.error("Failed to create snapshot of fileProgressMap", e);
- return;
- }
-
- if (!collectOnce && !snapshot.isEmpty()) {
- String jsonMap = gson.toJson(snapshot);
- log.info("fileProgressMap: {}", jsonMap);
- }
- }
-
-
- private ILogFile getLogFile(String filePath, ReadListener listener, Map fileProgressMap) {
-
- ChannelMemory.FileProgress progressInfo = fileProgressMap.get(filePath);
-
- if (progressInfo == null || (progressInfo.getFinished() != null && progressInfo.getFinished())) {
- // Stateful pods in k8s do not need to be judged by finished
- if (StringUtils.isNotBlank(channelDefine.getPodType()) &&
- K8sPodTypeEnum.valueOf(channelDefine.getPodType().toUpperCase()) != K8sPodTypeEnum.STATEFUL) {
- return null;
- }
- }
- long pointer = progressInfo != null ? progressInfo.getPointer() : 0L;
- long lineNumber = progressInfo != null ? progressInfo.getCurrentRowNum() : 0L;
- if (progressInfo != null) {
- ChannelMemory.UnixFileNode memoryUnixFileNode = progressInfo.getUnixFileNode();
- if (memoryUnixFileNode != null && memoryUnixFileNode.getSt_ino() != null) {
- log.info("memory file inode info, filePath:{},:{}", filePath, gson.toJson(memoryUnixFileNode));
- ChannelMemory.UnixFileNode currentUnixFileNode = ChannelUtil.buildUnixFileNode(filePath);
- if (currentUnixFileNode != null && currentUnixFileNode.getSt_ino() != null &&
- !Objects.equals(memoryUnixFileNode.getSt_ino(), currentUnixFileNode.getSt_ino())) {
- pointer = 0L;
- lineNumber = 0L;
- log.info("read file start from head, filePath:{}, memory:{}, current:{}",
- filePath, gson.toJson(memoryUnixFileNode), gson.toJson(currentUnixFileNode));
- }
- }
- }
- ChannelEngine channelEngine = Ioc.ins().getBean(ChannelEngine.class);
- ILogFile logFile = channelEngine.logFile();
- logFile.initLogFile(filePath, listener, pointer, lineNumber);
- return logFile;
- }
-
- private void doExport(List subList) {
- try {
- if (CollectionUtils.isEmpty(subList)) {
- return;
- }
- //Current limiting processing
- chain.doFilter();
-
- long current = System.currentTimeMillis();
- msgExporter.export(subList);
- logCounts += subList.size();
- lastSendTime = System.currentTimeMillis();
- channelMemory.setCurrentTime(lastSendTime);
-
- log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts, instanceId());
- } catch (Exception e) {
- log.error("doExport Exception:{}", e);
- } finally {
- subList.clear();
- }
- }
-
- @Override
- public void close() {
- log.info("Delete the current collection task,channelId:{}", getChannelId());
- //1.Stop log capture
- for (Map.Entry fileEntry : logFileMap.entrySet()) {
- fileEntry.getValue().setStop(true);
- InodeFileComparator.removeFile(fileEntry.getKey());
- }
- //2. stop exporting
- this.msgExporter.close();
- //3. refresh cache
- memoryService.refreshMemory(channelMemory);
- // stop task
- if (null != scheduledFuture) {
- scheduledFuture.cancel(false);
- }
- if (null != lastFileLineScheduledFuture) {
- lastFileLineScheduledFuture.cancel(false);
- }
- for (Future future : futureMap.values()) {
- future.cancel(false);
- }
- log.info("stop file monitor,fileName:", logFileMap.keySet().stream().collect(Collectors.joining(SYMBOL_COMMA)));
- lineMessageList.clear();
- reOpenMap.clear();
- fileReadMap.clear();
- resultMap.clear();
- }
-
- public Long getChannelId() {
- return channelDefine.getChannelId();
- }
-
- public MsgExporter getMsgExporter() {
- return msgExporter;
- }
-
- @Override
- public void filterRefresh(List confs) {
- try {
- this.chain.loadFilterList(confs);
- this.chain.reset();
- } catch (Exception e) {
- log.error("filter refresh err,new conf:{}", confs, e);
- }
- }
-
- @Override
- public void reOpen(String filePath) {
- fileReopenLock.lock();
- try {
- //Judging the number of openings, it can only be reopened once within 10 seconds.
- final long REOPEN_THRESHOLD = 10 * 1000;
-
- if (reOpenMap.containsKey(filePath) && Instant.now().toEpochMilli() - reOpenMap.get(filePath) < REOPEN_THRESHOLD) {
- log.info("The file has been opened too frequently.Please try again in 10 seconds.fileName:{}," +
- "last time opening time.:{}", filePath, reOpenMap.get(filePath));
- return;
- }
-
- reOpenMap.put(filePath, Instant.now().toEpochMilli());
- log.info("reOpen file:{}", filePath);
-
- if (collectOnce) {
- handleAllFileCollectMonitor(channelDefine.getInput().getPatternCode(), filePath, getChannelId());
- return;
- }
-
- ILogFile logFile = logFileMap.get(filePath);
- String tailPodIp = getTailPodIp(filePath);
- String ip = StringUtils.isBlank(tailPodIp) ? NetUtil.getLocalIp() : tailPodIp;
- if (null == logFile || logFile.getExceptionFinish()) {
- // Add new log file
- readFile(channelDefine.getInput().getPatternCode(), ip, filePath, getChannelId());
- log.info("watch new file create for channelId:{},ip:{},path:{}", getChannelId(), filePath, ip);
- } else {
- handleExistingLogFileWithRetry(logFile, filePath, ip);
- }
- } finally {
- fileReopenLock.unlock();
- }
- }
-
- private void handleExistingLogFileWithRetry(ILogFile logFile, String filePath, String ip) {
- LogFile file = (LogFile) logFile;
-
- int maxRetries = 60;
- int currentRetries = 0;
-
- while (currentRetries < maxRetries) {
- if (file.getPointer() < file.getMaxPointer()) {
- // Normal log segmentation
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- log.error("handleExistingLogFileWithRetry Sleep error", e);
- }
-
- currentRetries++;
- } else {
- logFile.setReOpen(true);
- log.info("file reOpen: channelId:{},ip:{},path:{}", getChannelId(), ip, filePath, file.getFile(), file.getBeforePointerHashCode());
- break;
- }
- }
- }
-
- @Override
- public List getMonitorPathList() {
- return monitorFileList;
- }
-
- public ChannelDefine getChannelDefine() {
- return channelDefine;
- }
-
- public ChannelMemory getChannelMemory() {
- return channelMemory;
- }
-
- /**
- * A file that has not been written to for more than 10 minutes.
- *
- * @return
- */
- @Override
- public Map getExpireFileMap() {
- Map expireMap = new HashMap();
- for (Map.Entry entry : fileReadMap.entrySet()) {
- if (Instant.now().toEpochMilli() - entry.getValue() > TimeUnit.MINUTES.toMillis(10)) {
- expireMap.put(entry.getKey(), entry.getValue());
- }
- }
- return expireMap;
- }
-
- @Override
- public void cancelFile(String file) {
- log.info("cancelFile,file:{}", file);
- for (Map.Entry logFileEntry : logFileMap.entrySet()) {
- if (file.equals(logFileEntry.getKey())) {
- delFileCollList.add(logFileEntry.getKey());
- }
- }
- }
-
- /**
- * Delete the specified directory collection, receive the delete event and no data is written in for more than 1 minute.
- *
- * @param path
- */
- private void delCollFile(String path) {
- boolean shouldRemovePath = false;
- if (logFileMap.containsKey(path) && fileReadMap.containsKey(path)) {
- if ((Instant.now().toEpochMilli() - fileReadMap.get(path)) > TimeUnit.MINUTES.toMillis(1)) {
- cleanFile(path::equals);
- shouldRemovePath = true;
- log.info("stop coll file:{}", path);
- }
- } else {
- shouldRemovePath = true;
- }
- if (shouldRemovePath) {
- log.info("channelId:{},delCollFile remove file:{}", channelDefine.getChannelId(), path);
- delFileCollList.removeIf(data -> StringUtils.equals(data, path));
- }
- }
-
- private void cleanFile(Predicate filter) {
- List delFiles = Lists.newArrayList();
- for (Map.Entry logFileEntry : logFileMap.entrySet()) {
- if (filter.test(logFileEntry.getKey())) {
- InodeFileComparator.removeFile(logFileEntry.getKey());
- logFileEntry.getValue().setStop(true);
- delFiles.add(logFileEntry.getKey());
- log.info("cleanFile,stop file:{}", logFileEntry.getKey());
- }
- }
- for (String delFile : delFiles) {
- logFileMap.remove(delFile);
- }
- delFiles.clear();
- for (Map.Entry futureEntry : futureMap.entrySet()) {
- if (filter.test(futureEntry.getKey())) {
- futureEntry.getValue().cancel(false);
- delFiles.add(futureEntry.getKey());
- }
- }
- for (String delFile : delFiles) {
- futureMap.remove(delFile);
- }
- delFiles.clear();
- delFiles = reOpenMap.keySet().stream()
- .filter(filePath -> filter.test(filePath))
- .collect(Collectors.toList());
- for (String delFile : delFiles) {
- reOpenMap.remove(delFile);
- }
-
- delFiles = fileReadMap.keySet().stream()
- .filter(filePath -> filter.test(filePath))
- .collect(Collectors.toList());
- for (String delFile : delFiles) {
- fileReadMap.remove(delFile);
- }
-
- delFiles = resultMap.keySet().stream()
- .filter(filePath -> filter.test(filePath))
- .collect(Collectors.toList());
- for (String delFile : delFiles) {
- resultMap.remove(delFile);
- }
- }
-
- @Override
- public Long getLogCounts() {
- return this.logCounts;
- }
-
-
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelState.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelState.java
deleted file mode 100644
index f45e0164a..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/ChannelState.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author shanwb
- * @date 2021-08-26
- */
-@Data
-public class ChannelState implements Serializable {
-
- private Long tailId;
-
- private String tailName;
-
- private Long appId;
-
- private String appName;
-
- private String logPattern;
- /**
- * Generated by appId + logPattern
- */
- private String logPatternCode;
- /**
- * Total number of collected and sent rows.
- */
- private Long totalSendCnt;
-
- private List ipList;
-
- private Long collectTime;
-
- private Map stateProgressMap;
-
- @Data
- public static class StateProgress implements Serializable {
- /**
- * ip
- */
- private String ip;
- /**
- * Current collection file
- */
- private String currentFile;
- /**
- * The latest line number currently being collected.
- */
- private Long currentRowNum;
- /**
- * The latest character symbol currently being collected.
- */
- private Long pointer;
-
- /**
- * The maximum character count of the current file.
- */
- private Long fileMaxPointer;
-
- /**
- * Collection time
- */
- private Long ctTime;
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/Closeable.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/Closeable.java
deleted file mode 100644
index 9cd872f48..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/Closeable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-/**
- * @author shanwb
- * @date 2021-07-29
- */
-public interface Closeable {
- /**
- * Shut down
- */
- void close();
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/WildcardChannelServiceImpl.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/WildcardChannelServiceImpl.java
deleted file mode 100644
index 77cf99d2f..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/WildcardChannelServiceImpl.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.xiaomi.data.push.common.SafeRun;
-import com.xiaomi.mone.file.MLog;
-import com.xiaomi.mone.file.ReadListener;
-import com.xiaomi.mone.file.ReadResult;
-import com.xiaomi.mone.file.common.FileInfo;
-import com.xiaomi.mone.file.common.FileInfoCache;
-import com.xiaomi.mone.file.listener.DefaultMonitorListener;
-import com.xiaomi.mone.file.ozhera.HeraFileMonitor;
-import com.xiaomi.mone.log.agent.channel.file.MonitorFile;
-import com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService;
-import com.xiaomi.mone.log.agent.channel.memory.ChannelMemory;
-import com.xiaomi.mone.log.agent.common.ChannelUtil;
-import com.xiaomi.mone.log.agent.common.ExecutorUtil;
-import com.xiaomi.mone.log.agent.export.MsgExporter;
-import com.xiaomi.mone.log.agent.filter.FilterChain;
-import com.xiaomi.mone.log.agent.input.Input;
-import com.xiaomi.mone.log.api.enums.LogTypeEnum;
-import com.xiaomi.mone.log.api.model.meta.FilterConf;
-import com.xiaomi.mone.log.api.model.msg.LineMessage;
-import com.xiaomi.mone.log.common.PathUtils;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.File;
-import java.io.IOException;
-import java.time.Instant;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import static com.xiaomi.mone.log.agent.channel.memory.AgentMemoryService.MEMORY_DIR;
-import static com.xiaomi.mone.log.common.Constant.GSON;
-import static com.xiaomi.mone.log.common.Constant.SYMBOL_MULTI;
-import static com.xiaomi.mone.log.common.PathUtils.*;
-
-/**
- * @author wtt
- * @version 1.0
- * @description Wildcard log collection implementation
- * @date 2023/9/27 11:26
- */
-@Slf4j
-public class WildcardChannelServiceImpl extends AbstractChannelService {
-
- private AgentMemoryService memoryService;
-
- private MsgExporter msgExporter;
-
- private ChannelDefine channelDefine;
-
- private ChannelMemory channelMemory;
-
- private FilterChain chain;
-
- private String logPattern;
-
- private String linePrefix;
-
- private String memoryBasePath;
-
- private static final String POINTER_FILENAME_PREFIX = ".ozhera_pointer";
-
- private List lineMessageList = new ArrayList<>();
-
- private ScheduledFuture> scheduledFuture;
-
- private ScheduledFuture> lastFileLineScheduledFuture;
-
- private List> fileCollFutures = Lists.newArrayList();
-
- private volatile long lastSendTime = System.currentTimeMillis();
-
- private volatile long logCounts = 0;
-
- private ReentrantLock reentrantLock = new ReentrantLock();
-
- private DefaultMonitorListener defaultMonitorListener;
-
-
- public WildcardChannelServiceImpl(MsgExporter msgExporter, AgentMemoryService memoryService,
- ChannelDefine channelDefine, FilterChain chain, String memoryBasePath) {
- this.memoryService = memoryService;
- this.msgExporter = msgExporter;
- this.channelDefine = channelDefine;
- this.chain = chain;
- this.memoryBasePath = memoryBasePath;
- }
-
- @Override
- public void start() {
- Long channelId = channelDefine.getChannelId();
- Input input = channelDefine.getInput();
-
- this.logPattern = input.getLogPattern();
- this.linePrefix = input.getLinePrefix();
-
- List patterns = PathUtils.parseLevel5Directory(logPattern);
- log.info("channel start, logPattern:{},fileList:{}, channelId:{}, instanceId:{}", logPattern, GSON.toJson(patterns), channelId, instanceId());
-
- channelMemory = memoryService.getMemory(channelId);
- if (null == channelMemory) {
- channelMemory = initChannelMemory(channelId, input, patterns, channelDefine);
- }
- memoryService.cleanChannelMemoryContent(channelId, patterns);
-
- startCollectFile(channelId, input, getTailPodIp(logPattern));
-
- startExportQueueDataThread();
- memoryService.refreshMemory(channelMemory);
- log.warn("channelId:{}, channelInstanceId:{} start success! channelDefine:{}", channelId, instanceId(), GSON.toJson(this.channelDefine));
-
- }
-
- private void startCollectFile(Long channelId, Input input, String ip) {
- try {
- // Load the restart file
- String restartFile = buildRestartFilePath();
- FileInfoCache.ins().load(restartFile);
-
- HeraFileMonitor monitor = createFileMonitor(input.getPatternCode(), ip);
-
- String fileExpression = buildFileExpression(input.getLogPattern());
-
- List monitorPaths = buildMonitorPaths(input.getLogPattern());
-
- wildcardGraceShutdown(monitorPaths, fileExpression);
-
- saveCollProgress();
-
- log.info("fileExpression:{}", fileExpression);
- // Compile the file expression pattern
- Pattern pattern = Pattern.compile(fileExpression);
- for (String monitorPath : monitorPaths) {
- fileCollFutures.add(ExecutorUtil.submit(() -> monitorFileChanges(monitor, monitorPath, pattern)));
- }
- } catch (Exception e) {
- log.error("startCollectFile error, channelId: {}, input: {}, ip: {}", channelId, GSON.toJson(input), ip, e);
- }
- }
-
- private void saveCollProgress() {
- ExecutorUtil.scheduleAtFixedRate(() -> SafeRun.run(() -> {
- try {
- for (ReadListener readListener : defaultMonitorListener.getReadListenerList()) {
- readListener.saveProgress();
- }
- cleanUpInvalidFileInfos();
- FileInfoCache.ins().shutdown();
- } catch (Exception e) {
- log.error("saveCollProgress error", e);
- }
- }), 60, 30, TimeUnit.SECONDS);
- }
-
- // 清理无效的文件信息的方法
- private void cleanUpInvalidFileInfos() {
- ConcurrentMap caches = FileInfoCache.ins().caches();
-
- for (Iterator> iterator = caches.entrySet().iterator(); iterator.hasNext(); ) {
- Map.Entry entry = iterator.next();
- FileInfo fileInfo = entry.getValue();
- File file = new File(fileInfo.getFileName());
-
- if (StringUtils.isEmpty(fileInfo.getFileName())) {
- continue;
- }
-
- if (!file.exists()) {
- FileInfoCache.ins().remove(entry.getKey());
- }
- }
- }
-
- private String buildRestartFilePath() {
- return String.format("%s%s%s", memoryBasePath, MEMORY_DIR, POINTER_FILENAME_PREFIX);
- }
-
- private String buildFileExpression(String logPattern) {
- String[] expressSplit = logPattern.split(",");
- if (expressSplit.length == 1) {
- return ChannelUtil.buildSingleTimeExpress(logPattern);
- }
- List expressions = Arrays.stream(expressSplit)
- .map(ChannelUtil::buildSingleTimeExpress)
- .map(s -> {
- String multipleFileName = StringUtils.substringAfterLast(s, SEPARATOR);
- return multipleFileName.contains(PATH_WILDCARD) ? s : s + SYMBOL_MULTI;
- })
- .distinct()
- .toList();
- return expressions.size() == 1 ?
- expressions.get(0) :
- expressions.stream().collect(Collectors.joining("|", MULTI_FILE_PREFIX, MULTI_FILE_SUFFIX));
- }
-
- private void monitorFileChanges(HeraFileMonitor monitor, String monitorPath, Pattern pattern) {
- try {
- log.info("monitorFileChanges,directory:{}", monitorPath);
- monitor.reg(monitorPath, filePath -> {
- boolean matches = pattern.matcher(filePath).matches();
- log.debug("file: {}, matches: {}", filePath, matches);
- return matches;
- });
- } catch (IOException | InterruptedException e) {
- log.error("Error while monitoring files, monitorPath: {}", monitorPath, e);
- }
- }
-
- private List buildMonitorPaths(String filePathExpressName) {
- String[] pathExpress = filePathExpressName.split(",");
-
- List monitorPaths = Arrays.stream(pathExpress)
- .map(express -> {
- String monitorPath = StringUtils.substringBeforeLast(express, SEPARATOR);
- return monitorPath.endsWith(SEPARATOR) ? monitorPath : monitorPath + SEPARATOR;
- })
- .flatMap(monitorPath -> PathUtils.buildMultipleDirectories(monitorPath).stream())
- .distinct()
- .collect(Collectors.toList());
-
- return monitorPaths;
- }
-
-
- private HeraFileMonitor createFileMonitor(String patternCode, String ip) {
- MLog mLog = new MLog();
- if (StringUtils.isNotBlank(this.linePrefix)) {
- mLog.setCustomLinePattern(this.linePrefix);
- }
-
- HeraFileMonitor monitor = new HeraFileMonitor();
- AtomicReference readResult = new AtomicReference<>();
-
- defaultMonitorListener = new DefaultMonitorListener(monitor, event -> {
- readResult.set(event.getReadResult());
- if (readResult.get() == null) {
- log.info("Empty data");
- return;
- }
- processLogLines(readResult, patternCode, ip, mLog);
- });
-
- monitor.setListener(defaultMonitorListener);
-
- /**
- * Collect all data in the last row of data that has not been sent for more than 10 seconds.
- */
- scheduleLastLineSender(mLog, readResult, patternCode, ip);
- return monitor;
- }
-
- private void processLogLines(AtomicReference readResult, String patternCode, String ip, MLog mLog) {
- long currentTime = System.currentTimeMillis();
- ReadResult result = readResult.get();
-
- LogTypeEnum logTypeEnum = getLogTypeEnum();
- result.getLines().forEach(line -> {
- if (LogTypeEnum.APP_LOG_MULTI == logTypeEnum || LogTypeEnum.OPENTELEMETRY == logTypeEnum) {
- line = mLog.append2(line);
- }
-
- if (line != null) {
- try {
- reentrantLock.lock();
- wrapDataToSend(line, readResult, patternCode, ip, currentTime);
- } finally {
- reentrantLock.unlock();
- }
- } else {
- log.debug("Biz log channelId:{}, not a new line", channelDefine.getChannelId());
- }
- });
- }
-
- private void scheduleLastLineSender(MLog mLog, AtomicReference readResult, String patternCode, String ip) {
- lastFileLineScheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
- Long appendTime = mLog.getAppendTime();
- if (appendTime != null && Instant.now().toEpochMilli() - appendTime > 10 * 1000) {
- if (reentrantLock.tryLock()) {
- try {
- String remainMsg = mLog.takeRemainMsg2();
- if (null != remainMsg) {
- log.info("start send last line, fileName:{}, patternCode:{}, data:{}", readResult.get().getFilePathName(), patternCode, remainMsg);
- wrapDataToSend(remainMsg, readResult, patternCode, ip, Instant.now().toEpochMilli());
- }
- } finally {
- reentrantLock.unlock();
- }
- }
- }
- }, 30, 30, TimeUnit.SECONDS);
- }
-
- private void wrapDataToSend(String lineMsg, AtomicReference readResult, String patternCode, String localIp, long ct) {
- String filePathName = readResult.get().getFilePathName();
- LineMessage lineMessage = createLineMessage(lineMsg, readResult, filePathName, patternCode, localIp, ct);
- updateChannelMemory(channelMemory, filePathName, getLogTypeEnum(), ct, readResult);
-
- lineMessageList.add(lineMessage);
-
- int batchSize = msgExporter.batchExportSize();
- if (lineMessageList.size() > batchSize) {
- List subList = lineMessageList.subList(0, batchSize);
- doExport(subList);
- }
- }
-
-
- private void doExport(List subList) {
- try {
- if (CollectionUtils.isEmpty(subList)) {
- return;
- }
- //Current limiting processing
- chain.doFilter();
-
- long current = System.currentTimeMillis();
- msgExporter.export(subList);
- logCounts += subList.size();
- lastSendTime = System.currentTimeMillis();
- channelMemory.setCurrentTime(lastSendTime);
-
- log.info("doExport channelId:{}, send {} message, cost:{}, total send:{}, instanceId:{},", channelDefine.getChannelId(), subList.size(), lastSendTime - current, logCounts, instanceId());
- } catch (Exception e) {
- log.error("doExport Exception", e);
- } finally {
- subList.clear();
- }
- }
-
- private void startExportQueueDataThread() {
- scheduledFuture = ExecutorUtil.scheduleAtFixedRate(() -> {
- // If the mq message is not sent for more than 10 seconds, it will be sent asynchronously.
- if (System.currentTimeMillis() - lastSendTime < 10 * 1000 || CollectionUtils.isEmpty(lineMessageList)) {
- return;
- }
- if (CollectionUtils.isNotEmpty(lineMessageList) && reentrantLock.tryLock()) {
- try {
- this.doExport(lineMessageList);
- } finally {
- reentrantLock.unlock();
- }
- }
- }, 10, 7, TimeUnit.SECONDS);
- }
-
- @Override
- public ChannelDefine getChannelDefine() {
- return channelDefine;
- }
-
- @Override
- public ChannelMemory getChannelMemory() {
- return channelMemory;
- }
-
- @Override
- public Map getExpireFileMap() {
- return Maps.newHashMap();
- }
-
- @Override
- public void cancelFile(String file) {
-
- }
-
- @Override
- public Long getLogCounts() {
- return logCounts;
- }
-
- @Override
- public void refresh(ChannelDefine channelDefine, MsgExporter msgExporter) {
- this.channelDefine = channelDefine;
- if (null != msgExporter) {
- this.msgExporter.close();
- this.msgExporter = msgExporter;
- }
- }
-
- @Override
- public void stopFile(List filePrefixList) {
-
- }
-
- @Override
- public void filterRefresh(List confs) {
- try {
- this.chain.loadFilterList(confs);
- this.chain.reset();
- } catch (Exception e) {
- log.error("filter refresh err,new conf:{}", confs, e);
- }
- }
-
- @Override
- public void reOpen(String filePath) {
-
- }
-
- @Override
- public List getMonitorPathList() {
- return Lists.newArrayList();
- }
-
- @Override
- public void cleanCollectFiles() {
-
- }
-
- @Override
- public void deleteCollFile(String directory) {
-
- }
-
- @Override
- public void close() {
- log.info("Delete the current collection task,channelId:{}", channelDefine.getChannelId());
- //2. stop exporting
- this.msgExporter.close();
- //3. refresh cache
- memoryService.refreshMemory(channelMemory);
- // stop task
- if (null != scheduledFuture) {
- scheduledFuture.cancel(false);
- }
- if (null != lastFileLineScheduledFuture) {
- lastFileLineScheduledFuture.cancel(false);
- }
- for (Future> fileCollFuture : fileCollFutures) {
- fileCollFuture.cancel(false);
- }
- lineMessageList.clear();
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/AppSimilarComparator.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/AppSimilarComparator.java
deleted file mode 100644
index 4b0cb58c9..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/AppSimilarComparator.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel.comparator;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2021/9/15 10:04
- */
-public class AppSimilarComparator implements SimilarComparator {
-
- private Long oldAppId;
-
- public AppSimilarComparator(Long oldAppId) {
- this.oldAppId = oldAppId;
- }
-
- @Override
- public boolean compare(Long newAppId) {
- return Long.compare(oldAppId, newAppId) == 0;
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/FilterSimilarComparator.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/FilterSimilarComparator.java
deleted file mode 100644
index 3b3f84103..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/FilterSimilarComparator.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel.comparator;
-
-import com.xiaomi.mone.log.api.model.meta.FilterConf;
-
-import java.util.List;
-
-public class FilterSimilarComparator implements SimilarComparator> {
- private List filterConf;
-
- public FilterSimilarComparator(List confs) {
- this.filterConf = confs;
- }
-
- @Override
- public boolean compare(List confs) {
- if (confs == null && filterConf == null) {
- return true;
- } else if (confs != null && filterConf != null) {
- if (confs.size() != filterConf.size()) {
- return false;
- }
- return confs.equals(filterConf);
- }
- return false;
- }
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/InputSimilarComparator.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/InputSimilarComparator.java
deleted file mode 100644
index 90e1943f5..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/InputSimilarComparator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel.comparator;
-
-import com.xiaomi.mone.log.agent.input.Input;
-import lombok.extern.slf4j.Slf4j;
-
-import static com.xiaomi.mone.log.common.Constant.GSON;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2021/9/15 10:08
- */
-@Slf4j
-public class InputSimilarComparator implements SimilarComparator {
-
- private Input oldInput;
-
- public InputSimilarComparator(Input oldInput) {
- this.oldInput = oldInput;
- }
-
- @Override
- public boolean compare(Input newInput) {
- if (null == oldInput) {
- return false;
- }
- if (oldInput == newInput) {
- return true;
- }
- return baseSimilarCompare(newInput);
- }
-
- private boolean baseSimilarCompare(Input newInput) {
- try {
- if (oldInput.equals(newInput)) {
- return true;
- }
- } catch (Exception e) {
- log.error("input compare error:new input:{},oldInput:{}", GSON.toJson(newInput), GSON.toJson(oldInput), e);
- }
- return false;
- }
-
-
-}
diff --git a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/OutputSimilarComparator.java b/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/OutputSimilarComparator.java
deleted file mode 100644
index 7965d128e..000000000
--- a/ozhera-log/log-agent/src/main/java/com/xiaomi/mone/log/agent/channel/comparator/OutputSimilarComparator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2020 Xiaomi Corporation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.xiaomi.mone.log.agent.channel.comparator;
-
-import com.xiaomi.mone.log.agent.factory.OutPutServiceFactory;
-import com.xiaomi.mone.log.agent.output.Output;
-
-/**
- * @author wtt
- * @version 1.0
- * @description
- * @date 2021/9/15 10:08
- */
-public class OutputSimilarComparator implements SimilarComparator