Skip to content

Commit

Permalink
2.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
iamlinhui committed May 27, 2022
1 parent 41a6391 commit 35b8d63
Show file tree
Hide file tree
Showing 32 changed files with 601 additions and 367 deletions.
2 changes: 1 addition & 1 deletion Reverse Proxy Tool.iss
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
; SEE THE DOCUMENTATION FOR DETAILS ON CREATING INNO SETUP SCRIPT FILES!

#define MyAppName "Reverse Proxy Tool"
#define MyAppVersion "2.3.5"
#define MyAppVersion "2.4.0"
#define MyAppPublisher "Lynn"
#define MyAppURL "https://www.holme.cn/"
#define MyAppExeName "rpt-desktop.exe"
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<groupId>cn.promptness</groupId>
<artifactId>rpt</artifactId>
<packaging>pom</packaging>
<version>2.3.5</version>
<version>2.4.0</version>
<modules>
<module>rpt-base</module>
<module>rpt-server</module>
Expand Down
2 changes: 1 addition & 1 deletion rpt-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>rpt</artifactId>
<groupId>cn.promptness</groupId>
<version>2.3.5</version>
<version>2.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package cn.promptness.rpt.base.executor;

import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import io.netty.channel.ChannelHandlerContext;

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;

public class MessageDispatcher {

private static final Map<MessageType, MessageExecutor> MESSAGE_EXECUTOR_MAP = new HashMap<>();

static {
ServiceLoader<MessageExecutor> executors = ServiceLoader.load(MessageExecutor.class);
for (MessageExecutor executor : executors) {
MessageExecutor messageExecutor = MESSAGE_EXECUTOR_MAP.put(executor.getMessageType(), executor);
if (messageExecutor != null) {
throw new RuntimeException(String.format("%s Message Executor Register Repeat", messageExecutor.getMessageType().name()));
}
}
}

public void handle(ChannelHandlerContext context, Message message) throws Exception {
MessageExecutor messageExecutor = MESSAGE_EXECUTOR_MAP.get(message.getType());
if (messageExecutor == null) {
return;
}
messageExecutor.execute(context, message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.promptness.rpt.base.executor;

import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface MessageExecutor {

Logger logger = LoggerFactory.getLogger(MessageExecutor.class);

MessageType getMessageType();

void execute(ChannelHandlerContext context, Message message) throws Exception;
}
25 changes: 16 additions & 9 deletions rpt-base/src/main/java/cn/promptness/rpt/base/utils/Constants.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,35 @@
package cn.promptness.rpt.base.utils;

import io.netty.channel.Channel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.AttributeKey;

import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public interface Constants {

String TITLE = "Reverse Proxy Tool";

String VERSION = "2.3.5";

AttributeKey<Map<String, Channel>> CHANNELS = AttributeKey.newInstance("CHANNELS");

AttributeKey<Channel> PROXY = AttributeKey.newInstance("PROXY");

AttributeKey<Channel> LOCAL = AttributeKey.newInstance("LOCAL");

AttributeKey<String> CLIENT_KEY = AttributeKey.newInstance("CLIENT_KEY");
interface Server {
AttributeKey<String> CLIENT_KEY = AttributeKey.newInstance("CLIENT_KEY");
AttributeKey<List<String>> DOMAIN = AttributeKey.newInstance("DOMAIN");
AttributeKey<NioEventLoopGroup> REMOTE_BOSS_GROUP = AttributeKey.newInstance("REMOTE_BOSS_GROUP");
AttributeKey<NioEventLoopGroup> REMOTE_WORKER_GROUP = AttributeKey.newInstance("REMOTE_WORKER_GROUP");
}

AttributeKey<Application<Boolean>> APPLICATION = AttributeKey.newInstance("APPLICATION");
interface Client {
AttributeKey<Application<Boolean>> APPLICATION = AttributeKey.newInstance("APPLICATION");
}

Pattern COLON = Pattern.compile(":");
interface Desktop {
String TITLE = "Reverse Proxy Tool";
String VERSION = "2.4.0";
}

Pattern COLON = Pattern.compile(":");
Pattern BLANK = Pattern.compile("\\s");
}
2 changes: 1 addition & 1 deletion rpt-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>rpt</artifactId>
<groupId>cn.promptness</groupId>
<version>2.3.5</version>
<version>2.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Boolean start(int seconds) throws Exception {
logger.info("客户端开始连接服务端IP:{},服务端端口:{}", clientConfig.getServerIp(), clientConfig.getServerPort());
bootstrap.connect(clientConfig.getServerIp(), clientConfig.getServerPort()).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
future.channel().attr(Constants.APPLICATION).set(this);
future.channel().attr(Constants.Client.APPLICATION).set(this);
//连接建立成功,发送注册请求
Message message = new Message();
message.setType(MessageType.TYPE_REGISTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class ProxyChannelCache {

public static void get(Bootstrap bootstrap, Consumer<Channel> success, Supplier<ChannelFuture> fail) {
Channel proxyChannel = PROXY_CHANNEL_QUEUE.poll();
if (proxyChannel != null) {
if (proxyChannel != null && proxyChannel.isActive()) {
success.accept(proxyChannel);
return;
}
Expand All @@ -37,14 +37,16 @@ public static void get(Bootstrap bootstrap, Consumer<Channel> success, Supplier<

}

public static void release(Channel proxyChannel) {
public static void put(Channel proxyChannel) {
if (PROXY_CHANNEL_QUEUE.size() > MAX_QUEUE_LIMIT) {
proxyChannel.close();
return;
}
proxyChannel.config().setAutoRead(true);
proxyChannel.attr(Constants.LOCAL).set(null);
PROXY_CHANNEL_QUEUE.offer(proxyChannel);
if (proxyChannel.isActive()) {
proxyChannel.config().setAutoRead(true);
proxyChannel.attr(Constants.LOCAL).set(null);
PROXY_CHANNEL_QUEUE.offer(proxyChannel);
}
}

public static void delete(Channel proxyChannel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package cn.promptness.rpt.client.executor;

import cn.promptness.rpt.base.executor.MessageExecutor;
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.utils.Constants;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public class AuthExecutor implements MessageExecutor {

private final Map<String, Channel> channelMap = new ConcurrentHashMap<>(1024);

@Override
public MessageType getMessageType() {
return MessageType.TYPE_AUTH;
}

@Override
public void execute(ChannelHandlerContext context, Message message) {
for (String remoteResult : Optional.ofNullable(message.getMeta().getRemoteResult()).orElse(Collections.emptyList())) {
logger.info(remoteResult);
}
boolean connection = message.getMeta().isConnection();
if (connection) {
logger.info("连接成功,当前秘钥:{}", message.getMeta().getClientKey());
context.channel().attr(Constants.CHANNELS).set(channelMap);
} else {
logger.info("连接失败,当前秘钥:{}", message.getMeta().getClientKey());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package cn.promptness.rpt.client.executor;

import cn.promptness.rpt.base.coder.ByteArrayCodec;
import cn.promptness.rpt.base.config.ProxyType;
import cn.promptness.rpt.base.config.RemoteConfig;
import cn.promptness.rpt.base.executor.MessageExecutor;
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.protocol.Meta;
import cn.promptness.rpt.base.utils.Config;
import cn.promptness.rpt.base.utils.Constants;
import cn.promptness.rpt.client.cache.ProxyChannelCache;
import cn.promptness.rpt.client.handler.LocalHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.internal.EmptyArrays;

import java.util.Collections;
import java.util.Objects;

public class ConnectedExecutor implements MessageExecutor {

private final EventLoopGroup localGroup = new NioEventLoopGroup();

@Override
public MessageType getMessageType() {
return MessageType.TYPE_CONNECTED;
}

@Override
public void execute(ChannelHandlerContext context, Message message) {
Meta meta = message.getMeta();
RemoteConfig remoteConfig = meta.getRemoteConfig();
if (Objects.isNull(remoteConfig)) {
return;
}
ProxyType proxyType = remoteConfig.getProxyType();
if (Objects.equals(ProxyType.HTTP, proxyType)) {
String domain = remoteConfig.getDomain();
// 补全配置信息
RemoteConfig httpConfig = Config.getClientConfig().getHttpConfig(domain);
if (Objects.isNull(httpConfig)) {
return;
}
meta.setRemoteConfigList(Collections.singletonList(httpConfig));
}
// 绑定代理连接
ProxyChannelCache.get(context.channel().attr(Constants.Client.APPLICATION).get().bootstrap(), proxyChannel -> connectedTcp(context, proxyChannel, meta), () -> context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES)));
}

private void connectedTcp(ChannelHandlerContext context, Channel proxyChannel, Meta meta) {
RemoteConfig remoteConfig = meta.getRemoteConfig();
Bootstrap localBootstrap = new Bootstrap();
localBootstrap.group(localGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new ByteArrayCodec());
channel.pipeline().addLast(new ChunkedWriteHandler());
channel.pipeline().addLast(new LocalHandler(context.channel(), meta));
}
});
localBootstrap.connect(remoteConfig.getLocalIp(), remoteConfig.getLocalPort()).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
future.channel().attr(Constants.PROXY).set(proxyChannel);
proxyChannel.attr(Constants.LOCAL).set(future.channel());
} else {
ProxyChannelCache.put(proxyChannel);
context.writeAndFlush(new Message(MessageType.TYPE_DISCONNECTED, meta, EmptyArrays.EMPTY_BYTES));
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package cn.promptness.rpt.client.executor;

import cn.promptness.rpt.base.executor.MessageExecutor;
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.utils.Constants;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.EmptyArrays;

import java.util.Objects;
import java.util.Optional;

public class DataExecutor implements MessageExecutor {

@Override
public MessageType getMessageType() {
return MessageType.TYPE_DATA;
}

@Override
public void execute(ChannelHandlerContext context, Message message) {
Channel localChannel = context.channel().attr(Constants.LOCAL).get();
if (Objects.nonNull(localChannel)) {
localChannel.writeAndFlush(Optional.ofNullable(message.getData()).orElse(EmptyArrays.EMPTY_BYTES));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cn.promptness.rpt.client.executor;

import cn.promptness.rpt.base.executor.MessageExecutor;
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.utils.Constants;
import cn.promptness.rpt.client.cache.ProxyChannelCache;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.internal.EmptyArrays;

import java.util.Objects;

public class DisconnectedExecutor implements MessageExecutor {
@Override
public MessageType getMessageType() {
return MessageType.TYPE_DISCONNECTED;
}

@Override
public void execute(ChannelHandlerContext context, Message message) {
Channel localChannel = context.channel().attr(Constants.LOCAL).getAndSet(null);
if (Objects.nonNull(localChannel)) {
ProxyChannelCache.put(context.channel());
localChannel.writeAndFlush(EmptyArrays.EMPTY_BYTES).addListener(ChannelFutureListener.CLOSE);
}
}
}
Loading

0 comments on commit 35b8d63

Please sign in to comment.