From 894fb14d0fee3e085ea01ece190b2dd2ef0faf3a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E5=BF=97=E5=8B=87?= Date: Tue, 12 Nov 2024 10:58:00 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20uds=20stream=20=E5=A4=84=E7=90=86?= =?UTF-8?q?=E4=BC=98=E5=8C=96=20(#905)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: uds 支持协程 * feat: uds 超时优化 * feat: uds stream 处理优化 --- .../docean-plugin-sidecar/pom.xml | 4 +- .../docean/plugin/sidecar/bo/StreamMsg.java | 24 ++++++ .../interceptor/AbstractInterceptor.java | 53 +++++++++++-- .../processor/sever/MockStreamProcessor.java | 76 +++++++++++++++++++ 4 files changed, 150 insertions(+), 7 deletions(-) create mode 100644 jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/bo/StreamMsg.java create mode 100644 jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java diff --git a/jcommon/docean-plugin/docean-plugin-sidecar/pom.xml b/jcommon/docean-plugin/docean-plugin-sidecar/pom.xml index 54a7fdec3..03ac4f26d 100644 --- a/jcommon/docean-plugin/docean-plugin-sidecar/pom.xml +++ b/jcommon/docean-plugin/docean-plugin-sidecar/pom.xml @@ -19,11 +19,13 @@ docean-plugin-config 1.6.0-jdk21-SNAPSHOT + run.mone rcurve - 1.4.2-SNAPSHOT + 1.6.0-jdk21-SNAPSHOT + run.mone grpc diff --git a/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/bo/StreamMsg.java b/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/bo/StreamMsg.java new file mode 100644 index 000000000..9cb77f75b --- /dev/null +++ b/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/bo/StreamMsg.java @@ -0,0 +1,24 @@ +package run.mone.docean.plugin.sidecar.bo; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * @author goodjava@qq.com + * @date 2024/11/12 09:13 + */ +@Data +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class StreamMsg implements Serializable { + + private String type; + + private String content; + +} diff --git a/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/interceptor/AbstractInterceptor.java b/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/interceptor/AbstractInterceptor.java index 0caa49b66..c03e8f1f6 100644 --- a/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/interceptor/AbstractInterceptor.java +++ b/jcommon/docean-plugin/docean-plugin-sidecar/src/main/java/run/mone/docean/plugin/sidecar/interceptor/AbstractInterceptor.java @@ -16,9 +16,12 @@ package run.mone.docean.plugin.sidecar.interceptor; +import com.xiaomi.data.push.common.SafeRun; import com.xiaomi.data.push.uds.UdsClient; import com.xiaomi.data.push.uds.codes.CodesFactory; import com.xiaomi.data.push.uds.codes.ICodes; +import com.xiaomi.data.push.uds.handler.ClientStreamCallback; +import com.xiaomi.data.push.uds.handler.MessageTypes; import com.xiaomi.data.push.uds.po.UdsCommand; import com.xiaomi.youpin.docean.Ioc; import com.xiaomi.youpin.docean.plugin.config.Config; @@ -26,10 +29,13 @@ import lombok.extern.slf4j.Slf4j; import net.sf.cglib.proxy.MethodInterceptor; import net.sf.cglib.proxy.MethodProxy; +import run.mone.docean.plugin.sidecar.bo.StreamMsg; import java.lang.reflect.Method; import java.util.Arrays; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * @author goodjava@qq.com @@ -38,9 +44,9 @@ @Slf4j public abstract class AbstractInterceptor implements MethodInterceptor { - private Ioc ioc; + protected Ioc ioc; - private Config config; + protected Config config; @Setter private ExceptionProcessor exceptionProcessor = new ExceptionProcessor(); @@ -51,6 +57,9 @@ public abstract class AbstractInterceptor implements MethodInterceptor { @Setter private ParamProcessor paramProcessor = new ParamProcessor(); + @Setter + private Consumer consumer; + public AbstractInterceptor(Ioc ioc, Config config) { this.ioc = ioc; @@ -63,6 +72,10 @@ public void intercept1(Context ctx, UdsCommand req, Object o) { } + public boolean isStream(Object obj, Method method, Object[] objects) { + return false; + } + @Override public Object intercept(Object obj, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable { @@ -90,14 +103,43 @@ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy this.intercept0(ctx, command, obj, method, objects); if (ctx.getData().getOrDefault("skip_code", "false").equals("false")) { - command.setParamTypes(Arrays.stream(method.getParameterTypes()).map(it -> it.getName()).toArray(String[]::new)); + command.setParamTypes(Arrays.stream(method.getParameterTypes()).map(Class::getName).toArray(String[]::new)); ICodes codes = CodesFactory.getCodes(command.getSerializeType()); - command.setByteParams(Arrays.stream(objects).map(it -> codes.encode(it)).toArray(byte[][]::new)); + command.setByteParams(Arrays.stream(objects).map(codes::encode).toArray(byte[][]::new)); } this.intercept1(ctx, command, obj); //信息发送server(mesh)层 UdsClient client = ioc.getBean("sideCarClient"); + //流式处理(sidecar会陆续返回结果) + if (isStream(obj, method, objects)) { + log.info("is stream"); + command.putAtt(MessageTypes.TYPE_KEY, MessageTypes.TYPE_OPENAI); + command.putAtt(MessageTypes.STREAM_ID_KEY, UUID.randomUUID().toString()); + SafeRun.run(() -> client.stream(command, new ClientStreamCallback() { + @Override + public void onContent(String content) { + log.debug("onContent: {}", content); + consumer.accept(StreamMsg.builder().type("onContent").content(content).build()); + } + + @Override + public void onComplete() { + log.info("onComplete"); + consumer.accept(StreamMsg.builder().type("onComplete").build()); + } + + @Override + public void onError(Throwable error) { + log.error("Error in StreamSideCarCallMethodInterceptor", error); + consumer.accept(StreamMsg.builder().type("onError").content(error.getMessage()).build()); + } + })); + return null; + } + + + //单次调用的处理 UdsCommand res = null; int retries = command.getRetries(); int i = 0; @@ -121,8 +163,7 @@ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy } } //处理参数 - paramProcessor.processResult(res,objects); - + paramProcessor.processResult(res, objects); //处理结果 return resultProcessor.processResult(command, res, method); } diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java new file mode 100644 index 000000000..a69ee22ea --- /dev/null +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/processor/sever/MockStreamProcessor.java @@ -0,0 +1,76 @@ +package com.xiaomi.data.push.uds.processor.sever; + +import com.xiaomi.data.push.uds.handler.MessageTypes; +import com.xiaomi.data.push.uds.po.UdsCommand; +import com.xiaomi.data.push.uds.processor.StreamCallback; +import com.xiaomi.data.push.uds.processor.UdsProcessor; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author goodjava@qq.com + * @date 2024/11/7 11:53 + */ +public class MockStreamProcessor implements UdsProcessor { + + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + private final Map> activeStreams = new ConcurrentHashMap<>(); + + @Override + public boolean isStreamProcessor() { + return true; + } + + @Override + public String cmd() { + return "stream"; + } + + @Override + public UdsCommand processRequest(UdsCommand udsCommand) { + return null; + } + + + @Override + public void processStream(UdsCommand request, StreamCallback callback) { + + String streamId = request.getAttachments().getOrDefault( + MessageTypes.STREAM_ID_KEY, + UUID.randomUUID().toString() + ); + + AtomicInteger counter = new AtomicInteger(0); + + ScheduledFuture future = scheduler.scheduleAtFixedRate(() -> { + try { + int currentCount = counter.incrementAndGet(); + + if (currentCount <= 10) { + callback.onContent(String.valueOf(currentCount)); + + if (currentCount == 10) { + callback.onComplete(); + ScheduledFuture scheduledFuture = activeStreams.remove(streamId); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + } + } + } catch (Exception e) { + callback.onError(e); + ScheduledFuture scheduledFuture = activeStreams.remove(streamId); + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } + } + }, 0, 1, TimeUnit.SECONDS); + + activeStreams.put(streamId, future); + + } +}