Skip to content

Commit

Permalink
Enhancing Log Agent Server with Multi-Instance Deployment Capability #…
Browse files Browse the repository at this point in the history
  • Loading branch information
goodjava committed Sep 26, 2023
1 parent eee8086 commit fabd60b
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ public class RpcClient implements Service {
@Setter
private List<Pair<Integer, NettyRequestProcessor>> processorList = Lists.newLinkedList();


private AtomicReference<String> serverAddrs = new AtomicReference<>("");

private AtomicReference<List<String>> serverList = new AtomicReference<>(Lists.newArrayList());

private int pooSize = Runtime.getRuntime().availableProcessors() * 2 + 1;

private ExecutorService defaultPool = new ThreadPoolExecutor(pooSize, pooSize,
Expand Down Expand Up @@ -215,6 +216,15 @@ private void initAddr() {
//在nacos查找ip列表并且是enable的
List<Instance> list = nacosNaming.getAllInstances(serverName)
.stream().filter(it -> it.isHealthy() && it.isEnabled()).collect(Collectors.toList());

if (list.size() > 0) {
List<String> addrList = list.stream().map(it -> {
String addr = it.getIp() + ":" + it.getPort();
return addr;
}).collect(Collectors.toList());
this.serverList.set(addrList);
}

if (list.size() > 0) {
String serverIp = list.get(0).getIp();
int serverPort = list.get(0).getPort();
Expand Down Expand Up @@ -339,6 +349,43 @@ public void sendMessage(RemotingCommand req) {
}
}


/**
* 向所有server 节点发送信息
*
* @param req
*/
public void sendToAllMessage(RemotingCommand req) {
this.serverList.get().stream().forEach(addr -> {
if (StringUtils.isEmpty(addr)) {
logger.warn("send message addr is null");
return;
}
try {
this.client.invokeOneway(addr, req, req.getTimeout());
} catch (Exception e) {
throw new RpcException(e.getMessage(), e);
}
});
}

public void sendToAllMessage(int code, byte[] body, InvokeCallback invokeCallback) {
this.serverList.get().stream().forEach(addr -> {
if (StringUtils.isEmpty(addr)) {
logger.warn("send message addr is null");
return;
}
try {
RemotingCommand req = RemotingCommand.createRequestCommand(code);
req.setBody(body);
this.client.invokeAsync(addr, req, req.getTimeout(), invokeCallback);
} catch (Exception e) {
throw new RpcException(e.getMessage(), e);
}
});
}


public void tell(String addr, int code, String message) {
try {
RemotingCommand req = RemotingCommand.createRequestCommand(code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.xiaomi.youpin.rpc.test.tcp;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.xiaomi.data.push.bo.MPPing;
import com.xiaomi.data.push.bo.User;
import com.xiaomi.data.push.context.AgentContext;
Expand All @@ -33,6 +34,7 @@
import com.xiaomi.data.push.task.Task;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import java.util.List;
Expand All @@ -44,11 +46,18 @@
@Slf4j
public class TcpTest {

@Test
public void testServer3() {
testServer();
}


@SneakyThrows
@Test
public void testServer() {
RpcServer rpcServer = new RpcServer("", "demo_server1", false);
String nacosAddr = System.getenv("nacos_addr");
boolean regNacos = StringUtils.isNotEmpty(nacosAddr) ? true : false;
RpcServer rpcServer = new RpcServer(null == nacosAddr ? "" : nacosAddr, "demo_server_zzy", regNacos);
//注册处理器
rpcServer.setProcessorList(Lists.newArrayList(
new Pair<>(RpcCmd.mpPingReq, new MPingProcessor()),
Expand All @@ -69,7 +78,7 @@ public void testServer() {
log.info("res:{}", new String(res.getBody()));
}

}, 2)
}, 10)
));
rpcServer.init();
rpcServer.start();
Expand Down Expand Up @@ -121,7 +130,9 @@ public void testServer2() {
@SneakyThrows
@Test
public void testClient() {
RpcClient client = new RpcClient("127.0.0.1:53442");
String nacosAddr = System.getenv("nacos_addr");
nacosAddr = null == nacosAddr ? "127.0.0.1:53442" : nacosAddr;
RpcClient client = new RpcClient(nacosAddr, "demo_server_zzy");
client.setReconnection(false);
client.setProcessorList(Lists.newArrayList(
new Pair<>(RpcCmd.getInfoReq, new GetInfoProcessor())
Expand All @@ -135,12 +146,21 @@ public void testClient() {
user.setName("zzy");
p.setData("ping");
p.setUser(user);
client.sendMessage(client.getServerAddrs(), RemotingCommand.createMsgPackRequest(RpcCmd.mpPingReq, p), responseFuture -> {
MPPing pong = responseFuture.getResponseCommand().getReq(MPPing.class);
log.info("--->" + pong.getData());
RemotingCommand req = RemotingCommand.createRequestCommand(RpcCmd.pingReq);
req.setTimeout(2000L);
req.setBody(new Gson().toJson(p).getBytes());

// client.sendMessage(client.getServerAddrs(), req, responseFuture -> {
// log.info("--->" + responseFuture.getResponseCommand());
// });

// client.sendToAllMessage(req);
client.sendToAllMessage(RpcCmd.pingReq, "ping".getBytes(), resFuture -> {
log.info("----->{}", resFuture.getResponseCommand());
});
RemotingCommand res = client.sendMessage(client.getServerAddrs(), RpcCmd.pingReq, "abc", 1000);
log.info("res:{}", new String(res.getBody()));

// RemotingCommand res = client.sendMessage(client.getServerAddrs(), RpcCmd.pingReq, "abc", 1000);
// log.info("res:{}", new String(res.getBody()));
} catch (Exception ex) {
log.error(ex.getMessage());
}
Expand Down

0 comments on commit fabd60b

Please sign in to comment.