Skip to content

Commit

Permalink
feat: uds stream 处理优化 (#905)
Browse files Browse the repository at this point in the history
* feat: uds 支持协程

* feat: uds 超时优化

* feat: uds stream 处理优化
  • Loading branch information
goodjava authored Nov 12, 2024
1 parent 1cd046e commit 894fb14
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 7 deletions.
4 changes: 3 additions & 1 deletion jcommon/docean-plugin/docean-plugin-sidecar/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
<artifactId>docean-plugin-config</artifactId>
<version>1.6.0-jdk21-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>run.mone</groupId>
<artifactId>rcurve</artifactId>
<version>1.4.2-SNAPSHOT</version>
<version>1.6.0-jdk21-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>run.mone</groupId>
<artifactId>grpc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package run.mone.docean.plugin.sidecar.bo;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
* @author [email protected]
* @date 2024/11/12 09:13
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class StreamMsg implements Serializable {

private String type;

private String content;

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,26 @@

package run.mone.docean.plugin.sidecar.interceptor;

import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.data.push.uds.UdsClient;
import com.xiaomi.data.push.uds.codes.CodesFactory;
import com.xiaomi.data.push.uds.codes.ICodes;
import com.xiaomi.data.push.uds.handler.ClientStreamCallback;
import com.xiaomi.data.push.uds.handler.MessageTypes;
import com.xiaomi.data.push.uds.po.UdsCommand;
import com.xiaomi.youpin.docean.Ioc;
import com.xiaomi.youpin.docean.plugin.config.Config;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import run.mone.docean.plugin.sidecar.bo.StreamMsg;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* @author [email protected]
Expand All @@ -38,9 +44,9 @@
@Slf4j
public abstract class AbstractInterceptor implements MethodInterceptor {

private Ioc ioc;
protected Ioc ioc;

private Config config;
protected Config config;

@Setter
private ExceptionProcessor exceptionProcessor = new ExceptionProcessor();
Expand All @@ -51,6 +57,9 @@ public abstract class AbstractInterceptor implements MethodInterceptor {
@Setter
private ParamProcessor paramProcessor = new ParamProcessor();

@Setter
private Consumer<StreamMsg> consumer;


public AbstractInterceptor(Ioc ioc, Config config) {
this.ioc = ioc;
Expand All @@ -63,6 +72,10 @@ public void intercept1(Context ctx, UdsCommand req, Object o) {

}

public boolean isStream(Object obj, Method method, Object[] objects) {
return false;
}


@Override
public Object intercept(Object obj, Method method, Object[] objects, MethodProxy methodProxy) throws Throwable {
Expand Down Expand Up @@ -90,14 +103,43 @@ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy

this.intercept0(ctx, command, obj, method, objects);
if (ctx.getData().getOrDefault("skip_code", "false").equals("false")) {
command.setParamTypes(Arrays.stream(method.getParameterTypes()).map(it -> it.getName()).toArray(String[]::new));
command.setParamTypes(Arrays.stream(method.getParameterTypes()).map(Class::getName).toArray(String[]::new));
ICodes codes = CodesFactory.getCodes(command.getSerializeType());
command.setByteParams(Arrays.stream(objects).map(it -> codes.encode(it)).toArray(byte[][]::new));
command.setByteParams(Arrays.stream(objects).map(codes::encode).toArray(byte[][]::new));
}
this.intercept1(ctx, command, obj);
//信息发送server(mesh)层
UdsClient client = ioc.getBean("sideCarClient");

//流式处理(sidecar会陆续返回结果)
if (isStream(obj, method, objects)) {
log.info("is stream");
command.putAtt(MessageTypes.TYPE_KEY, MessageTypes.TYPE_OPENAI);
command.putAtt(MessageTypes.STREAM_ID_KEY, UUID.randomUUID().toString());
SafeRun.run(() -> client.stream(command, new ClientStreamCallback() {
@Override
public void onContent(String content) {
log.debug("onContent: {}", content);
consumer.accept(StreamMsg.builder().type("onContent").content(content).build());
}

@Override
public void onComplete() {
log.info("onComplete");
consumer.accept(StreamMsg.builder().type("onComplete").build());
}

@Override
public void onError(Throwable error) {
log.error("Error in StreamSideCarCallMethodInterceptor", error);
consumer.accept(StreamMsg.builder().type("onError").content(error.getMessage()).build());
}
}));
return null;
}


//单次调用的处理
UdsCommand res = null;
int retries = command.getRetries();
int i = 0;
Expand All @@ -121,8 +163,7 @@ public Object intercept(Object obj, Method method, Object[] objects, MethodProxy
}
}
//处理参数
paramProcessor.processResult(res,objects);

paramProcessor.processResult(res, objects);
//处理结果
return resultProcessor.processResult(command, res, method);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package com.xiaomi.data.push.uds.processor.sever;

import com.xiaomi.data.push.uds.handler.MessageTypes;
import com.xiaomi.data.push.uds.po.UdsCommand;
import com.xiaomi.data.push.uds.processor.StreamCallback;
import com.xiaomi.data.push.uds.processor.UdsProcessor;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author [email protected]
* @date 2024/11/7 11:53
*/
public class MockStreamProcessor implements UdsProcessor<UdsCommand, UdsCommand> {

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

private final Map<String, ScheduledFuture<?>> activeStreams = new ConcurrentHashMap<>();

@Override
public boolean isStreamProcessor() {
return true;
}

@Override
public String cmd() {
return "stream";
}

@Override
public UdsCommand processRequest(UdsCommand udsCommand) {
return null;
}


@Override
public void processStream(UdsCommand request, StreamCallback callback) {

String streamId = request.getAttachments().getOrDefault(
MessageTypes.STREAM_ID_KEY,
UUID.randomUUID().toString()
);

AtomicInteger counter = new AtomicInteger(0);

ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(() -> {
try {
int currentCount = counter.incrementAndGet();

if (currentCount <= 10) {
callback.onContent(String.valueOf(currentCount));

if (currentCount == 10) {
callback.onComplete();
ScheduledFuture<?> scheduledFuture = activeStreams.remove(streamId);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}
}
} catch (Exception e) {
callback.onError(e);
ScheduledFuture<?> scheduledFuture = activeStreams.remove(streamId);
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
}
}
}, 0, 1, TimeUnit.SECONDS);

activeStreams.put(streamId, future);

}
}

0 comments on commit 894fb14

Please sign in to comment.