diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java index f5111615e..c83ba4f80 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/common/NetUtils.java @@ -13,9 +13,23 @@ */ public class NetUtils { + public static final String OS_NAME = System.getProperty("os.name"); + + private static boolean isLinuxPlatform = false; + + static { + if (OS_NAME != null && OS_NAME.toLowerCase().indexOf("linux") >= 0) { + isLinuxPlatform = true; + } + } + public static EventLoopGroup getEventLoopGroup(boolean remote) { if (remote) { - return new NioEventLoopGroup(); + if(useEpoll()) { + return new EpollEventLoopGroup(); + } else { + return new NioEventLoopGroup(); + } } if (CommonUtils.isWindows()) { return new NioEventLoopGroup(); @@ -50,4 +64,12 @@ public static Class getClientChannelClass(boolean mac, boolean remote) { return mac ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class; } + public static boolean useEpoll() { + return isLinuxPlatform() && Epoll.isAvailable(); + } + + public static boolean isLinuxPlatform() { + return isLinuxPlatform; + } + } diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java index 50177d278..263ef7c70 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsClient.java @@ -115,7 +115,7 @@ protected void initChannel(Channel ch) throws Exception { f.sync(); } catch (Throwable ex) { UdsClientContext.ins().exceptionCaught(ex); - log.error("start error:{} restart", ex.getMessage()); + log.error("start error restart", ex); if (null != group) { group.shutdownGracefully(); } diff --git a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java index 463ecc3cf..4bc57d343 100644 --- a/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java +++ b/jcommon/rcurve/src/main/java/com/xiaomi/data/push/uds/UdsServer.java @@ -29,18 +29,14 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.*; -import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; -import io.netty.util.HashedWheelTimer; import io.netty.util.Timeout; import lombok.Setter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import run.mone.api.IServer; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Optional; @@ -103,27 +99,38 @@ public void start(String path) { this.path = path; delPath(); boolean mac = CommonUtils.isMac(); - EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote); - EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote); try { - ServerBootstrap b = new ServerBootstrap(); - b.group(bossGroup, workerGroup) - .option(ChannelOption.SO_BACKLOG, 5000) - .option(ChannelOption.SO_RCVBUF, 65535) - .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) - .channel(NetUtils.getServerChannelClass(mac, this.remote)) - .childHandler(new ChannelInitializer() { - @Override - public void initChannel(Channel ch) throws Exception { - ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); - ch.pipeline().addLast(new LengthFieldPrepender(4)); - ch.pipeline().addLast(new UdsServerHandler(processorMap)); + EventLoopGroup bossGroup = NetUtils.getEventLoopGroup(this.remote); + EventLoopGroup workerGroup = NetUtils.getEventLoopGroup(this.remote); + + ServerBootstrap serverBootstrap = + new ServerBootstrap().group(bossGroup, workerGroup) + .channel(NetUtils.getServerChannelClass(mac, this.remote)) + .option(ChannelOption.SO_BACKLOG, 5000) + .option(ChannelOption.SO_RCVBUF, 65535) + .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + + ChannelInitializer initializer = new ChannelInitializer() { + @Override + protected void initChannel(Channel ch) { + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); + ch.pipeline().addLast(new LengthFieldPrepender(4)); + ch.pipeline().addLast(new UdsServerHandler(processorMap)); + } + }; + + serverBootstrap.childHandler(initializer); + ChannelFuture future = + serverBootstrap.bind("0.0.0.0", this.port).addListener((ChannelFutureListener) future1 -> { + if (future1.isSuccess()) { + log.info("bind:{}", this.remote ? this.host + ":" + this.port : path); } - }); + }).awaitUninterruptibly(); - SocketAddress s = this.remote ? new InetSocketAddress(this.host, this.port) : new DomainSocketAddress(path); - ChannelFuture f = b.bind(s); - log.info("bind:{}", this.remote ? this.host + ":" + this.port : path); + Throwable cause = future.cause(); + if (cause != null) { + log.error(cause.getMessage(), cause); + } if (Optional.ofNullable(this.regConsumer).isPresent()) { RpcServerInfo si = this.getServerInfo(); @@ -131,7 +138,6 @@ public void initChannel(Channel ch) throws Exception { this.regConsumer.accept(si); } - f.channel().closeFuture().sync(); } catch (Throwable ex) { log.error(ex.getMessage(), ex); }