Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: uds 支持协程 #903

Merged
merged 1 commit into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
*/
public class NetUtils {

public static EventLoopGroup getEventLoopGroup() {
// if (CommonUtils.isMac() && CommonUtils.isArch64()) {
// return new NioEventLoopGroup();
// }
public static EventLoopGroup getEventLoopGroup(boolean remote) {
if (remote) {
return new NioEventLoopGroup();
}
if (CommonUtils.isWindows()) {
return new NioEventLoopGroup();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public UdsClient(String id) {


private EventLoopGroup getEventLoopGroup() {
return NetUtils.getEventLoopGroup();
return NetUtils.getEventLoopGroup(this.remote);
}


Expand Down Expand Up @@ -135,10 +135,6 @@ public void call(Object msg) {
Send.send(this.channel, command);
}


/**
* 发送OpenAI流式请求
*/
public void stream(UdsCommand command, ClientStreamCallback callback) {
Map<String, String> attachments = command.getAttachments();
// 注册回调
Expand Down Expand Up @@ -169,13 +165,23 @@ public UdsCommand call(UdsCommand req) {
}
log.debug("start send,id:{}", id);
Send.send(channel, req);

wheelTimer.newTimeout(() -> {
log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout());
HashMap<String, Object> map = reqMap.remove(req.getId());
if (null != map) {
CompletableFuture<Object> f = (CompletableFuture<Object>) map.get("future");
if (null != f && !f.isDone()) {
future.completeExceptionally(
new TimeoutException("Request timeout: " + req.getTimeout())
);
}
}
}, req.getTimeout() + 350);

//异步还是同步
if (req.isAsync()) {
req.setCompletableFuture(future);
wheelTimer.newTimeout(() -> {
log.warn("check async udsClient time out auto close:{},{}", req.getId(), req.getTimeout());
reqMap.remove(req.getId());
}, req.getTimeout() + 350);
return req;
}
return (UdsCommand) future.get(req.getTimeout(), TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.google.common.base.Stopwatch;
import com.xiaomi.data.push.common.*;
import com.xiaomi.data.push.uds.WheelTimer.UdsWheelTimer;
import com.xiaomi.data.push.uds.context.TraceContext;
import com.xiaomi.data.push.uds.context.TraceEvent;
import com.xiaomi.data.push.uds.context.UdsServerContext;
Expand All @@ -31,6 +32,8 @@
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -42,7 +45,6 @@
import java.nio.file.Paths;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
Expand All @@ -58,6 +60,10 @@ public class UdsServer implements IServer<UdsCommand> {

private ExecutorService pool = Executors.newVirtualThreadPerTaskExecutor();

// 创建时间轮,可以根据需要调整tick时间和轮子大小
private UdsWheelTimer wheelTimer = new UdsWheelTimer();


/**
* 是否使用remote模式(标准tcp)
*/
Expand Down Expand Up @@ -97,8 +103,8 @@ public void start(String path) {
this.path = path;
delPath();
boolean mac = CommonUtils.isMac();
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup();
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup();
EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote);
EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
Expand Down Expand Up @@ -187,9 +193,24 @@ public UdsCommand call(UdsCommand req) {
TraceContext context = new TraceContext();
context.enter();
long id = req.getId();

// 创建超时任务
Timeout timeout = wheelTimer.newTimeout(() -> {
CompletableFuture<UdsCommand> future = reqMap.remove(id);
if (future != null && !future.isDone()) {
future.completeExceptionally(
new TimeoutException("Request timeout: " + req.getTimeout())
);
}
}, req.getTimeout());

try {
String app = req.getApp();
CompletableFuture<UdsCommand> future = new CompletableFuture<>();

// 添加完成时取消定时任务的回调
future.whenComplete((k, v) -> timeout.cancel());

reqMap.put(id, future);
Channel channel = UdsServerContext.ins().channel(app);
if (null == channel || !channel.isOpen()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

/**
* @author [email protected]
Expand Down

This file was deleted.

Loading