diff --git a/pom.xml b/pom.xml index abc86bc..50489cc 100644 --- a/pom.xml +++ b/pom.xml @@ -48,6 +48,11 @@ protostuff-runtime 1.8.0 + + com.fasterxml.jackson.core + jackson-databind + 2.13.3 + org.yaml snakeyaml diff --git a/rpt-base/pom.xml b/rpt-base/pom.xml index 76cb5f8..01d1785 100644 --- a/rpt-base/pom.xml +++ b/rpt-base/pom.xml @@ -34,6 +34,10 @@ io.protostuff protostuff-runtime + + com.fasterxml.jackson.core + jackson-databind + org.yaml snakeyaml diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java index d60fcb4..343e1a8 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java @@ -3,7 +3,8 @@ import cn.promptness.rpt.base.protocol.Message; import cn.promptness.rpt.base.protocol.MessageType; import cn.promptness.rpt.base.protocol.Meta; -import cn.promptness.rpt.base.utils.MetaUtils; +import cn.promptness.rpt.base.serialize.Jackson; +import cn.promptness.rpt.base.serialize.Serialize; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; @@ -14,6 +15,8 @@ public class MessageDecoder extends MessageToMessageDecoder { + private final Serialize serialize = new Jackson(); + @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { Message proxyMessage = new Message(); @@ -24,7 +27,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB if (protobufLength > 0) { byte[] metaByte = new byte[protobufLength]; byteBuf.readBytes(metaByte); - Meta meta = MetaUtils.deserialize(metaByte); + Meta meta = serialize.deserialize(metaByte); proxyMessage.setMeta(meta); } if (byteBuf.isReadable()) { diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java index c64b04d..709ef51 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java @@ -3,7 +3,8 @@ import cn.promptness.rpt.base.protocol.Message; import cn.promptness.rpt.base.protocol.MessageType; import cn.promptness.rpt.base.protocol.Meta; -import cn.promptness.rpt.base.utils.MetaUtils; +import cn.promptness.rpt.base.serialize.Jackson; +import cn.promptness.rpt.base.serialize.Serialize; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; @@ -12,13 +13,15 @@ public class MessageEncoder extends MessageToByteEncoder { + private final Serialize serialize = new Jackson(); + @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { MessageType type = message.getType(); out.writeInt(type.getCode()); Meta meta = message.getMeta(); - byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : MetaUtils.serialize(meta); + byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : serialize.serialize(meta); out.writeInt(protobuf.length); out.writeBytes(protobuf); diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Meta.java b/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Meta.java index dfe8aa7..b13645b 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Meta.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Meta.java @@ -1,10 +1,12 @@ package cn.promptness.rpt.base.protocol; import cn.promptness.rpt.base.config.RemoteConfig; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import java.util.Collections; import java.util.List; +@JsonIgnoreProperties(ignoreUnknown = true) public class Meta { private String clientKey; diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java new file mode 100644 index 0000000..e7d8df6 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java @@ -0,0 +1,21 @@ +package cn.promptness.rpt.base.serialize; + +import cn.promptness.rpt.base.protocol.Meta; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +public class Jackson implements Serialize { + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @Override + public byte[] serialize(Meta meta) throws Exception { + return OBJECT_MAPPER.writeValueAsBytes(meta); + } + + @Override + public Meta deserialize(byte[] data) throws IOException { + return OBJECT_MAPPER.readValue(data, Meta.class); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/utils/MetaUtils.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java similarity index 77% rename from rpt-base/src/main/java/cn/promptness/rpt/base/utils/MetaUtils.java rename to rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java index 2e63a80..33f36a4 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/utils/MetaUtils.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java @@ -1,4 +1,4 @@ -package cn.promptness.rpt.base.utils; +package cn.promptness.rpt.base.serialize; import cn.promptness.rpt.base.protocol.Meta; import io.protostuff.LinkedBuffer; @@ -6,12 +6,13 @@ import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; -public class MetaUtils { +public class Protostuff implements Serialize { private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); private static final Schema SCHEMA_CACHE = RuntimeSchema.getSchema(Meta.class); - public static byte[] serialize(Meta meta) { + @Override + public byte[] serialize(Meta meta) { try { return ProtostuffIOUtil.toByteArray(meta, SCHEMA_CACHE, BUFFER); } finally { @@ -19,7 +20,8 @@ public static byte[] serialize(Meta meta) { } } - public static Meta deserialize(byte[] data) { + @Override + public Meta deserialize(byte[] data) { Meta meta = SCHEMA_CACHE.newMessage(); ProtostuffIOUtil.mergeFrom(data, meta, SCHEMA_CACHE); return meta; diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serialize.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serialize.java new file mode 100644 index 0000000..c8586f6 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serialize.java @@ -0,0 +1,10 @@ +package cn.promptness.rpt.base.serialize; + +import cn.promptness.rpt.base.protocol.Meta; + +public interface Serialize { + + byte[] serialize(Meta meta) throws Exception; + + Meta deserialize(byte[] data) throws Exception; +} diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java index 88d4dad..71fa161 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/ClientHandler.java @@ -44,12 +44,13 @@ protected void channelRead0(ChannelHandlerContext context, Message message) thro @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Application application = ctx.channel().attr(Constants.Client.APPLICATION).getAndSet(null); - logger.info("客户端-服务端连接中断,{}:{}", Config.getClientConfig().getServerIp(), Config.getClientConfig().getServerPort()); if (Objects.nonNull(application)) { + logger.info("客户端-服务端连接中断,{}:{}", Config.getClientConfig().getServerIp(), Config.getClientConfig().getServerPort()); Optional.ofNullable(ctx.channel().attr(Constants.CHANNELS).get()).ifPresent(this::clear); application.start(1); return; } + logger.info("客户端-服务端代理连接中断"); Channel localChannel = ctx.channel().attr(Constants.LOCAL).getAndSet(null); if (Objects.nonNull(localChannel) && localChannel.isActive()) { localChannel.attr(Constants.PROXY).set(null); diff --git a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java index 069cd53..31535c8 100644 --- a/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java +++ b/rpt-client/src/main/java/cn/promptness/rpt/client/handler/LocalHandler.java @@ -39,7 +39,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.channel().config().setAutoRead(false); Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get(); channel.attr(Constants.CHANNELS).get().put(meta.getChannelId(), ctx.channel()); - send(proxyChannel, MessageType.TYPE_CONNECTED); + send(proxyChannel, MessageType.TYPE_CONNECTED, EmptyArrays.EMPTY_BYTES); ctx.channel().config().setAutoRead(true); } @@ -63,7 +63,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { Optional.ofNullable(channel.attr(Constants.CHANNELS).get()).ifPresent(channelMap -> channelMap.remove(meta.getChannelId())); Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get(); if (Objects.nonNull(proxyChannel) && proxyChannel.isActive()) { - send(proxyChannel, MessageType.TYPE_DISCONNECTED); + send(proxyChannel, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES); } } @@ -75,10 +75,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E ctx.close(); } - private void send(Channel proxyChannel, MessageType type) { - send(proxyChannel, type, EmptyArrays.EMPTY_BYTES); - } - private void send(Channel proxyChannel, MessageType type, byte[] data) { proxyChannel.writeAndFlush(new Message(type, meta, data)); }