From c71681d522b2647aac0a49b5777df9bd80ede8e7 Mon Sep 17 00:00:00 2001 From: zhangzhiyong Date: Thu, 7 Nov 2024 14:09:54 +0800 Subject: [PATCH] feat: mesh streaming response support (#894) --- .../push/uds/processor/StreamCallback.java | 15 +++ .../data/push/uds/processor/UdsProcessor.java | 10 ++ jcommon/pom.xml | 2 +- jcommon/rcurve/README.md | 4 +- jcommon/rcurve/pom.xml | 17 +++- .../com/xiaomi/data/push/common/NetUtils.java | 6 +- .../com/xiaomi/data/push/uds/UdsClient.java | 23 ++++- .../uds/handler/ClientStreamCallback.java | 15 +++ .../data/push/uds/handler/MessageTypes.java | 18 ++++ .../push/uds/handler/UdsClientHandler.java | 80 ++++++++++++---- .../push/uds/handler/UdsServerHandler.java | 94 ++++++++++++++++--- .../processor/sever/MockStreamProcessor.java | 76 +++++++++++++++ .../com/xiaomi/mone/rcurve/test/UdsTest.java | 77 +++++++++++++-- 13 files changed, 388 insertions(+), 49 deletions(-) create mode 100644 jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/StreamCallback.java create mode 100644 jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/ClientStreamCallback.java create mode 100644 jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/MessageTypes.java create mode 100644 jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java diff --git a/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/StreamCallback.java b/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/StreamCallback.java new file mode 100644 index 000000000..60fd49bc2 --- /dev/null +++ b/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/StreamCallback.java @@ -0,0 +1,15 @@ +package com.xiaomi.data.push.uds.processor; + +/** + * @author goodjava@qq.com + * @date 2024/11/7 11:56 + */ +public interface StreamCallback { + + void onContent(String content); + + void onComplete(); + + void onError(Throwable error); + +} diff --git a/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/UdsProcessor.java b/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/UdsProcessor.java index 4f505f5fd..9c99399b7 100644 --- a/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/UdsProcessor.java +++ b/jcommon/api/src/main/java/com/xiaomi/data/push/uds/processor/UdsProcessor.java @@ -25,6 +25,16 @@ public interface UdsProcessor { Response processRequest(Request request); + // 新增:判断是否为流式处理器 + default boolean isStreamProcessor() { + return false; + } + + // 新增:流式处理方法 + default void processStream(Request request, StreamCallback callback) { + throw new UnsupportedOperationException("Stream processing not supported"); + } + default String cmd() { return ""; diff --git a/jcommon/pom.xml b/jcommon/pom.xml index c4f8c9c42..04a65eb57 100644 --- a/jcommon/pom.xml +++ b/jcommon/pom.xml @@ -134,7 +134,7 @@ ch.qos.logback logback-core - 1.1.2 + 1.2.3 provided diff --git a/jcommon/rcurve/README.md b/jcommon/rcurve/README.md index 86ec3d2dd..2d2d9a75a 100644 --- a/jcommon/rcurve/README.md +++ b/jcommon/rcurve/README.md @@ -2,4 +2,6 @@ + A well-performing mesh underlying communication framework. + Support UDS communication and TCP communication. + Support hessian gson protostuff encoding. -+ The performance is pretty good. \ No newline at end of file ++ The performance is pretty good. ++ jvm ++ --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.math=ALL-UNNAMED --add-opens=java.base/sun.reflect=ALL-UNNAMED --add-exports=java.base/sun.reflect.annotation=ALL-UNNAMED --add-exports=java.base/sun.reflect.generics.reflectiveObjects=ALL-UNNAMED --enable-preview \ No newline at end of file diff --git a/jcommon/rcurve/pom.xml b/jcommon/rcurve/pom.xml index 0392f7078..3c9c2be89 100644 --- a/jcommon/rcurve/pom.xml +++ b/jcommon/rcurve/pom.xml @@ -23,7 +23,7 @@ run.mone api - 1.4.1-jdk20-SNAPSHOT + ${submodule-release.version} run.mone @@ -41,17 +41,28 @@ easy 1.6.0-jdk21-SNAPSHOT + + io.netty netty-all - 4.1.48.Final + 4.1.114.Final + + + io.netty + netty-transport-native-kqueue + 4.1.114.Final + osx-aarch_64 + + + ch.qos.logback logback-classic 1.2.3 - provided + diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java index 0368e760d..d0a289181 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java @@ -14,9 +14,9 @@ public class NetUtils { public static EventLoopGroup getEventLoopGroup() { - if (CommonUtils.isMac() && CommonUtils.isArch64()) { - return new NioEventLoopGroup(); - } +// if (CommonUtils.isMac() && CommonUtils.isArch64()) { +// return new NioEventLoopGroup(); +// } if (CommonUtils.isWindows()) { return new NioEventLoopGroup(); } diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java index beb3bf9e4..7a25002e7 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java @@ -22,6 +22,8 @@ import com.xiaomi.data.push.uds.context.TraceContext; import com.xiaomi.data.push.uds.context.TraceEvent; import com.xiaomi.data.push.uds.context.UdsClientContext; +import com.xiaomi.data.push.uds.handler.MessageTypes; +import com.xiaomi.data.push.uds.handler.ClientStreamCallback; import com.xiaomi.data.push.uds.handler.UdsClientConnetManageHandler; import com.xiaomi.data.push.uds.handler.UdsClientHandler; import com.xiaomi.data.push.uds.po.UdsCommand; @@ -41,7 +43,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; /** * @author goodjava@qq.com @@ -134,6 +135,20 @@ public void call(Object msg) { Send.send(this.channel, command); } + + /** + * 发送OpenAI流式请求 + */ + public void stream(UdsCommand command, ClientStreamCallback callback) { + Map attachments = command.getAttachments(); + // 注册回调 + ((UdsClientHandler) channel.pipeline().last()).getStreamCallbacks() + .put(attachments.get(MessageTypes.STREAM_ID_KEY), callback); + // 发送请求 + Send.send(this.channel, command); + } + + @Override public UdsCommand call(UdsCommand req) { Stopwatch sw = Stopwatch.createStarted(); @@ -142,11 +157,11 @@ public UdsCommand call(UdsCommand req) { long id = req.getId(); try { CompletableFuture future = new CompletableFuture<>(); - HashMap hashMap = new HashMap<>(); + HashMap hashMap = new HashMap<>(); hashMap.put("future", future); hashMap.put("async", req.isAsync()); hashMap.put("returnType", req.getReturnClass()); - reqMap.put(req.getId(),hashMap); + reqMap.put(req.getId(), hashMap); Channel channel = this.channel; if (null == channel || !channel.isOpen()) { log.warn("client channel is close"); @@ -160,7 +175,7 @@ public UdsCommand call(UdsCommand req) { wheelTimer.newTimeout(() -> { log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout()); reqMap.remove(req.getId()); - }, req.getTimeout()+350); + }, req.getTimeout() + 350); return req; } return (UdsCommand) future.get(req.getTimeout(), TimeUnit.MILLISECONDS); diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/ClientStreamCallback.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/ClientStreamCallback.java new file mode 100644 index 000000000..9ff88e828 --- /dev/null +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/ClientStreamCallback.java @@ -0,0 +1,15 @@ +package com.xiaomi.data.push.uds.handler; + +/** + * @author goodjava@qq.com + * @date 2024/11/7 10:35 + */ +public interface ClientStreamCallback { + + void onContent(String content); + + void onComplete(); + + void onError(Throwable error); + +} diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/MessageTypes.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/MessageTypes.java new file mode 100644 index 000000000..1c5bab680 --- /dev/null +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/MessageTypes.java @@ -0,0 +1,18 @@ +package com.xiaomi.data.push.uds.handler; + +/** + * @author goodjava@qq.com + * @date 2024/11/6 17:41 + */ +public class MessageTypes { + + public static final String TYPE_KEY = "messageType"; + public static final String TYPE_NORMAL = "normal"; + public static final String TYPE_OPENAI = "openai"; + public static final String STREAM_ID_KEY = "streamId"; + public static final String PROMPT_KEY = "prompt"; + public static final String CONTENT_KEY = "content"; + public static final String STATUS_KEY = "status"; + + +} diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsClientHandler.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsClientHandler.java index 417daad7b..d542d6a80 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsClientHandler.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsClientHandler.java @@ -26,8 +26,10 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -42,6 +44,9 @@ public class UdsClientHandler extends SimpleChannelInboundHandler { private ConcurrentHashMap,ExecutorService>> processorMap; + @Getter + private final Map streamCallbacks = new ConcurrentHashMap<>(); + public UdsClientHandler(ConcurrentHashMap,ExecutorService>> processorMap) { this.processorMap = processorMap; @@ -70,28 +75,67 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep log.warn("processor is null cmd:{}", command.getCmd()); } } else { - Optional.ofNullable(UdsClient.reqMap.get(command.getId())).ifPresent(f -> { - if (Boolean.TRUE.toString().equals(String.valueOf(f.get("async")))) { - Object res = null; - try { - res = processResult(command, (Class) f.get("returnType")); - if (command.getCode() == 0) { - ((CompletableFuture)f.get("future")).complete(res); - } else { - ((CompletableFuture)f.get("future")).completeExceptionally(new RuntimeException(res.toString())); - } - } catch (Exception e) { - log.error("async response error,", e); - ((CompletableFuture)f.get("future")).completeExceptionally(e); + String messageType = command.getAttachments() + .getOrDefault(MessageTypes.TYPE_KEY, MessageTypes.TYPE_NORMAL); + + //流式的操作 + if (MessageTypes.TYPE_OPENAI.equals(messageType)) { + handleOpenAIResponse(command); + } else { + handleNormalResponse(command); + } + + } + } + + private void handleOpenAIResponse(UdsCommand command) { + Map attachments = command.getAttachments(); + String streamId = attachments.get(MessageTypes.STREAM_ID_KEY); + String content = attachments.get(MessageTypes.CONTENT_KEY); + String status = attachments.get(MessageTypes.STATUS_KEY); + + ClientStreamCallback callback = streamCallbacks.get(streamId); + if (callback != null) { + if ("complete".equals(status)) { + callback.onComplete(); + streamCallbacks.remove(streamId); + } else if ("error".equals(status)) { + callback.onError(new RuntimeException(content)); + streamCallbacks.remove(streamId); + } else { + callback.onContent(content); + } + } + } + + + private void handleNormalResponse(UdsCommand command) { + // 保持原有的处理逻辑不变 + Optional.ofNullable(UdsClient.reqMap.get(command.getId())).ifPresent(f -> { + if (Boolean.TRUE.toString().equals(String.valueOf(f.get("async")))) { + Object res = null; + try { + res = processResult(command, (Class) f.get("returnType")); + if (command.getCode() == 0) { + ((CompletableFuture)f.get("future")).complete(res); + } else { + ((CompletableFuture)f.get("future")).completeExceptionally( + new RuntimeException(res.toString()) + ); } - UdsClient.reqMap.remove(command.getId()); - } else { - ((CompletableFuture)f.get("future")).complete(command); + } catch (Exception e) { + log.error("async response error,", e); + ((CompletableFuture)f.get("future")).completeExceptionally(e); } - }); - } + UdsClient.reqMap.remove(command.getId()); + } else { + ((CompletableFuture)f.get("future")).complete(command); + } + }); } + + @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.error("client channelInactive:{}",ctx.channel().id()); diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java index 38fb22441..a66e0333c 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/handler/UdsServerHandler.java @@ -21,6 +21,7 @@ import com.xiaomi.data.push.uds.UdsServer; import com.xiaomi.data.push.uds.context.UdsServerContext; import com.xiaomi.data.push.uds.po.UdsCommand; +import com.xiaomi.data.push.uds.processor.StreamCallback; import com.xiaomi.data.push.uds.processor.UdsProcessor; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; @@ -32,7 +33,9 @@ import java.util.Map; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @author goodjava@qq.com @@ -45,33 +48,30 @@ public class UdsServerHandler extends ChannelInboundHandlerAdapter { private Map> m; - public UdsServerHandler(ConcurrentHashMap> processorMap) { this.m = processorMap; } - @Override public void channelRead(ChannelHandlerContext ctx, Object _msg) { try { ByteBuf msg = (ByteBuf) _msg; UdsCommand command = new UdsCommand(); command.decode(msg); - log.debug("server receive:id:{}:{}:{}:{}:{}",command.getId(), command.isRequest(), command.getApp(), command.getCmd(), command.getSerializeType()); + log.debug("server receive:id:{}:{}:{}:{}:{}", command.getId(), command.isRequest(), command.getApp(), command.getCmd(), command.getSerializeType()); if (command.isRequest()) { command.setChannel(ctx.channel()); Pair pair = this.m.get(command.getCmd()); if (null != pair) { UdsProcessor processor = pair.getKey(); - pair.getValue().submit(() -> { - log.debug("server received:{}", command.getId()); - UdsCommand res = processor.processRequest(command); - if (null != res) { - Send.send(ctx.channel(), res); - } - }); + // 判断是否为流式处理 + if (processor.isStreamProcessor()) { + handleStreamRequest(ctx, command, processor); + } else { + handleNormalRequest(pair.getValue(), ctx, command, processor); + } } else { - log.warn("processor is null cmd:{},id:{}", command.getCmd(),command.getId()); + log.warn("processor is null cmd:{},id:{}", command.getCmd(), command.getId()); } } else { Optional.ofNullable(UdsServer.reqMap.get(command.getId())).ifPresent(f -> f.complete(command)); @@ -81,6 +81,74 @@ public void channelRead(ChannelHandlerContext ctx, Object _msg) { } } + private void handleNormalRequest(ExecutorService pool, ChannelHandlerContext ctx, UdsCommand command, UdsProcessor processor) { + pool.submit(() -> { + log.debug("server received:{}", command.getId()); + UdsCommand res = processor.processRequest(command); + if (null != res) { + Send.send(ctx.channel(), res); + } + }); + } + + + private void handleStreamRequest(ChannelHandlerContext ctx, UdsCommand command, + UdsProcessor processor) { + + String streamId = command.getAttachments().getOrDefault( + MessageTypes.STREAM_ID_KEY, + UUID.randomUUID().toString() + ); + + StreamCallback callback = new StreamCallback() { + @Override + public void onContent(String content) { + sendStreamContent(ctx, command, streamId, content); + } + + @Override + public void onComplete() { + sendCompleteResponse(ctx, command, streamId); + } + + @Override + public void onError(Throwable error) { + sendErrorResponse(ctx, command, error.getMessage()); + } + }; + + // 执行流式处理 + processor.processStream(command, callback); + } + + + private void sendErrorResponse(ChannelHandlerContext ctx, UdsCommand command, String error) { + UdsCommand response = UdsCommand.createResponse(command); + response.setCode(-1); + response.setMessage(error); + Send.send(ctx.channel(), response); + } + + + private void sendCompleteResponse(ChannelHandlerContext ctx, UdsCommand request, String streamId) { + UdsCommand response = UdsCommand.createResponse(request); + Map attachments = response.getAttachments(); + attachments.put(MessageTypes.TYPE_KEY, MessageTypes.TYPE_OPENAI); + attachments.put(MessageTypes.STREAM_ID_KEY, streamId); + attachments.put(MessageTypes.STATUS_KEY, "complete"); + Send.send(ctx.channel(), response); + } + + + private void sendStreamContent(ChannelHandlerContext ctx, UdsCommand request, String streamId, String content) { + UdsCommand response = UdsCommand.createResponse(request); + Map attachments = response.getAttachments(); + attachments.put(MessageTypes.TYPE_KEY, MessageTypes.TYPE_OPENAI); + attachments.put(MessageTypes.STREAM_ID_KEY, streamId); + attachments.put(MessageTypes.CONTENT_KEY, content); + Send.send(ctx.channel(), response); + } + @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -91,7 +159,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) { Attribute attr = ctx.channel().attr(app); String v = attr.get(); - log.error("server channelInactive:{},{},{}", app, v,ctx.channel().id()); + log.error("server channelInactive:{},{},{}", app, v, ctx.channel().id()); if (null != v) { UdsServerContext.ins().remove(v); } @@ -99,7 +167,7 @@ public void channelInactive(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - log.error("exceptionCaught,{}:{}",ctx.channel().id(), cause); + log.error("exceptionCaught,{}:{}", ctx.channel().id(), cause); Attribute attr = ctx.channel().attr(app); String v = attr.get(); if (null != v) { diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java new file mode 100644 index 000000000..a69ee22ea --- /dev/null +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java @@ -0,0 +1,76 @@ +package com.xiaomi.data.push.uds.processor.sever; + +import com.xiaomi.data.push.uds.handler.MessageTypes; +import com.xiaomi.data.push.uds.po.UdsCommand; +import com.xiaomi.data.push.uds.processor.StreamCallback; +import com.xiaomi.data.push.uds.processor.UdsProcessor; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author goodjava@qq.com + * @date 2024/11/7 11:53 + */ +public class MockStreamProcessor implements UdsProcessor { + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + private final Map> activeStreams = new ConcurrentHashMap<>(); + + @Override + public boolean isStreamProcessor() { + return true; + } + + @Override + public String cmd() { + return "stream"; + } + + @Override + public UdsCommand processRequest(UdsCommand udsCommand) { + return null; + } + + + @Override + public void processStream(UdsCommand request, StreamCallback callback) { + + String streamId = request.getAttachments().getOrDefault( + MessageTypes.STREAM_ID_KEY, + UUID.randomUUID().toString() + ); + + AtomicInteger counter = new AtomicInteger(0); + + ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { + try { + int currentCount = counter.incrementAndGet(); + + if (currentCount <= 10) { + callback.onContent(String.valueOf(currentCount)); + + if (currentCount == 10) { + callback.onComplete(); + ScheduledFuture scheduledFuture = activeStreams.remove(streamId); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + } + } + } catch (Exception e) { + callback.onError(e); + ScheduledFuture scheduledFuture = activeStreams.remove(streamId); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + } + }, 0, 1, TimeUnit.SECONDS); + + activeStreams.put(streamId, future); + + } +} diff --git a/jcommon/rcurve/src/test/java/com/xiaomi/mone/rcurve/test/UdsTest.java b/jcommon/rcurve/src/test/java/com/xiaomi/mone/rcurve/test/UdsTest.java index 128e63ff0..5a2f688cb 100644 --- a/jcommon/rcurve/src/test/java/com/xiaomi/mone/rcurve/test/UdsTest.java +++ b/jcommon/rcurve/src/test/java/com/xiaomi/mone/rcurve/test/UdsTest.java @@ -1,21 +1,23 @@ package com.xiaomi.mone.rcurve.test; import com.xiaomi.data.push.common.CommonUtils; -import com.xiaomi.data.push.common.Pair; import com.xiaomi.data.push.common.RcurveConfig; import com.xiaomi.data.push.common.SafeRun; import com.xiaomi.data.push.uds.UdsClient; import com.xiaomi.data.push.uds.UdsServer; import com.xiaomi.data.push.uds.codes.GsonCodes; import com.xiaomi.data.push.uds.context.UdsClientContext; +import com.xiaomi.data.push.uds.handler.MessageTypes; +import com.xiaomi.data.push.uds.handler.ClientStreamCallback; import com.xiaomi.data.push.uds.po.UdsCommand; import com.xiaomi.data.push.uds.processor.client.CallMethodProcessor; +import com.xiaomi.data.push.uds.processor.sever.MockStreamProcessor; import com.xiaomi.data.push.uds.processor.sever.PingProcessor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.Test; -import java.util.concurrent.Executors; +import java.util.UUID; import java.util.function.Function; import java.util.stream.IntStream; @@ -28,6 +30,8 @@ public class UdsTest { private String path = "/tmp/test.sock"; + private boolean remote = false; + /** * 模拟启动server */ @@ -36,7 +40,11 @@ public void testServer() { //使用gson格式编码 RcurveConfig.ins().init(it -> it.setCodeType(GsonCodes.type)); UdsServer udsServer = new UdsServer(); + udsServer.setRemote(remote); + udsServer.setHost("0.0.0.0"); + udsServer.setPort(7777); udsServer.putProcessor(new PingProcessor()); + udsServer.putProcessor(new MockStreamProcessor()); new Thread(() -> { CommonUtils.sleep(10); UdsCommand command = UdsCommand.createRequest(); @@ -45,10 +53,10 @@ public void testServer() { command.setServiceName("com.xiaomi.youpin.rpc.test.uds.UdsTestServcie"); command.setMethodName("test"); command.setTimeout(1000000); - SafeRun.run(() -> { - UdsCommand res = udsServer.call(command); - System.out.println(res); - }); +// SafeRun.run(() -> { +// UdsCommand res = udsServer.call(command); +// System.out.println(res); +// }); }).start(); udsServer.start(path); } @@ -62,12 +70,16 @@ public void testClient2() { /** * 模拟启动client + * * @throws InterruptedException */ @Test public void testClient() throws InterruptedException { RcurveConfig.ins().init(it -> it.setCodeType(GsonCodes.type)); UdsClient client = new UdsClient("app1"); + client.setRemote(true); + client.setHost("127.0.0.1"); + client.setPort(7777); client.putProcessor(new CallMethodProcessor(new Function() { @Override public Object apply(UdsCommand udsCommand) { @@ -90,4 +102,57 @@ public Object apply(UdsCommand udsCommand) { }); Thread.currentThread().join(); } + + + /** + * 测试StreamClient方法 + * + * 该方法初始化RcurveConfig,创建并配置UdsClient,设置处理器并启动客户端。 + * 然后进行100次ping操作,每次ping都会创建一个UdsCommand请求并通过client.stream方法发送, + * 并使用ClientStreamCallback处理响应内容、完成和错误信息。 + */ + @Test + public void testStreamClient() { + RcurveConfig.ins().init(it -> it.setCodeType(GsonCodes.type)); + UdsClient client = new UdsClient("app1"); + client.setRemote(remote); + client.setHost("127.0.0.1"); + client.setPort(7777); + client.putProcessor(new CallMethodProcessor(new Function() { + @Override + public Object apply(UdsCommand udsCommand) { + return new UdsTestServcie(); + } + })); + client.start(path); + UdsClientContext.ins().channel.set(client.getChannel()); + //调用100次ping + IntStream.range(0, 100).forEach(i -> { + UdsCommand request = UdsCommand.createRequest(); + request.setCmd("stream"); + request.setApp("app1"); + + request.putAtt(MessageTypes.TYPE_KEY, MessageTypes.TYPE_OPENAI); + request.putAtt(MessageTypes.STREAM_ID_KEY, UUID.randomUUID().toString()); + + SafeRun.run(() -> client.stream(request, new ClientStreamCallback() { + @Override + public void onContent(String content) { + System.out.println(content); + } + + @Override + public void onComplete() { + System.out.println("complete"); + } + + @Override + public void onError(Throwable error) { + System.out.println(error.getMessage()); + } + })); + CommonUtils.sleep(15); + }); + } + }