diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java index 2811619..0b11347 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java @@ -12,28 +12,31 @@ import java.util.List; - +/** + * messageLength|messageType|serializerType|metaLength|meta|data + */ public class MessageDecoder extends MessageToMessageDecoder { private static final SerializerDispatcher SERIALIZER_DISPATCHER = new SerializerDispatcher(); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { - Message proxyMessage = new Message(); + Message message = new Message(); // 4个字节 - proxyMessage.setType(MessageType.getInstance(byteBuf.readInt())); + message.setType(MessageType.getInstance(byteBuf.readInt())); + SerializationType serialization = SerializationType.getInstance(byteBuf.readInt()); - int protobufLength = byteBuf.readInt(); - if (protobufLength > 0) { - byte[] metaByte = new byte[protobufLength]; + int metaByteLength = byteBuf.readInt(); + if (metaByteLength > 0) { + byte[] metaByte = new byte[metaByteLength]; byteBuf.readBytes(metaByte); - Meta meta = SERIALIZER_DISPATCHER.deserialize(SerializationType.PROTOSTUFF, metaByte, Meta.class); - proxyMessage.setMeta(meta); + Meta meta = SERIALIZER_DISPATCHER.deserialize(serialization, metaByte, Meta.class); + message.setMeta(meta); } if (byteBuf.isReadable()) { byte[] data = ByteBufUtil.getBytes(byteBuf); - proxyMessage.setData(data); + message.setData(data); } - list.add(proxyMessage); + list.add(message); } } diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java index 1e41caa..8b53357 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java @@ -4,13 +4,14 @@ import cn.promptness.rpt.base.protocol.MessageType; import cn.promptness.rpt.base.protocol.Meta; import cn.promptness.rpt.base.serialize.SerializerDispatcher; -import cn.promptness.rpt.base.serialize.api.SerializationType; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import io.netty.util.internal.EmptyArrays; - +/** + * messageLength|messageType|serializerType|metaLength|meta|data + */ public class MessageEncoder extends MessageToByteEncoder { private static final SerializerDispatcher SERIALIZER_DISPATCHER = new SerializerDispatcher(); @@ -19,11 +20,12 @@ public class MessageEncoder extends MessageToByteEncoder { protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { MessageType type = message.getType(); out.writeInt(type.getCode()); + out.writeInt(message.getSerialization().getCode()); Meta meta = message.getMeta(); - byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : SERIALIZER_DISPATCHER.serialize(SerializationType.PROTOSTUFF, meta); - out.writeInt(protobuf.length); - out.writeBytes(protobuf); + byte[] metaByte = meta == null ? EmptyArrays.EMPTY_BYTES : SERIALIZER_DISPATCHER.serialize(message.getSerialization(), meta); + out.writeInt(metaByte.length); + out.writeBytes(metaByte); if (message.getData() != null && message.getData().length > 0) { out.writeBytes(message.getData()); diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Message.java b/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Message.java index 5894428..75404ba 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Message.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/protocol/Message.java @@ -1,5 +1,7 @@ package cn.promptness.rpt.base.protocol; +import cn.promptness.rpt.base.serialize.api.SerializationType; + /** * 客户端-服务器自定义通信协议 */ @@ -10,6 +12,8 @@ public class Message { */ private MessageType type; + private SerializationType serialization = SerializationType.PROTOSTUFF; + /** * 元数据 */ @@ -54,4 +58,11 @@ public void setType(MessageType type) { this.type = type; } + public SerializationType getSerialization() { + return serialization; + } + + public void setSerialization(SerializationType serialization) { + this.serialization = serialization; + } } diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java index f20822a..e41c25f 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java @@ -24,4 +24,13 @@ public int getCode() { public String getDesc() { return desc; } + + public static SerializationType getInstance(int code) { + for (SerializationType value : SerializationType.values()) { + if (value.code == code) { + return value; + } + } + return null; + } } diff --git a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java index 5e61e48..44cc196 100644 --- a/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java +++ b/rpt-server/src/main/java/cn/promptness/rpt/server/executor/RegisterExecutor.java @@ -58,6 +58,7 @@ public void execute(ChannelHandlerContext context, Message message) throws Excep channelFuture.addListener(ChannelFutureListener.CLOSE); return; } + logger.info("授权注册成功,客户端使用的秘钥:{}", meta.getClientKey()); ServerChannelCache.getServerChannelMap().put(context.channel().id().asLongText(), context.channel()); context.channel().attr(Constants.CHANNELS).set(new ConcurrentHashMap<>(1024)); context.channel().attr(Constants.Server.DOMAIN).set(domainList);