Skip to content

Commit

Permalink
protostuff bug wait fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iamlinhui committed May 27, 2022
1 parent d72b6d2 commit de4fd00
Show file tree
Hide file tree
Showing 10 changed files with 62 additions and 15 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<artifactId>protostuff-runtime</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions rpt-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<groupId>io.protostuff</groupId>
<artifactId>protostuff-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.protocol.Meta;
import cn.promptness.rpt.base.utils.MetaUtils;
import cn.promptness.rpt.base.serialize.Jackson;
import cn.promptness.rpt.base.serialize.Serialize;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -14,6 +15,8 @@

public class MessageDecoder extends MessageToMessageDecoder<ByteBuf> {

private final Serialize serialize = new Jackson();

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
Message proxyMessage = new Message();
Expand All @@ -24,7 +27,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB
if (protobufLength > 0) {
byte[] metaByte = new byte[protobufLength];
byteBuf.readBytes(metaByte);
Meta meta = MetaUtils.deserialize(metaByte);
Meta meta = serialize.deserialize(metaByte);
proxyMessage.setMeta(meta);
}
if (byteBuf.isReadable()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import cn.promptness.rpt.base.protocol.Message;
import cn.promptness.rpt.base.protocol.MessageType;
import cn.promptness.rpt.base.protocol.Meta;
import cn.promptness.rpt.base.utils.MetaUtils;
import cn.promptness.rpt.base.serialize.Jackson;
import cn.promptness.rpt.base.serialize.Serialize;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
Expand All @@ -12,13 +13,15 @@

public class MessageEncoder extends MessageToByteEncoder<Message> {

private final Serialize serialize = new Jackson();

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception {
MessageType type = message.getType();
out.writeInt(type.getCode());

Meta meta = message.getMeta();
byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : MetaUtils.serialize(meta);
byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : serialize.serialize(meta);
out.writeInt(protobuf.length);
out.writeBytes(protobuf);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package cn.promptness.rpt.base.protocol;

import cn.promptness.rpt.base.config.RemoteConfig;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import java.util.Collections;
import java.util.List;

@JsonIgnoreProperties(ignoreUnknown = true)
public class Meta {

private String clientKey;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cn.promptness.rpt.base.serialize;

import cn.promptness.rpt.base.protocol.Meta;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

public class Jackson implements Serialize {

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Override
public byte[] serialize(Meta meta) throws Exception {
return OBJECT_MAPPER.writeValueAsBytes(meta);
}

@Override
public Meta deserialize(byte[] data) throws IOException {
return OBJECT_MAPPER.readValue(data, Meta.class);
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
package cn.promptness.rpt.base.utils;
package cn.promptness.rpt.base.serialize;

import cn.promptness.rpt.base.protocol.Meta;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
import io.protostuff.Schema;
import io.protostuff.runtime.RuntimeSchema;

public class MetaUtils {
public class Protostuff implements Serialize {

private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
private static final Schema<Meta> SCHEMA_CACHE = RuntimeSchema.getSchema(Meta.class);

public static byte[] serialize(Meta meta) {
@Override
public byte[] serialize(Meta meta) {
try {
return ProtostuffIOUtil.toByteArray(meta, SCHEMA_CACHE, BUFFER);
} finally {
BUFFER.clear();
}
}

public static Meta deserialize(byte[] data) {
@Override
public Meta deserialize(byte[] data) {
Meta meta = SCHEMA_CACHE.newMessage();
ProtostuffIOUtil.mergeFrom(data, meta, SCHEMA_CACHE);
return meta;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package cn.promptness.rpt.base.serialize;

import cn.promptness.rpt.base.protocol.Meta;

public interface Serialize {

byte[] serialize(Meta meta) throws Exception;

Meta deserialize(byte[] data) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ protected void channelRead0(ChannelHandlerContext context, Message message) thro
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Application<Boolean> application = ctx.channel().attr(Constants.Client.APPLICATION).getAndSet(null);
logger.info("客户端-服务端连接中断,{}:{}", Config.getClientConfig().getServerIp(), Config.getClientConfig().getServerPort());
if (Objects.nonNull(application)) {
logger.info("客户端-服务端连接中断,{}:{}", Config.getClientConfig().getServerIp(), Config.getClientConfig().getServerPort());
Optional.ofNullable(ctx.channel().attr(Constants.CHANNELS).get()).ifPresent(this::clear);
application.start(1);
return;
}
logger.info("客户端-服务端代理连接中断");
Channel localChannel = ctx.channel().attr(Constants.LOCAL).getAndSet(null);
if (Objects.nonNull(localChannel) && localChannel.isActive()) {
localChannel.attr(Constants.PROXY).set(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.channel().config().setAutoRead(false);
Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get();
channel.attr(Constants.CHANNELS).get().put(meta.getChannelId(), ctx.channel());
send(proxyChannel, MessageType.TYPE_CONNECTED);
send(proxyChannel, MessageType.TYPE_CONNECTED, EmptyArrays.EMPTY_BYTES);
ctx.channel().config().setAutoRead(true);
}

Expand All @@ -63,7 +63,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Optional.ofNullable(channel.attr(Constants.CHANNELS).get()).ifPresent(channelMap -> channelMap.remove(meta.getChannelId()));
Channel proxyChannel = ctx.channel().attr(Constants.PROXY).get();
if (Objects.nonNull(proxyChannel) && proxyChannel.isActive()) {
send(proxyChannel, MessageType.TYPE_DISCONNECTED);
send(proxyChannel, MessageType.TYPE_DISCONNECTED, EmptyArrays.EMPTY_BYTES);
}
}

Expand All @@ -75,10 +75,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
ctx.close();
}

private void send(Channel proxyChannel, MessageType type) {
send(proxyChannel, type, EmptyArrays.EMPTY_BYTES);
}

private void send(Channel proxyChannel, MessageType type, byte[] data) {
proxyChannel.writeAndFlush(new Message(type, meta, data));
}
Expand Down

0 comments on commit de4fd00

Please sign in to comment.