From 35b8d63ef4c71452511c97175a8e680429a6a36e Mon Sep 17 00:00:00 2001 From: lynn Date: Sat, 28 May 2022 00:13:25 +0800 Subject: [PATCH] 2.4.0 --- Reverse Proxy Tool.iss | 2 +- pom.xml | 2 +- rpt-base/pom.xml | 2 +- .../rpt/base/executor/MessageDispatcher.java | 32 +++ .../rpt/base/executor/MessageExecutor.java | 16 ++ .../promptness/rpt/base/utils/Constants.java | 25 ++- rpt-client/pom.xml | 2 +- .../rpt/client/ClientApplication.java | 2 +- .../rpt/client/cache/ProxyChannelCache.java | 12 +- .../rpt/client/executor/AuthExecutor.java | 37 +++ .../client/executor/ConnectedExecutor.java | 76 +++++++ .../rpt/client/executor/DataExecutor.java | 28 +++ .../client/executor/DisconnectedExecutor.java | 29 +++ .../rpt/client/handler/ClientHandler.java | 112 +--------- .../rpt/client/handler/LocalHandler.java | 31 +-- ...omptness.rpt.base.executor.MessageExecutor | 4 + rpt-desktop/pom.xml | 2 +- .../rpt/desktop/DesktopApplication.java | 4 +- .../desktop/controller/ConfigController.java | 2 +- .../desktop/controller/MenuController.java | 6 +- .../rpt/desktop/utils/SystemTrayUtil.java | 2 +- rpt-server/pom.xml | 2 +- .../rpt/server/cache/ServerChannelCache.java | 14 ++ .../server/executor/ConnectedExecutor.java | 46 ++++ .../rpt/server/executor/DataExecutor.java | 27 +++ .../server/executor/DisconnectedExecutor.java | 34 +++ .../rpt/server/executor/RegisterExecutor.java | 148 ++++++++++++ .../rpt/server/handler/RemoteHandler.java | 15 +- .../rpt/server/handler/RequestHandler.java | 23 +- .../rpt/server/handler/ServerHandler.java | 211 +++--------------- .../StaticDispatcher.java} | 16 +- ...omptness.rpt.base.executor.MessageExecutor | 4 + 32 files changed, 601 insertions(+), 367 deletions(-) create mode 100644 rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java create mode 100644 rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageExecutor.java create mode 100644 rpt-client/src/main/java/cn/promptness/rpt/client/executor/AuthExecutor.java create mode 100644 rpt-client/src/main/java/cn/promptness/rpt/client/executor/ConnectedExecutor.java create mode 100644 rpt-client/src/main/java/cn/promptness/rpt/client/executor/DataExecutor.java create mode 100644 rpt-client/src/main/java/cn/promptness/rpt/client/executor/DisconnectedExecutor.java create mode 100644 rpt-client/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor create mode 100644 rpt-server/src/main/java/cn/promptness/rpt/server/executor/ConnectedExecutor.java create mode 100644 rpt-server/src/main/java/cn/promptness/rpt/server/executor/DataExecutor.java create mode 100644 rpt-server/src/main/java/cn/promptness/rpt/server/executor/DisconnectedExecutor.java create mode 100644 rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java rename rpt-server/src/main/java/cn/promptness/rpt/server/{cache/DispatcherCache.java => page/StaticDispatcher.java} (92%) create mode 100644 rpt-server/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor diff --git a/Reverse Proxy Tool.iss b/Reverse Proxy Tool.iss index a4edfb4..a8fe952 100644 --- a/Reverse Proxy Tool.iss +++ b/Reverse Proxy Tool.iss @@ -2,7 +2,7 @@ ; SEE THE DOCUMENTATION FOR DETAILS ON CREATING INNO SETUP SCRIPT FILES! #define MyAppName "Reverse Proxy Tool" -#define MyAppVersion "2.3.5" +#define MyAppVersion "2.4.0" #define MyAppPublisher "Lynn" #define MyAppURL "https://www.holme.cn/" #define MyAppExeName "rpt-desktop.exe" diff --git a/pom.xml b/pom.xml index 6bdedd7..abc86bc 100644 --- a/pom.xml +++ b/pom.xml @@ -7,7 +7,7 @@ cn.promptness rpt pom - 2.3.5 + 2.4.0 rpt-base rpt-server diff --git a/rpt-base/pom.xml b/rpt-base/pom.xml index 7e43825..76cb5f8 100644 --- a/rpt-base/pom.xml +++ b/rpt-base/pom.xml @@ -5,7 +5,7 @@ rpt cn.promptness - 2.3.5 + 2.4.0 4.0.0 diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java new file mode 100644 index 0000000..8d08e7b --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java @@ -0,0 +1,32 @@ +package cn.promptness.rpt.base.executor; + +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import io.netty.channel.ChannelHandlerContext; + +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; + +public class MessageDispatcher { + + private static final Map MESSAGE_EXECUTOR_MAP = new HashMap<>(); + + static { + ServiceLoader executors = ServiceLoader.load(MessageExecutor.class); + for (MessageExecutor executor : executors) { + MessageExecutor messageExecutor = MESSAGE_EXECUTOR_MAP.put(executor.getMessageType(), executor); + if (messageExecutor != null) { + throw new RuntimeException(String.format("%s Message Executor Register Repeat", messageExecutor.getMessageType().name())); + } + } + } + + public void handle(ChannelHandlerContext context, Message message) throws Exception { + MessageExecutor messageExecutor = MESSAGE_EXECUTOR_MAP.get(message.getType()); + if (messageExecutor == null) { + return; + } + messageExecutor.execute(context, message); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageExecutor.java b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageExecutor.java new file mode 100644 index 0000000..381bcf1 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageExecutor.java @@ -0,0 +1,16 @@ +package cn.promptness.rpt.base.executor; + +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import io.netty.channel.ChannelHandlerContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public interface MessageExecutor { + + Logger logger = LoggerFactory.getLogger(MessageExecutor.class); + + MessageType getMessageType(); + + void execute(ChannelHandlerContext context, Message message) throws Exception; +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/utils/Constants.java b/rpt-base/src/main/java/cn/promptness/rpt/base/utils/Constants.java index c4c0372..f2596ee 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/utils/Constants.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/utils/Constants.java @@ -1,28 +1,35 @@ package cn.promptness.rpt.base.utils; import io.netty.channel.Channel; +import io.netty.channel.nio.NioEventLoopGroup; import io.netty.util.AttributeKey; +import java.util.List; import java.util.Map; import java.util.regex.Pattern; public interface Constants { - String TITLE = "Reverse Proxy Tool"; - - String VERSION = "2.3.5"; - AttributeKey> CHANNELS = AttributeKey.newInstance("CHANNELS"); - AttributeKey PROXY = AttributeKey.newInstance("PROXY"); - AttributeKey LOCAL = AttributeKey.newInstance("LOCAL"); - AttributeKey CLIENT_KEY = AttributeKey.newInstance("CLIENT_KEY"); + interface Server { + AttributeKey CLIENT_KEY = AttributeKey.newInstance("CLIENT_KEY"); + AttributeKey> DOMAIN = AttributeKey.newInstance("DOMAIN"); + AttributeKey REMOTE_BOSS_GROUP = AttributeKey.newInstance("REMOTE_BOSS_GROUP"); + AttributeKey REMOTE_WORKER_GROUP = AttributeKey.newInstance("REMOTE_WORKER_GROUP"); + } - AttributeKey> APPLICATION = AttributeKey.newInstance("APPLICATION"); + interface Client { + AttributeKey> APPLICATION = AttributeKey.newInstance("APPLICATION"); + } - Pattern COLON = Pattern.compile(":"); + interface Desktop { + String TITLE = "Reverse Proxy Tool"; + String VERSION = "2.4.0"; + } + Pattern COLON = Pattern.compile(":"); Pattern BLANK = Pattern.compile("\\s"); } diff --git a/rpt-client/pom.xml b/rpt-client/pom.xml index 7841b17..b3700ec 100644 --- a/rpt-client/pom.xml +++ b/rpt-client/pom.xml @@ -5,7 +5,7 @@ rpt cn.promptness - 2.3.5 + 2.4.0 4.0.0 diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/ClientApplication.java b/rpt-client/src/main/java/cn/promptness/rpt/client/ClientApplication.java index da684ff..835c15f 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/ClientApplication.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/ClientApplication.java @@ -71,7 +71,7 @@ public Boolean start(int seconds) throws Exception { logger.info("客户端开始连接服务端IP:{},服务端端口:{}", clientConfig.getServerIp(), clientConfig.getServerPort()); bootstrap.connect(clientConfig.getServerIp(), clientConfig.getServerPort()).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { - future.channel().attr(Constants.APPLICATION).set(this); + future.channel().attr(Constants.Client.APPLICATION).set(this); //连接建立成功,发送注册请求 Message message = new Message(); message.setType(MessageType.TYPE_REGISTER); diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/cache/ProxyChannelCache.java b/rpt-client/src/main/java/cn/promptness/rpt/client/cache/ProxyChannelCache.java index 91a6bfb..3a6196c 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/cache/ProxyChannelCache.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/cache/ProxyChannelCache.java @@ -21,7 +21,7 @@ public class ProxyChannelCache { public static void get(Bootstrap bootstrap, Consumer success, Supplier fail) { Channel proxyChannel = PROXY_CHANNEL_QUEUE.poll(); - if (proxyChannel != null) { + if (proxyChannel != null && proxyChannel.isActive()) { success.accept(proxyChannel); return; } @@ -37,14 +37,16 @@ public static void get(Bootstrap bootstrap, Consumer success, Supplier< } - public static void release(Channel proxyChannel) { + public static void put(Channel proxyChannel) { if (PROXY_CHANNEL_QUEUE.size() > MAX_QUEUE_LIMIT) { proxyChannel.close(); return; } - proxyChannel.config().setAutoRead(true); - proxyChannel.attr(Constants.LOCAL).set(null); - PROXY_CHANNEL_QUEUE.offer(proxyChannel); + if (proxyChannel.isActive()) { + proxyChannel.config().setAutoRead(true); + proxyChannel.attr(Constants.LOCAL).set(null); + PROXY_CHANNEL_QUEUE.offer(proxyChannel); + } } public static void delete(Channel proxyChannel) { diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/executor/AuthExecutor.java b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/AuthExecutor.java new file mode 100644 index 0000000..e90ea75 --- /dev/null +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/AuthExecutor.java @@ -0,0 +1,37 @@ +package cn.promptness.rpt.client.executor; + +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.utils.Constants; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; + +import java.util.Collections; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; + +public class AuthExecutor implements MessageExecutor { + + private final Map channelMap = new ConcurrentHashMap<>(1024); + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_AUTH; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) { + for (String remoteResult : Optional.ofNullable(message.getMeta().getRemoteResult()).orElse(Collections.emptyList())) { + logger.info(remoteResult); + } + boolean connection = message.getMeta().isConnection(); + if (connection) { + logger.info("连接成功,当前秘钥:{}", message.getMeta().getClientKey()); + context.channel().attr(Constants.CHANNELS).set(channelMap); + } else { + logger.info("连接失败,当前秘钥:{}", message.getMeta().getClientKey()); + } + } +} diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/executor/ConnectedExecutor.java b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/ConnectedExecutor.java new file mode 100644 index 0000000..84fd91d --- /dev/null +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/ConnectedExecutor.java @@ -0,0 +1,76 @@ +package cn.promptness.rpt.client.executor; + +import cn.promptness.rpt.base.coder.ByteArrayCodec; +import cn.promptness.rpt.base.config.ProxyType; +import cn.promptness.rpt.base.config.RemoteConfig; +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.protocol.Meta; +import cn.promptness.rpt.base.utils.Config; +import cn.promptness.rpt.base.utils.Constants; +import cn.promptness.rpt.client.cache.ProxyChannelCache; +import cn.promptness.rpt.client.handler.LocalHandler; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.util.internal.EmptyArrays; + +import java.util.Collections; +import java.util.Objects; + +public class ConnectedExecutor implements MessageExecutor { + + private final EventLoopGroup localGroup = new NioEventLoopGroup(); + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_CONNECTED; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) { + Meta meta = message.getMeta(); + RemoteConfig remoteConfig = meta.getRemoteConfig(); + if (Objects.isNull(remoteConfig)) { + return; + } + ProxyType proxyType = remoteConfig.getProxyType(); + if (Objects.equals(ProxyType.HTTP, proxyType)) { + String domain = remoteConfig.getDomain(); + // 补全配置信息 + RemoteConfig httpConfig = Config.getClientConfig().getHttpConfig(domain); + if (Objects.isNull(httpConfig)) { + return; + } + meta.setRemoteConfigList(Collections.singletonList(httpConfig)); + } + // 绑定代理连接 + ProxyChannelCache.get(context.channel().attr(Constants.Client.APPLICATION).get().bootstrap(), proxyChannel -> connectedTcp(context, proxyChannel, meta), () -> context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES))); + } + + private void connectedTcp(ChannelHandlerContext context, Channel proxyChannel, Meta meta) { + RemoteConfig remoteConfig = meta.getRemoteConfig(); + Bootstrap localBootstrap = new Bootstrap(); + localBootstrap.group(localGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new ByteArrayCodec()); + channel.pipeline().addLast(new ChunkedWriteHandler()); + channel.pipeline().addLast(new LocalHandler(context.channel(), meta)); + } + }); + localBootstrap.connect(remoteConfig.getLocalIp(), remoteConfig.getLocalPort()).addListener((ChannelFutureListener) future -> { + if (future.isSuccess()) { + future.channel().attr(Constants.PROXY).set(proxyChannel); + proxyChannel.attr(Constants.LOCAL).set(future.channel()); + } else { + ProxyChannelCache.put(proxyChannel); + context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES)); + } + }); + } +} diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DataExecutor.java b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DataExecutor.java new file mode 100644 index 0000000..352a968 --- /dev/null +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DataExecutor.java @@ -0,0 +1,28 @@ +package cn.promptness.rpt.client.executor; + +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.utils.Constants; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.EmptyArrays; + +import java.util.Objects; +import java.util.Optional; + +public class DataExecutor implements MessageExecutor { + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_DATA; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) { + Channel localChannel = context.channel().attr(Constants.LOCAL).get(); + if (Objects.nonNull(localChannel)) { + localChannel.writeAndFlush(Optional.ofNullable(message.getData()).orElse(EmptyArrays.EMPTY_BYTES)); + } + } +} diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DisconnectedExecutor.java b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DisconnectedExecutor.java new file mode 100644 index 0000000..136315d --- /dev/null +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/executor/DisconnectedExecutor.java @@ -0,0 +1,29 @@ +package cn.promptness.rpt.client.executor; + +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.utils.Constants; +import cn.promptness.rpt.client.cache.ProxyChannelCache; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.EmptyArrays; + +import java.util.Objects; + +public class DisconnectedExecutor implements MessageExecutor { + @Override + public MessageType getMessageType() { + return MessageType.TYPE_DISCONNECTED; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) { + Channel localChannel = context.channel().attr(Constants.LOCAL).getAndSet(null); + if (Objects.nonNull(localChannel)) { + ProxyChannelCache.put(context.channel()); + localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java index 6a836a7..88d4dad 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java @@ -1,30 +1,22 @@ package cn.promptness.rpt.client.handler; -import cn.promptness.rpt.base.coder.ByteArrayCodec; -import cn.promptness.rpt.base.config.ProxyType; -import cn.promptness.rpt.base.config.RemoteConfig; +import cn.promptness.rpt.base.executor.MessageDispatcher; import cn.promptness.rpt.base.protocol.Message; -import cn.promptness.rpt.base.protocol.MessageType; -import cn.promptness.rpt.base.protocol.Meta; import cn.promptness.rpt.base.utils.Application; import cn.promptness.rpt.base.utils.Config; import cn.promptness.rpt.base.utils.Constants; import cn.promptness.rpt.client.cache.ProxyChannelCache; -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.internal.EmptyArrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collections; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; /** * 服务器连接处理器 @@ -32,7 +24,8 @@ public class ClientHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(ClientHandler.class); - private static final EventLoopGroup LOCAL_GROUP = new NioEventLoopGroup(); + + private final MessageDispatcher messageDispatcher = new MessageDispatcher(); @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { @@ -45,101 +38,20 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio @Override protected void channelRead0(ChannelHandlerContext context, Message message) throws Exception { - switch (message.getType()) { - case TYPE_AUTH: - for (String remoteResult : Optional.ofNullable(message.getMeta().getRemoteResult()).orElse(Collections.emptyList())) { - logger.info(remoteResult); - } - boolean connection = message.getMeta().isConnection(); - if (connection) { - logger.info("连接成功,当前秘钥:{}", message.getMeta().getClientKey()); - context.channel().attr(Constants.CHANNELS).set(new ConcurrentHashMap<>(1024)); - } else { - logger.info("连接失败,当前秘钥:{}", message.getMeta().getClientKey()); - } - break; - case TYPE_CONNECTED: - // 外部请求进入,开始与内网建立连接 - connected(context, message); - break; - case TYPE_DISCONNECTED: - disconnected(context); - break; - case TYPE_DATA: - transfer(context, message); - break; - case TYPE_KEEPALIVE: - default: - } - } - - private void transfer(ChannelHandlerContext context, Message message) { - Channel localChannel = context.channel().attr(Constants.LOCAL).get(); - if (Objects.nonNull(localChannel)) { - localChannel.writeAndFlush(Optional.ofNullable(message.getData()).orElse(EmptyArrays.EMPTY_BYTES)); - } - } - - private void disconnected(ChannelHandlerContext context) { - Channel localChannel = context.channel().attr(Constants.LOCAL).getAndSet(null); - if (Objects.nonNull(localChannel)) { - ProxyChannelCache.release(context.channel()); - localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); - } - } - - private void connected(ChannelHandlerContext context, Message message) { - Meta meta = message.getMeta(); - RemoteConfig remoteConfig = meta.getRemoteConfig(); - if (Objects.isNull(remoteConfig)) { - return; - } - ProxyType proxyType = remoteConfig.getProxyType(); - if (Objects.equals(ProxyType.HTTP, proxyType)) { - String domain = remoteConfig.getDomain(); - // 补全配置信息 - RemoteConfig httpConfig = Config.getClientConfig().getHttpConfig(domain); - if (Objects.isNull(httpConfig)) { - return; - } - meta.setRemoteConfigList(Collections.singletonList(httpConfig)); - } - // 绑定代理连接 - ProxyChannelCache.get(context.channel().attr(Constants.APPLICATION).get().bootstrap(), proxyChannel -> connectedTcp(context, proxyChannel, meta), () -> context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES))); - } - - private void connectedTcp(ChannelHandlerContext context, Channel proxyChannel, Meta meta) { - RemoteConfig remoteConfig = meta.getRemoteConfig(); - Bootstrap localBootstrap = new Bootstrap(); - localBootstrap.group(LOCAL_GROUP).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - channel.pipeline().addLast(new ByteArrayCodec()); - channel.pipeline().addLast(new ChunkedWriteHandler()); - channel.pipeline().addLast(new LocalHandler(context.channel(), meta)); - } - }); - localBootstrap.connect(remoteConfig.getLocalIp(), remoteConfig.getLocalPort()).addListener((ChannelFutureListener) future -> { - if (future.isSuccess()) { - future.channel().attr(Constants.PROXY).set(proxyChannel); - proxyChannel.attr(Constants.LOCAL).set(future.channel()); - } else { - ProxyChannelCache.release(proxyChannel); - context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES)); - } - }); + messageDispatcher.handle(context, message); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - Application application = ctx.channel().attr(Constants.APPLICATION).getAndSet(null); + Application application = ctx.channel().attr(Constants.Client.APPLICATION).getAndSet(null); logger.info("客户端-服务端连接中断,{}:{}", Config.getClientConfig().getServerIp(), Config.getClientConfig().getServerPort()); if (Objects.nonNull(application)) { - Optional.ofNullable(ctx.channel().attr(Constants.CHANNELS).getAndSet(null)).ifPresent(this::clear); + Optional.ofNullable(ctx.channel().attr(Constants.CHANNELS).get()).ifPresent(this::clear); + application.start(1); return; } Channel localChannel = ctx.channel().attr(Constants.LOCAL).getAndSet(null); - if (Objects.nonNull(localChannel)) { + if (Objects.nonNull(localChannel) && localChannel.isActive()) { localChannel.attr(Constants.PROXY).set(null); localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); } diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java index dbd457b..069cd53 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java @@ -36,10 +36,10 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { + ctx.channel().config().setAutoRead(false); Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get(); channel.attr(Constants.CHANNELS).get().put(meta.getChannelId(), ctx.channel()); - ctx.channel().config().setAutoRead(false); - send(proxyChannel, MessageType.TYPE_CONNECTED, EmptyArrays.EMPTY_BYTES); + send(proxyChannel, MessageType.TYPE_CONNECTED); ctx.channel().config().setAutoRead(true); } @@ -61,19 +61,10 @@ protected void channelRead0(ChannelHandlerContext ctx, byte[] bytes) throws Exce @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Optional.ofNullable(channel.attr(Constants.CHANNELS).get()).ifPresent(channelMap -> channelMap.remove(meta.getChannelId())); - // 绑定代理连接断线 - Channel proxyChannel = ctx.channel().attr(Constants.PROXY).getAndSet(null); - if (Objects.isNull(proxyChannel)) { - return; - } - // 服务端通知断线 - Channel localChannel = proxyChannel.attr(Constants.LOCAL).getAndSet(null); - if (Objects.isNull(localChannel)) { - return; + Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get(); + if (Objects.nonNull(proxyChannel) && proxyChannel.isActive()) { + send(proxyChannel, MessageType.TYPE_DISCONNECTED); } - // 主动断线 - proxyChannel.config().setAutoRead(true); - send(proxyChannel, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES); } /** @@ -84,13 +75,11 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } - private void send(Channel proxyChannel, MessageType type, byte[] data) { - Message message = new Message(); - message.setType(type); - message.setMeta(meta); - message.setData(data); - // 收到内网服务器响应后返回给服务器端 - proxyChannel.writeAndFlush(message); + private void send(Channel proxyChannel, MessageType type) { + send(proxyChannel, type, EmptyArrays.EMPTY_BYTES); } + private void send(Channel proxyChannel, MessageType type, byte[] data) { + proxyChannel.writeAndFlush(new Message(type, meta, data)); + } } diff --git a/rpt-client/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor b/rpt-client/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor new file mode 100644 index 0000000..2866db8 --- /dev/null +++ b/rpt-client/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor @@ -0,0 +1,4 @@ +cn.promptness.rpt.client.executor.AuthExecutor +cn.promptness.rpt.client.executor.ConnectedExecutor +cn.promptness.rpt.client.executor.DataExecutor +cn.promptness.rpt.client.executor.DisconnectedExecutor \ No newline at end of file diff --git a/rpt-desktop/pom.xml b/rpt-desktop/pom.xml index 8c8450f..7732ef8 100644 --- a/rpt-desktop/pom.xml +++ b/rpt-desktop/pom.xml @@ -5,7 +5,7 @@ rpt cn.promptness - 2.3.5 + 2.4.0 4.0.0 diff --git a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/DesktopApplication.java b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/DesktopApplication.java index 113cb7b..7bda6b2 100644 --- a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/DesktopApplication.java +++ b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/DesktopApplication.java @@ -32,12 +32,12 @@ public void init() throws Exception { @Override public void start(Stage primaryStage) throws Exception { - SystemTrayUtil.systemTray(primaryStage, Constants.TITLE); + SystemTrayUtil.systemTray(primaryStage, Constants.Desktop.TITLE); Parent root = new FXMLLoader(this.getClass().getResource("/fxml/main.fxml")).load(); Scene scene = new Scene(root); scene.setFill(Color.TRANSPARENT); scene.getStylesheets().add(Style.LIGHT.getStyleStylesheetURL()); - primaryStage.setTitle(Constants.TITLE); + primaryStage.setTitle(Constants.Desktop.TITLE); primaryStage.getIcons().add(new Image("/icon.png")); primaryStage.setScene(scene); primaryStage.setResizable(true); diff --git a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/ConfigController.java b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/ConfigController.java index 993937d..490b483 100644 --- a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/ConfigController.java +++ b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/ConfigController.java @@ -27,7 +27,7 @@ public class ConfigController { public static Pair> buildDialog(String confirm, String headerTex) { ButtonType buttonType = new ButtonType(confirm); Dialog dialog = new Dialog<>(); - dialog.setTitle(Constants.TITLE); + dialog.setTitle(Constants.Desktop.TITLE); dialog.setHeaderText(headerTex); dialog.initOwner(SystemTrayUtil.getPrimaryStage()); dialog.getDialogPane().getButtonTypes().add(buttonType); diff --git a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/MenuController.java b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/MenuController.java index 6a81a0a..eb1a7c9 100644 --- a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/MenuController.java +++ b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/controller/MenuController.java @@ -37,9 +37,9 @@ public void initialize() { @FXML public void about() { Alert alert = new Alert(Alert.AlertType.NONE); - alert.setTitle(Constants.TITLE); + alert.setTitle(Constants.Desktop.TITLE); alert.setHeaderText("关于"); - alert.setContentText(String.format(" Version %s %n Powered By Lynn %n https://github.com/iamlinhui/rpt", Constants.VERSION)); + alert.setContentText(String.format(" Version %s %n Powered By Lynn %n https://github.com/iamlinhui/rpt", Constants.Desktop.VERSION)); alert.initOwner(SystemTrayUtil.getPrimaryStage()); alert.getButtonTypes().add(ButtonType.CLOSE); alert.showAndWait(); @@ -48,7 +48,7 @@ public void about() { @FXML public void instruction() { Alert alert = new Alert(Alert.AlertType.NONE); - alert.setTitle(Constants.TITLE); + alert.setTitle(Constants.Desktop.TITLE); alert.setHeaderText("使用说明"); alert.setContentText("1.先填写服务器配置信息\n2.再填写映射配置信息\n3.最后点击开启服务"); alert.initOwner(SystemTrayUtil.getPrimaryStage()); diff --git a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/utils/SystemTrayUtil.java b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/utils/SystemTrayUtil.java index bfb7e6c..164254b 100644 --- a/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/utils/SystemTrayUtil.java +++ b/rpt-desktop/src/main/java/cn/promptness/rpt/desktop/utils/SystemTrayUtil.java @@ -60,7 +60,7 @@ private static TrayIcon getTrayIcon(String toolTip) { * @param text 预定会议室成功 */ public static void displayMessage(String text) { - trayIcon.displayMessage(Constants.TITLE, text, TrayIcon.MessageType.INFO); + trayIcon.displayMessage(Constants.Desktop.TITLE, text, TrayIcon.MessageType.INFO); } private static MenuItem getExitMenuItem() { diff --git a/rpt-server/pom.xml b/rpt-server/pom.xml index cc25ef5..98abb74 100644 --- a/rpt-server/pom.xml +++ b/rpt-server/pom.xml @@ -5,7 +5,7 @@ rpt cn.promptness - 2.3.5 + 2.4.0 4.0.0 diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/cache/ServerChannelCache.java b/rpt-server/src/main/java/cn/promptness/rpt/server/cache/ServerChannelCache.java index a83c24a..02113e8 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/cache/ServerChannelCache.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/cache/ServerChannelCache.java @@ -2,6 +2,7 @@ import io.netty.channel.Channel; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -21,4 +22,17 @@ public static Map getServerDomainChannelMap() { public static Map getServerDomainToken() { return SERVER_DOMAIN_TOKEN; } + + private static final Map SERVER_CHANNEL_MAP = new ConcurrentHashMap<>(); + + public static Map getServerChannelMap() { + return SERVER_CHANNEL_MAP; + } + + public static void remove(List domainList) { + for (String domain : domainList) { + SERVER_DOMAIN_CHANNEL_MAP.remove(domain); + SERVER_DOMAIN_TOKEN.remove(domain); + } + } } diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/ConnectedExecutor.java b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/ConnectedExecutor.java new file mode 100644 index 0000000..9412f9c --- /dev/null +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/ConnectedExecutor.java @@ -0,0 +1,46 @@ +package cn.promptness.rpt.server.executor; + +import cn.promptness.rpt.base.config.ProxyType; +import cn.promptness.rpt.base.config.RemoteConfig; +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.protocol.Meta; +import cn.promptness.rpt.base.utils.Constants; +import cn.promptness.rpt.server.cache.ServerChannelCache; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; + +import java.util.Map; +import java.util.Optional; + +public class ConnectedExecutor implements MessageExecutor { + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_CONNECTED; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) throws Exception { + Meta meta = message.getMeta(); + RemoteConfig remoteConfig = meta.getRemoteConfig(); + if (remoteConfig == null) { + return; + } + ProxyType proxyType = Optional.ofNullable(remoteConfig.getProxyType()).orElse(ProxyType.TCP); + + String serverId = meta.getServerId(); + Map channelMap = ServerChannelCache.getServerChannelMap().get(serverId).attr(Constants.CHANNELS).get(); + + String channelId = meta.getChannelId(); + Channel localChannel = channelMap.get(channelId); + if (localChannel == null) { + return; + } + // binding each other + localChannel.attr(Constants.PROXY).set(context.channel()); + context.channel().attr(Constants.LOCAL).set(localChannel); + localChannel.pipeline().fireUserEventTriggered(proxyType); + } +} diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DataExecutor.java b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DataExecutor.java new file mode 100644 index 0000000..c685fa1 --- /dev/null +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DataExecutor.java @@ -0,0 +1,27 @@ +package cn.promptness.rpt.server.executor; + +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.utils.Constants; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.EmptyArrays; + +import java.util.Objects; +import java.util.Optional; + +public class DataExecutor implements MessageExecutor { + @Override + public MessageType getMessageType() { + return MessageType.TYPE_DATA; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) throws Exception { + Channel localChannel = context.channel().attr(Constants.LOCAL).get(); + if (Objects.nonNull(localChannel)) { + localChannel.writeAndFlush(Optional.ofNullable(message.getData()).orElse(EmptyArrays.EMPTY_BYTES)); + } + } +} diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DisconnectedExecutor.java b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DisconnectedExecutor.java new file mode 100644 index 0000000..ddd08b2 --- /dev/null +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/DisconnectedExecutor.java @@ -0,0 +1,34 @@ +package cn.promptness.rpt.server.executor; + +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.utils.Constants; +import cn.promptness.rpt.server.cache.ServerChannelCache; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.internal.EmptyArrays; + +import java.util.Map; + +public class DisconnectedExecutor implements MessageExecutor { + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_DISCONNECTED; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) throws Exception { + String serverId = message.getMeta().getServerId(); + Map channelMap = ServerChannelCache.getServerChannelMap().get(serverId).attr(Constants.CHANNELS).get(); + + String channelId = message.getMeta().getChannelId(); + Channel localChannel = channelMap.get(channelId); + if (localChannel == null) { + return; + } + localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); + } +} diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java new file mode 100644 index 0000000..3757721 --- /dev/null +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java @@ -0,0 +1,148 @@ +package cn.promptness.rpt.server.executor; + +import cn.promptness.rpt.base.coder.ByteArrayCodec; +import cn.promptness.rpt.base.config.ProxyType; +import cn.promptness.rpt.base.config.RemoteConfig; +import cn.promptness.rpt.base.config.ServerToken; +import cn.promptness.rpt.base.executor.MessageExecutor; +import cn.promptness.rpt.base.protocol.Message; +import cn.promptness.rpt.base.protocol.MessageType; +import cn.promptness.rpt.base.protocol.Meta; +import cn.promptness.rpt.base.utils.Config; +import cn.promptness.rpt.base.utils.Constants; +import cn.promptness.rpt.base.utils.StringUtils; +import cn.promptness.rpt.server.cache.ServerChannelCache; +import cn.promptness.rpt.server.handler.RemoteHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; + +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; + +public class RegisterExecutor implements MessageExecutor { + + @Override + public MessageType getMessageType() { + return MessageType.TYPE_REGISTER; + } + + @Override + public void execute(ChannelHandlerContext context, Message message) throws Exception { + + context.channel().attr(Constants.Server.REMOTE_BOSS_GROUP).set(new NioEventLoopGroup()); + context.channel().attr(Constants.Server.REMOTE_WORKER_GROUP).set(new NioEventLoopGroup()); + + Meta meta = message.getMeta(); + context.channel().attr(Constants.Server.CLIENT_KEY).set(meta.getClientKey()); + + if (!Config.getServerConfig().authorize(meta.getClientKey())) { + logger.info("授权失败,客户端使用的秘钥:{}", meta.getClientKey()); + Message res = new Message(); + res.setType(MessageType.TYPE_AUTH); + res.setMeta(meta.setConnection(false).setRemoteResult(Collections.singletonList("秘钥授权失败"))); + context.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); + return; + } + List domainList = fillRemoteResult(context, meta); + Message res = new Message(); + res.setType(MessageType.TYPE_AUTH); + res.setMeta(meta); + ChannelFuture channelFuture = context.writeAndFlush(res); + if (!meta.isConnection()) { + channelFuture.addListener(ChannelFutureListener.CLOSE); + return; + } + ServerChannelCache.getServerChannelMap().put(context.channel().id().asLongText(), context.channel()); + context.channel().attr(Constants.CHANNELS).set(new ConcurrentHashMap<>(1024)); + context.channel().attr(Constants.Server.DOMAIN).set(domainList); + } + + private List fillRemoteResult(ChannelHandlerContext context, Meta meta) throws Exception { + List domainList = new CopyOnWriteArrayList<>(); + List remoteResult = new CopyOnWriteArrayList<>(); + meta.setConnection(true).setRemoteResult(remoteResult); + List remoteConfigList = Optional.ofNullable(meta.getRemoteConfigList()).orElse(Collections.emptyList()); + if (remoteConfigList.isEmpty()) { + return domainList; + } + CountDownLatch countDownLatch = new CountDownLatch(remoteConfigList.size()); + for (RemoteConfig remoteConfig : remoteConfigList) { + ProxyType proxyType = Optional.ofNullable(remoteConfig.getProxyType()).orElse(ProxyType.TCP); + switch (proxyType) { + case TCP: + registerTcp(context, meta, remoteConfig, countDownLatch); + break; + case HTTP: + registerHttp(context, meta, remoteConfig, domainList, countDownLatch); + break; + default: + } + } + countDownLatch.await(); + return domainList; + } + + private void registerHttp(ChannelHandlerContext context, Meta meta, RemoteConfig remoteConfig, List domainList, CountDownLatch countDownLatch) { + if (!StringUtils.hasText(remoteConfig.getDomain())) { + meta.setConnection(false).addRemoteResult(String.format("服务端绑定域名[%s]不合法", remoteConfig.getDomain())); + countDownLatch.countDown(); + return; + } + logger.info("服务端开始绑定域名[{}]", remoteConfig.getDomain()); + ServerChannelCache.getServerDomainChannelMap().compute(remoteConfig.getDomain(), (domain, channel) -> { + if (channel != null) { + meta.setConnection(false).addRemoteResult(String.format("服务端绑定域名[%s]重复", domain)); + return channel; + } + domainList.add(domain); + if (StringUtils.hasText(remoteConfig.getToken())) { + ServerChannelCache.getServerDomainToken().put(domain, remoteConfig.getToken()); + } + meta.addRemoteResult(String.format("服务端绑定域名[%s]成功", domain)); + return context.channel(); + }); + countDownLatch.countDown(); + } + + private void registerTcp(ChannelHandlerContext context, Meta meta, RemoteConfig remoteConfig, CountDownLatch countDownLatch) { + if (remoteConfig.getRemotePort() == 0 || remoteConfig.getRemotePort() == Config.getServerConfig().getServerPort() || remoteConfig.getRemotePort() == Config.getServerConfig().getHttpPort() || remoteConfig.getRemotePort() == Config.getServerConfig().getHttpsPort()) { + meta.setConnection(false).addRemoteResult(String.format("需要绑定的端口[%s]不合法", remoteConfig.getRemotePort())); + countDownLatch.countDown(); + return; + } + ServerToken serverToken = Config.getServerConfig().getServerToken(meta.getClientKey()); + if (!serverToken.authorize(remoteConfig.getRemotePort())) { + meta.setConnection(false).addRemoteResult(String.format("需要绑定的端口[%s]范围不合法", remoteConfig.getRemotePort())); + countDownLatch.countDown(); + return; + } + ServerBootstrap remoteBootstrap = new ServerBootstrap(); + remoteBootstrap.group(context.channel().attr(Constants.Server.REMOTE_BOSS_GROUP).get(), context.channel().attr(Constants.Server.REMOTE_WORKER_GROUP).get()).channel(NioServerSocketChannel.class).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel channel) throws Exception { + channel.pipeline().addLast(new ByteArrayCodec()); + channel.pipeline().addLast(new ChunkedWriteHandler()); + channel.pipeline().addLast(new RemoteHandler(context.channel(), remoteConfig)); + } + }); + + logger.info("服务端开始建立本地端口绑定[{}]", remoteConfig.getRemotePort()); + remoteBootstrap.bind(Config.getServerConfig().getServerIp(), remoteConfig.getRemotePort()).addListener((ChannelFutureListener) channelFuture -> { + if (channelFuture.isSuccess()) { + meta.addRemoteResult(String.format("服务端绑定端口[%s]成功", remoteConfig.getRemotePort())); + } else { + logger.info("服务端失败建立本地端口绑定[{}], {}", remoteConfig.getRemotePort(), channelFuture.cause().getMessage()); + meta.setConnection(false).addRemoteResult(String.format("服务端绑定端口[%s]失败,原因:%s", remoteConfig.getRemotePort(), channelFuture.cause().getMessage())); + } + countDownLatch.countDown(); + }); + } +} diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RemoteHandler.java b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RemoteHandler.java index 00aee2a..ce512ab 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RemoteHandler.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RemoteHandler.java @@ -72,19 +72,12 @@ protected void channelRead0(ChannelHandlerContext ctx, byte[] bytes) throws Exce @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Optional.ofNullable(channel.attr(Constants.CHANNELS).get()).ifPresent(channelMap -> channelMap.remove(ctx.channel().id().asLongText())); - // 绑定代理连接断线 Channel proxyChannel = ctx.channel().attr(Constants.PROXY).getAndSet(null); - if (Objects.isNull(proxyChannel)) { - return; - } - // 服务端通知断线 - Channel localChannel = proxyChannel.attr(Constants.LOCAL).getAndSet(null); - if (Objects.isNull(localChannel)) { - return; + if (Objects.nonNull(proxyChannel) && proxyChannel.isActive()) { + proxyChannel.attr(Constants.LOCAL).set(null); + proxyChannel.config().setAutoRead(true); + send(proxyChannel, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES, ctx); } - // 主动断线 - proxyChannel.config().setAutoRead(true); - send(proxyChannel, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES, ctx); } /** diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RequestHandler.java b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RequestHandler.java index b0da052..9f1917e 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RequestHandler.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/RequestHandler.java @@ -8,7 +8,7 @@ import cn.promptness.rpt.base.protocol.Meta; import cn.promptness.rpt.base.utils.Constants; import cn.promptness.rpt.base.utils.StringUtils; -import cn.promptness.rpt.server.cache.DispatcherCache; +import cn.promptness.rpt.server.page.StaticDispatcher; import cn.promptness.rpt.server.cache.ServerChannelCache; import cn.promptness.rpt.server.coder.HttpEncoder; import io.netty.buffer.ByteBuf; @@ -67,19 +67,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { return; } Optional.ofNullable(serverChannel.attr(Constants.CHANNELS).get()).ifPresent(channelMap -> channelMap.remove(ctx.channel().id().asLongText())); - // 绑定代理连接断线 Channel proxyChannel = ctx.channel().attr(Constants.PROXY).getAndSet(null); - if (Objects.isNull(proxyChannel)) { - return; - } - // 服务端通知断线 - Channel localChannel = proxyChannel.attr(Constants.LOCAL).getAndSet(null); - if (Objects.isNull(localChannel)) { - return; + if (Objects.nonNull(proxyChannel) && proxyChannel.isActive()) { + proxyChannel.attr(Constants.LOCAL).set(null); + proxyChannel.config().setAutoRead(true); + send(proxyChannel, ctx, domain, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES); } - // 主动断线 - proxyChannel.config().setAutoRead(true); - send(proxyChannel, ctx, domain, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES); } @Override @@ -126,17 +119,17 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpR domain = Optional.ofNullable(domain).orElse(Constants.COLON.split(fullHttpRequest.headers().get(HttpHeaderNames.HOST))[0]); if (!StringUtils.hasText(domain)) { - DispatcherCache.dispatch(fullHttpRequest, ctx); + StaticDispatcher.dispatch(fullHttpRequest, ctx); return; } Channel serverChannel = ServerChannelCache.getServerDomainChannelMap().get(domain); if (serverChannel == null || !serverChannel.isOpen()) { - DispatcherCache.dispatch(fullHttpRequest, ctx); + StaticDispatcher.dispatch(fullHttpRequest, ctx); return; } String token = ServerChannelCache.getServerDomainToken().get(domain); - if (token != null && !DispatcherCache.authorize(ctx, fullHttpRequest, token)) { + if (token != null && !StaticDispatcher.authorize(ctx, fullHttpRequest, token)) { return; } diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/ServerHandler.java b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/ServerHandler.java index 3e7e884..245d9f9 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/handler/ServerHandler.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/handler/ServerHandler.java @@ -1,32 +1,21 @@ package cn.promptness.rpt.server.handler; -import cn.promptness.rpt.base.coder.ByteArrayCodec; -import cn.promptness.rpt.base.config.ProxyType; -import cn.promptness.rpt.base.config.RemoteConfig; -import cn.promptness.rpt.base.config.ServerToken; +import cn.promptness.rpt.base.executor.MessageDispatcher; import cn.promptness.rpt.base.protocol.Message; -import cn.promptness.rpt.base.protocol.MessageType; -import cn.promptness.rpt.base.protocol.Meta; -import cn.promptness.rpt.base.utils.Config; import cn.promptness.rpt.base.utils.Constants; -import cn.promptness.rpt.base.utils.StringUtils; import cn.promptness.rpt.server.cache.ServerChannelCache; -import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.*; -import io.netty.channel.nio.NioEventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.channel.socket.nio.NioServerSocketChannel; -import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.concurrent.AbstractEventExecutorGroup; import io.netty.util.internal.EmptyArrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.function.BiConsumer; -import java.util.function.Function; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; /** * 处理服务器接收到的客户端连接 @@ -34,15 +23,8 @@ public class ServerHandler extends SimpleChannelInboundHandler { private static final Logger logger = LoggerFactory.getLogger(ServerHandler.class); - private static final Map SERVER_CHANNEL_MAP = new ConcurrentHashMap<>(); - private static final EventLoopGroup REMOTE_BOSS_GROUP = new NioEventLoopGroup(); - private static final EventLoopGroup REMOTE_WORKER_GROUP = new NioEventLoopGroup(); - private final List domainList = new CopyOnWriteArrayList<>(); - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - } + private final MessageDispatcher messageDispatcher = new MessageDispatcher(); @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { @@ -53,180 +35,41 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exceptio super.channelWritabilityChanged(ctx); } + @Override + protected void channelRead0(ChannelHandlerContext context, Message message) throws Exception { + messageDispatcher.handle(context, message); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + /** * 连接中断 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - String clientKey = ctx.channel().attr(Constants.CLIENT_KEY).getAndSet(null); + String clientKey = ctx.channel().attr(Constants.Server.CLIENT_KEY).getAndSet(null); logger.info("服务端-客户端连接中断,{}", clientKey == null ? "未知连接/代理连接" : clientKey); // 代理连接/未知连接 if (Objects.isNull(clientKey)) { - Channel localChannel = ctx.channel().attr(Constants.LOCAL).getAndSet(null); - if (Objects.nonNull(localChannel)) { - localChannel.attr(Constants.PROXY).set(null); + Channel localChannel = ctx.channel().attr(Constants.LOCAL).get(); + if (Objects.nonNull(localChannel) && localChannel.isActive()) { localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); } return; } Optional.ofNullable(ctx.channel().attr(Constants.CHANNELS).getAndSet(null)).ifPresent(this::clear); + Optional.ofNullable(ctx.channel().attr(Constants.Server.DOMAIN).getAndSet(null)).ifPresent(ServerChannelCache::remove); + Optional.ofNullable(ctx.channel().attr(Constants.Server.REMOTE_BOSS_GROUP).getAndSet(null)).ifPresent(AbstractEventExecutorGroup::shutdownGracefully); + Optional.ofNullable(ctx.channel().attr(Constants.Server.REMOTE_WORKER_GROUP).getAndSet(null)).ifPresent(AbstractEventExecutorGroup::shutdownGracefully); } private void clear(Map channelMap) { for (Channel localChannel : channelMap.values()) { - localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); - } - for (String domain : domainList) { - ServerChannelCache.getServerDomainChannelMap().remove(domain); - ServerChannelCache.getServerDomainToken().remove(domain); - } - } - - - @Override - protected void channelRead0(ChannelHandlerContext context, Message message) throws Exception { - MessageType type = message.getType(); - switch (type) { - case TYPE_REGISTER: - register(context, message); - break; - case TYPE_DATA: - dispatch(message, channelId -> context.channel().attr(Constants.LOCAL).get(), (proxyType, channel) -> channel.writeAndFlush(Optional.ofNullable(message.getData()).orElse(EmptyArrays.EMPTY_BYTES))); - break; - case TYPE_CONNECTED: - dispatch(message, SERVER_CHANNEL_MAP.get(message.getMeta().getServerId()).attr(Constants.CHANNELS).get()::get, (proxyType, channel) -> { - channel.attr(Constants.PROXY).set(context.channel()); - context.channel().attr(Constants.LOCAL).set(channel); - channel.pipeline().fireUserEventTriggered(proxyType); - }); - break; - case TYPE_DISCONNECTED: - dispatch(message, SERVER_CHANNEL_MAP.get(message.getMeta().getServerId()).attr(Constants.CHANNELS).get()::remove, (proxyType, channel) -> { - // maybe proxy channel - context.channel().attr(Constants.LOCAL).set(null); - channel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE); - }); - break; - case TYPE_KEEPALIVE: - default: - } - } - - private void register(ChannelHandlerContext context, Message message) throws InterruptedException { - Meta meta = message.getMeta(); - context.channel().attr(Constants.CLIENT_KEY).set(meta.getClientKey()); - if (!Config.getServerConfig().authorize(meta.getClientKey())) { - logger.info("授权失败,客户端使用的秘钥:{}", meta.getClientKey()); - Message res = new Message(); - res.setType(MessageType.TYPE_AUTH); - res.setMeta(meta.setConnection(false).setRemoteResult(Collections.singletonList("秘钥授权失败"))); - context.writeAndFlush(res).addListener(ChannelFutureListener.CLOSE); - return; - } - fillRemoteResult(context, meta); - Message res = new Message(); - res.setType(MessageType.TYPE_AUTH); - res.setMeta(meta); - ChannelFuture channelFuture = context.writeAndFlush(res); - if (!meta.isConnection()) { - channelFuture.addListener(ChannelFutureListener.CLOSE); - return; + localChannel.close(); } - context.channel().attr(Constants.CHANNELS).set(new ConcurrentHashMap<>(1024)); - SERVER_CHANNEL_MAP.put(context.channel().id().asLongText(), context.channel()); } - private void fillRemoteResult(ChannelHandlerContext context, Meta meta) throws InterruptedException { - List remoteResult = new ArrayList<>(); - meta.setConnection(true).setRemoteResult(remoteResult); - List remoteConfigList = Optional.ofNullable(meta.getRemoteConfigList()).orElse(Collections.emptyList()); - if (remoteConfigList.isEmpty()) { - return; - } - CountDownLatch countDownLatch = new CountDownLatch(remoteConfigList.size()); - for (RemoteConfig remoteConfig : remoteConfigList) { - ProxyType proxyType = Optional.ofNullable(remoteConfig.getProxyType()).orElse(ProxyType.TCP); - switch (proxyType) { - case TCP: - registerTcp(context, meta, remoteConfig, countDownLatch); - break; - case HTTP: - registerHttp(context, meta, remoteConfig, countDownLatch); - break; - default: - } - } - countDownLatch.await(); - } - - private void registerHttp(ChannelHandlerContext context, Meta meta, RemoteConfig remoteConfig, CountDownLatch countDownLatch) { - if (!StringUtils.hasText(remoteConfig.getDomain())) { - meta.setConnection(false).addRemoteResult(String.format("服务端绑定域名[%s]不合法", remoteConfig.getDomain())); - countDownLatch.countDown(); - return; - } - logger.info("服务端开始绑定域名[{}]", remoteConfig.getDomain()); - ServerChannelCache.getServerDomainChannelMap().compute(remoteConfig.getDomain(), (domain, channel) -> { - if (channel != null) { - meta.setConnection(false).addRemoteResult(String.format("服务端绑定域名[%s]重复", domain)); - return channel; - } - domainList.add(domain); - if (StringUtils.hasText(remoteConfig.getToken())) { - ServerChannelCache.getServerDomainToken().put(domain, remoteConfig.getToken()); - } - meta.addRemoteResult(String.format("服务端绑定域名[%s]成功", domain)); - return context.channel(); - }); - countDownLatch.countDown(); - } - - private void registerTcp(ChannelHandlerContext context, Meta meta, RemoteConfig remoteConfig, CountDownLatch countDownLatch) { - if (remoteConfig.getRemotePort() == 0 || remoteConfig.getRemotePort() == Config.getServerConfig().getServerPort() || remoteConfig.getRemotePort() == Config.getServerConfig().getHttpPort() || remoteConfig.getRemotePort() == Config.getServerConfig().getHttpsPort()) { - meta.setConnection(false).addRemoteResult(String.format("需要绑定的端口[%s]不合法", remoteConfig.getRemotePort())); - countDownLatch.countDown(); - return; - } - ServerToken serverToken = Config.getServerConfig().getServerToken(meta.getClientKey()); - if (!serverToken.authorize(remoteConfig.getRemotePort())) { - meta.setConnection(false).addRemoteResult(String.format("需要绑定的端口[%s]范围不合法", remoteConfig.getRemotePort())); - countDownLatch.countDown(); - return; - } - ServerBootstrap remoteBootstrap = new ServerBootstrap(); - remoteBootstrap.group(REMOTE_BOSS_GROUP, REMOTE_WORKER_GROUP).channel(NioServerSocketChannel.class).childOption(ChannelOption.SO_KEEPALIVE, true).childHandler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel channel) throws Exception { - channel.pipeline().addLast(new ByteArrayCodec()); - channel.pipeline().addLast(new ChunkedWriteHandler()); - channel.pipeline().addLast(new RemoteHandler(context.channel(), remoteConfig)); - } - }); - - logger.info("服务端开始建立本地端口绑定[{}]", remoteConfig.getRemotePort()); - remoteBootstrap.bind(Config.getServerConfig().getServerIp(), remoteConfig.getRemotePort()).addListener((ChannelFutureListener) channelFuture -> { - if (channelFuture.isSuccess()) { - meta.addRemoteResult(String.format("服务端绑定端口[%s]成功", remoteConfig.getRemotePort())); - } else { - logger.info("服务端失败建立本地端口绑定[{}], {}", remoteConfig.getRemotePort(), channelFuture.cause().getMessage()); - meta.setConnection(false).addRemoteResult(String.format("服务端绑定端口[%s]失败,原因:%s", remoteConfig.getRemotePort(), channelFuture.cause().getMessage())); - } - countDownLatch.countDown(); - }); - } - - private void dispatch(Message message, Function function, BiConsumer consumer) { - Meta meta = message.getMeta(); - RemoteConfig remoteConfig = meta.getRemoteConfig(); - if (remoteConfig == null) { - return; - } - ProxyType proxyType = Optional.ofNullable(remoteConfig.getProxyType()).orElse(ProxyType.TCP); - String channelId = meta.getChannelId(); - Channel channel = function.apply(channelId); - if (channel == null) { - return; - } - consumer.accept(proxyType, channel); - } } diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/cache/DispatcherCache.java b/rpt-server/src/main/java/cn/promptness/rpt/server/page/StaticDispatcher.java similarity index 92% rename from rpt-server/src/main/java/cn/promptness/rpt/server/cache/DispatcherCache.java rename to rpt-server/src/main/java/cn/promptness/rpt/server/page/StaticDispatcher.java index 9c33cb9..5604794 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/cache/DispatcherCache.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/page/StaticDispatcher.java @@ -1,4 +1,4 @@ -package cn.promptness.rpt.server.cache; +package cn.promptness.rpt.server.page; import cn.promptness.rpt.base.utils.Constants; import cn.promptness.rpt.base.utils.StringUtils; @@ -17,24 +17,24 @@ import java.util.*; import java.util.function.BiConsumer; -public class DispatcherCache { +public class StaticDispatcher { - private static final Logger logger = LoggerFactory.getLogger(DispatcherCache.class); + private static final Logger logger = LoggerFactory.getLogger(StaticDispatcher.class); private static final Map> HANDLE_MAP = new HashMap<>(); private static final List WHITE_URI = Arrays.asList("/favicon.ico", "/static/base.css"); static { - HANDLE_MAP.put("/favicon.ico", DispatcherCache::favicon); - HANDLE_MAP.put("/", DispatcherCache::index); - HANDLE_MAP.put("/index.html", DispatcherCache::index); - HANDLE_MAP.put("/static/base.css", DispatcherCache::css); + HANDLE_MAP.put("/favicon.ico", StaticDispatcher::favicon); + HANDLE_MAP.put("/", StaticDispatcher::index); + HANDLE_MAP.put("/index.html", StaticDispatcher::index); + HANDLE_MAP.put("/static/base.css", StaticDispatcher::css); } public static void dispatch(FullHttpRequest fullHttpRequest, ChannelHandlerContext ctx) { String uri = fullHttpRequest.uri(); - HANDLE_MAP.getOrDefault(uri, DispatcherCache::notFound).accept(ctx, fullHttpRequest); + HANDLE_MAP.getOrDefault(uri, StaticDispatcher::notFound).accept(ctx, fullHttpRequest); } public static boolean authorize(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, String token) { diff --git a/rpt-server/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor b/rpt-server/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor new file mode 100644 index 0000000..24643d9 --- /dev/null +++ b/rpt-server/src/main/resources/META-INF/services/cn.promptness.rpt.base.executor.MessageExecutor @@ -0,0 +1,4 @@ +cn.promptness.rpt.server.executor.RegisterExecutor +cn.promptness.rpt.server.executor.DataExecutor +cn.promptness.rpt.server.executor.DisconnectedExecutor +cn.promptness.rpt.server.executor.ConnectedExecutor \ No newline at end of file