From 5af96d412b68679b40d8518e1a749b85200f6a60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=97=E5=8B=87?= Date: Thu, 28 Sep 2023 10:14:45 +0800 Subject: [PATCH] Enhancing Log Agent Server with Multi-Instance Deployment Capability #741 (#742) --- .../com/xiaomi/data/push/rpc/RpcClient.java | 74 ++++++++++++++++++- .../xiaomi/youpin/rpc/test/tcp/TcpTest.java | 36 +++++++-- 2 files changed, 101 insertions(+), 9 deletions(-) diff --git a/jcommon/rpc/src/main/java/com/xiaomi/data/push/rpc/RpcClient.java b/jcommon/rpc/src/main/java/com/xiaomi/data/push/rpc/RpcClient.java index 9e64e3a95..21122bdb5 100644 --- a/jcommon/rpc/src/main/java/com/xiaomi/data/push/rpc/RpcClient.java +++ b/jcommon/rpc/src/main/java/com/xiaomi/data/push/rpc/RpcClient.java @@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -79,9 +80,10 @@ public class RpcClient implements Service { @Setter private List> processorList = Lists.newLinkedList(); - private AtomicReference serverAddrs = new AtomicReference<>(""); + private AtomicReference> serverList = new AtomicReference<>(Lists.newArrayList()); + private int pooSize = Runtime.getRuntime().availableProcessors() * 2 + 1; private ExecutorService defaultPool = new ThreadPoolExecutor(pooSize, pooSize, @@ -215,6 +217,15 @@ private void initAddr() { //在nacos查找ip列表并且是enable的 List list = nacosNaming.getAllInstances(serverName) .stream().filter(it -> it.isHealthy() && it.isEnabled()).collect(Collectors.toList()); + + if (list.size() > 0) { + List addrList = list.stream().map(it -> { + String addr = it.getIp() + ":" + it.getPort(); + return addr; + }).collect(Collectors.toList()); + this.serverList.set(addrList); + } + if (list.size() > 0) { String serverIp = list.get(0).getIp(); int serverPort = list.get(0).getPort(); @@ -339,6 +350,67 @@ public void sendMessage(RemotingCommand req) { } } + + /** + * Send messages to all server nodes. + * + * @param req + */ + public void sendToAllMessage(RemotingCommand req) { + this.serverList.get().stream().forEach(addr -> { + if (StringUtils.isEmpty(addr)) { + logger.warn("send message addr is null"); + return; + } + try { + this.client.invokeOneway(addr, req, req.getTimeout()); + } catch (Exception e) { + throw new RpcException(e.getMessage(), e); + } + }); + } + + public void sendToAllMessage(int code, byte[] body, InvokeCallback invokeCallback) { + this.serverList.get().stream().forEach(addr -> { + if (StringUtils.isEmpty(addr)) { + logger.warn("send message addr is null"); + return; + } + try { + RemotingCommand req = RemotingCommand.createRequestCommand(code); + req.setBody(body); + this.client.invokeAsync(addr, req, req.getTimeout(), invokeCallback); + } catch (Exception e) { + throw new RpcException(e.getMessage(), e); + } + }); + } + + /** + * Send messages according to your own routing rules. + * + * @param code + * @param body + * @param function + * @param invokeCallback + */ + public void sendMessageWithSelect(int code, byte[] body, Function, String> function, InvokeCallback invokeCallback) { + List list = this.serverList.get(); + String addr = function.apply(list); + if (StringUtils.isEmpty(addr)) { + logger.warn("send message addr is null"); + return; + } + try { + RemotingCommand req = RemotingCommand.createRequestCommand(code); + req.setBody(body); + this.client.invokeAsync(addr, req, req.getTimeout(), invokeCallback); + } catch (Exception e) { + throw new RpcException(e.getMessage(), e); + } + } + + public void tell(String addr, int code, String message) { try { RemotingCommand req = RemotingCommand.createRequestCommand(code); diff --git a/jcommon/rpc/src/test/java/com/xiaomi/youpin/rpc/test/tcp/TcpTest.java b/jcommon/rpc/src/test/java/com/xiaomi/youpin/rpc/test/tcp/TcpTest.java index 1f8095ba8..3c68caa4d 100644 --- a/jcommon/rpc/src/test/java/com/xiaomi/youpin/rpc/test/tcp/TcpTest.java +++ b/jcommon/rpc/src/test/java/com/xiaomi/youpin/rpc/test/tcp/TcpTest.java @@ -17,6 +17,7 @@ package com.xiaomi.youpin.rpc.test.tcp; import com.google.common.collect.Lists; +import com.google.gson.Gson; import com.xiaomi.data.push.bo.MPPing; import com.xiaomi.data.push.bo.User; import com.xiaomi.data.push.context.AgentContext; @@ -33,6 +34,7 @@ import com.xiaomi.data.push.task.Task; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.junit.Test; import java.util.List; @@ -44,11 +46,18 @@ @Slf4j public class TcpTest { + @Test + public void testServer3() { + testServer(); + } + @SneakyThrows @Test public void testServer() { - RpcServer rpcServer = new RpcServer("", "demo_server1", false); + String nacosAddr = System.getenv("nacos_addr"); + boolean regNacos = StringUtils.isNotEmpty(nacosAddr) ? true : false; + RpcServer rpcServer = new RpcServer(null == nacosAddr ? "" : nacosAddr, "demo_server_zzy", regNacos); //注册处理器 rpcServer.setProcessorList(Lists.newArrayList( new Pair<>(RpcCmd.mpPingReq, new MPingProcessor()), @@ -69,7 +78,7 @@ public void testServer() { log.info("res:{}", new String(res.getBody())); } - }, 2) + }, 10) )); rpcServer.init(); rpcServer.start(); @@ -121,7 +130,9 @@ public void testServer2() { @SneakyThrows @Test public void testClient() { - RpcClient client = new RpcClient("127.0.0.1:53442"); + String nacosAddr = System.getenv("nacos_addr"); + nacosAddr = null == nacosAddr ? "127.0.0.1:53442" : nacosAddr; + RpcClient client = new RpcClient(nacosAddr, "demo_server_zzy"); client.setReconnection(false); client.setProcessorList(Lists.newArrayList( new Pair<>(RpcCmd.getInfoReq, new GetInfoProcessor()) @@ -135,12 +146,21 @@ public void testClient() { user.setName("zzy"); p.setData("ping"); p.setUser(user); - client.sendMessage(client.getServerAddrs(), RemotingCommand.createMsgPackRequest(RpcCmd.mpPingReq, p), responseFuture -> { - MPPing pong = responseFuture.getResponseCommand().getReq(MPPing.class); - log.info("--->" + pong.getData()); + RemotingCommand req = RemotingCommand.createRequestCommand(RpcCmd.pingReq); + req.setTimeout(2000L); + req.setBody(new Gson().toJson(p).getBytes()); + +// client.sendMessage(client.getServerAddrs(), req, responseFuture -> { +// log.info("--->" + responseFuture.getResponseCommand()); +// }); + +// client.sendToAllMessage(req); + client.sendToAllMessage(RpcCmd.pingReq, "ping".getBytes(), resFuture -> { + log.info("----->{}", resFuture.getResponseCommand()); }); - RemotingCommand res = client.sendMessage(client.getServerAddrs(), RpcCmd.pingReq, "abc", 1000); - log.info("res:{}", new String(res.getBody())); + +// RemotingCommand res = client.sendMessage(client.getServerAddrs(), RpcCmd.pingReq, "abc", 1000); +// log.info("res:{}", new String(res.getBody())); } catch (Exception ex) { log.error(ex.getMessage()); }