diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java index fbb8f10..b3cabbd 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageDecoder.java @@ -3,8 +3,8 @@ import cn.promptness.rpt.base.protocol.Message; import cn.promptness.rpt.base.protocol.MessageType; import cn.promptness.rpt.base.protocol.Meta; -import cn.promptness.rpt.base.serialize.Jackson; -import cn.promptness.rpt.base.serialize.Serializer; +import cn.promptness.rpt.base.serialize.SerializerDispatcher; +import cn.promptness.rpt.base.serialize.api.SerializationType; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; @@ -15,7 +15,7 @@ public class MessageDecoder extends MessageToMessageDecoder { - private static final Serializer SERIALIZER = new Jackson(); + private static final SerializerDispatcher SERIALIZER_DISPATCHER = new SerializerDispatcher(); @Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) throws Exception { @@ -27,7 +27,7 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB if (protobufLength > 0) { byte[] metaByte = new byte[protobufLength]; byteBuf.readBytes(metaByte); - Meta meta = SERIALIZER.deserialize(metaByte); + Meta meta = SERIALIZER_DISPATCHER.deserialize(SerializationType.PROTOSTUFF, metaByte); proxyMessage.setMeta(meta); } if (byteBuf.isReadable()) { diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java index cf6ca57..1e41caa 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/coder/MessageEncoder.java @@ -3,8 +3,8 @@ import cn.promptness.rpt.base.protocol.Message; import cn.promptness.rpt.base.protocol.MessageType; import cn.promptness.rpt.base.protocol.Meta; -import cn.promptness.rpt.base.serialize.Jackson; -import cn.promptness.rpt.base.serialize.Serializer; +import cn.promptness.rpt.base.serialize.SerializerDispatcher; +import cn.promptness.rpt.base.serialize.api.SerializationType; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; @@ -13,7 +13,7 @@ public class MessageEncoder extends MessageToByteEncoder { - private static final Serializer SERIALIZER = new Jackson(); + private static final SerializerDispatcher SERIALIZER_DISPATCHER = new SerializerDispatcher(); @Override protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf out) throws Exception { @@ -21,7 +21,7 @@ protected void encode(ChannelHandlerContext channelHandlerContext, Message messa out.writeInt(type.getCode()); Meta meta = message.getMeta(); - byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : SERIALIZER.serialize(meta); + byte[] protobuf = meta == null ? EmptyArrays.EMPTY_BYTES : SERIALIZER_DISPATCHER.serialize(SerializationType.PROTOSTUFF, meta); out.writeInt(protobuf.length); out.writeBytes(protobuf); diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java index 8d08e7b..4323622 100644 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/executor/MessageDispatcher.java @@ -4,21 +4,18 @@ import cn.promptness.rpt.base.protocol.MessageType; import io.netty.channel.ChannelHandlerContext; -import java.util.HashMap; import java.util.Map; import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; public class MessageDispatcher { - private static final Map MESSAGE_EXECUTOR_MAP = new HashMap<>(); + private static final Map MESSAGE_EXECUTOR_MAP = new ConcurrentHashMap<>(); static { ServiceLoader executors = ServiceLoader.load(MessageExecutor.class); for (MessageExecutor executor : executors) { - MessageExecutor messageExecutor = MESSAGE_EXECUTOR_MAP.put(executor.getMessageType(), executor); - if (messageExecutor != null) { - throw new RuntimeException(String.format("%s Message Executor Register Repeat", messageExecutor.getMessageType().name())); - } + MESSAGE_EXECUTOR_MAP.put(executor.getMessageType(), executor); } } diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java deleted file mode 100644 index dd2c78f..0000000 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Jackson.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.promptness.rpt.base.serialize; - -import cn.promptness.rpt.base.protocol.Meta; -import com.fasterxml.jackson.databind.ObjectMapper; - -import java.io.IOException; - -public class Jackson implements Serializer { - - private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - - @Override - public byte[] serialize(Meta meta) throws Exception { - return OBJECT_MAPPER.writeValueAsBytes(meta); - } - - @Override - public Meta deserialize(byte[] data) throws IOException { - return OBJECT_MAPPER.readValue(data, Meta.class); - } -} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java deleted file mode 100644 index 999bbf2..0000000 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Protostuff.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.promptness.rpt.base.serialize; - -import cn.promptness.rpt.base.protocol.Meta; -import io.protostuff.LinkedBuffer; -import io.protostuff.ProtostuffIOUtil; -import io.protostuff.Schema; -import io.protostuff.runtime.RuntimeSchema; - -public class Protostuff implements Serializer { - - private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); - private static final Schema SCHEMA_CACHE = RuntimeSchema.getSchema(Meta.class); - - @Override - public byte[] serialize(Meta meta) { - try { - return ProtostuffIOUtil.toByteArray(meta, SCHEMA_CACHE, BUFFER); - } finally { - BUFFER.clear(); - } - } - - @Override - public Meta deserialize(byte[] data) { - Meta meta = SCHEMA_CACHE.newMessage(); - ProtostuffIOUtil.mergeFrom(data, meta, SCHEMA_CACHE); - return meta; - } -} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serializer.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serializer.java deleted file mode 100644 index 2d71cf2..0000000 --- a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/Serializer.java +++ /dev/null @@ -1,10 +0,0 @@ -package cn.promptness.rpt.base.serialize; - -import cn.promptness.rpt.base.protocol.Meta; - -public interface Serializer { - - byte[] serialize(Meta meta) throws Exception; - - Meta deserialize(byte[] data) throws Exception; -} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/SerializerDispatcher.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/SerializerDispatcher.java new file mode 100644 index 0000000..5c6ccd1 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/SerializerDispatcher.java @@ -0,0 +1,26 @@ +package cn.promptness.rpt.base.serialize; + +import cn.promptness.rpt.base.serialize.api.*; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +public class SerializerDispatcher { + + public byte[] serialize(SerializationType serializationType, Object obj) throws Exception { + Serialization serialization = SerializeFactory.getSerialization(serializationType); + try (ByteArrayOutputStream byteBufOutputStream = new ByteArrayOutputStream(); ObjectOutputStream objectOutputStream = serialization.serialize(byteBufOutputStream)) { + objectOutputStream.writeObject(obj); + objectOutputStream.flush(); + return byteBufOutputStream.toByteArray(); + } + } + + @SuppressWarnings("unchecked") + public T deserialize(SerializationType serializationType, byte[] data) throws Exception { + Serialization serialization = SerializeFactory.getSerialization(serializationType); + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data); ObjectInputStream objectInputStream = serialization.deserialize(byteArrayInputStream)) { + return (T) objectInputStream.readObject(Object.class); + } + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectInputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectInputStream.java new file mode 100644 index 0000000..e33ac17 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectInputStream.java @@ -0,0 +1,31 @@ +package cn.promptness.rpt.base.serialize.api; + +import java.io.Closeable; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.Map; + +public interface ObjectInputStream extends Closeable { + + int readInt() throws IOException; + + byte readByte() throws IOException; + + byte[] readBytes() throws IOException; + + String readUTF() throws IOException; + + T readObject(Class cls) throws IOException, ClassNotFoundException; + + default T readObject(Class cls, Type genericType) throws IOException, ClassNotFoundException { + return readObject(cls); + } + + default Throwable readThrowable() throws IOException, ClassNotFoundException { + return readObject(Throwable.class); + } + + default Map readMap() throws IOException, ClassNotFoundException { + return readObject(Map.class); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectOutputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectOutputStream.java new file mode 100644 index 0000000..2d636a5 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/ObjectOutputStream.java @@ -0,0 +1,32 @@ +package cn.promptness.rpt.base.serialize.api; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Map; + +public interface ObjectOutputStream extends Closeable { + + void writeInt(int v) throws IOException; + + void writeByte(byte v) throws IOException; + + void writeBytes(byte[] b) throws IOException; + + void writeUTF(String v) throws IOException; + + void writeObject(Object obj) throws IOException; + + default void writeMap(Map map) throws IOException { + writeObject(map); + } + + default void writeThrowable(Object obj) throws IOException { + writeObject(obj); + } + + default void writeEvent(Object data) throws IOException { + writeObject(data); + } + + void flush() throws IOException; +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/Serialization.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/Serialization.java new file mode 100644 index 0000000..a73e91f --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/Serialization.java @@ -0,0 +1,15 @@ +package cn.promptness.rpt.base.serialize.api; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public interface Serialization { + + SerializationType getType(); + + ObjectOutputStream serialize(OutputStream output) throws IOException; + + ObjectInputStream deserialize(InputStream input) throws IOException; +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java new file mode 100644 index 0000000..f20822a --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializationType.java @@ -0,0 +1,27 @@ +package cn.promptness.rpt.base.serialize.api; + +public enum SerializationType { + + /** + * + */ + PROTOSTUFF(1, "protostuff"), + JSON(2, "jackson"); + + SerializationType(int code, String desc) { + this.code = code; + this.desc = desc; + } + + final int code; + + final String desc; + + public int getCode() { + return code; + } + + public String getDesc() { + return desc; + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializeFactory.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializeFactory.java new file mode 100644 index 0000000..a2e5f71 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/api/SerializeFactory.java @@ -0,0 +1,23 @@ +package cn.promptness.rpt.base.serialize.api; + + +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; + +public class SerializeFactory { + + private static final Map SERIALIZATION_MAP = new ConcurrentHashMap<>(); + + static { + ServiceLoader serializations = ServiceLoader.load(Serialization.class); + for (Serialization serialization : serializations) { + SERIALIZATION_MAP.put(serialization.getType(), serialization); + } + } + + public static Serialization getSerialization(SerializationType serializationType) { + return SERIALIZATION_MAP.get(serializationType); + } + +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JacksonUtil.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JacksonUtil.java new file mode 100644 index 0000000..e5ccb95 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JacksonUtil.java @@ -0,0 +1,52 @@ +package cn.promptness.rpt.base.serialize.json; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.cfg.MapperConfig; +import com.fasterxml.jackson.databind.json.JsonMapper; +import com.fasterxml.jackson.databind.jsontype.PolymorphicTypeValidator; + +import static com.fasterxml.jackson.databind.ObjectMapper.DefaultTyping.NON_FINAL; + +public class JacksonUtil { + + /** + * used for take the bean actual type with json string; for de-serializing the json string into real type bean. + */ + private static final String TYPE_AS_JSON_PROPERTY = "typeAsJsonProperty"; + + private static final JsonMapper MAPPER = new JsonMapper(); + + static { + final boolean takeTypeAsProperty = Boolean.getBoolean(TYPE_AS_JSON_PROPERTY); + if (takeTypeAsProperty) { + MAPPER.activateDefaultTypingAsProperty(new PolymorphicTypeValidator() { + @Override + public Validity validateBaseType(final MapperConfig config, + final JavaType baseType) { + return Validity.ALLOWED; + } + + @Override + public Validity validateSubClassName(final MapperConfig config, + final JavaType baseType, + final String subClassName) throws JsonMappingException { + return Validity.ALLOWED; + } + + @Override + public Validity validateSubType(final MapperConfig config, + final JavaType baseType, + final JavaType subType) throws JsonMappingException { + return Validity.ALLOWED; + } + }, NON_FINAL, null); + } + MAPPER.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + } + + public static JsonMapper getJsonMapper() { + return MAPPER; + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataInputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataInputStream.java new file mode 100644 index 0000000..c489ff1 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataInputStream.java @@ -0,0 +1,77 @@ +package cn.promptness.rpt.base.serialize.json; + +import cn.promptness.rpt.base.serialize.api.ObjectInputStream; +import com.fasterxml.jackson.core.type.TypeReference; + +import java.io.*; +import java.lang.reflect.Type; + +public class JsonDataInputStream implements ObjectInputStream { + + private final BufferedReader reader; + + public JsonDataInputStream(InputStream in) { + this.reader = new BufferedReader(new InputStreamReader(in)); + } + + @Override + public int readInt() throws IOException { + return read(int.class); + } + + @Override + public byte readByte() throws IOException { + return read(byte.class); + } + + @Override + public byte[] readBytes() throws IOException { + return readLine().getBytes(); + } + + @Override + public String readUTF() throws IOException { + return read(String.class); + } + + @Override + public T readObject(final Class cls) throws IOException, ClassNotFoundException { + return read(cls); + } + + @Override + public T readObject(final Class cls, final Type genericType) throws IOException, ClassNotFoundException { + if (genericType == null || genericType == cls) { + return read(cls); + } + return read(genericType); + } + + private T read(Class cls) throws IOException { + String json = readLine(); + return JacksonUtil.getJsonMapper().readValue(json, cls); + } + + T read(Type genericType) throws IOException { + String json = readLine(); + return JacksonUtil.getJsonMapper().readValue(json, new TypeReference() { + @Override + public Type getType() { + return genericType; + } + }); + } + + private String readLine() throws IOException { + String line = reader.readLine(); + if (line == null || line.trim().length() == 0) { + throw new EOFException(); + } + return line; + } + + @Override + public void close() throws IOException { + reader.close(); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataOutputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataOutputStream.java new file mode 100644 index 0000000..0c2b1d9 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonDataOutputStream.java @@ -0,0 +1,53 @@ +package cn.promptness.rpt.base.serialize.json; + + +import cn.promptness.rpt.base.serialize.api.ObjectOutputStream; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; + +public class JsonDataOutputStream implements ObjectOutputStream { + + private final PrintWriter writer; + + public JsonDataOutputStream(OutputStream out) { + this.writer = new PrintWriter(out); + } + + @Override + public void writeInt(final int v) throws IOException { + writeObject(v); + } + + @Override + public void writeByte(final byte v) throws IOException { + writeObject(v); + } + + @Override + public void writeBytes(final byte[] b) throws IOException { + writer.println(new String(b)); + } + + @Override + public void writeUTF(final String v) throws IOException { + writeObject(v); + } + + @Override + public void writeObject(final Object obj) throws IOException { + final String content = JacksonUtil.getJsonMapper().writeValueAsString(obj); + writer.println(content); + } + + @Override + public void flush() throws IOException { + writer.flush(); + } + + @Override + public void close() throws IOException { + this.writer.close(); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonSerialization.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonSerialization.java new file mode 100644 index 0000000..a78510b --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/json/JsonSerialization.java @@ -0,0 +1,28 @@ +package cn.promptness.rpt.base.serialize.json; + +import cn.promptness.rpt.base.serialize.api.ObjectInputStream; +import cn.promptness.rpt.base.serialize.api.ObjectOutputStream; +import cn.promptness.rpt.base.serialize.api.Serialization; +import cn.promptness.rpt.base.serialize.api.SerializationType; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class JsonSerialization implements Serialization { + + @Override + public SerializationType getType() { + return SerializationType.JSON; + } + + @Override + public ObjectOutputStream serialize(final OutputStream output) throws IOException { + return new JsonDataOutputStream(output); + } + + @Override + public ObjectInputStream deserialize(final InputStream input) throws IOException { + return new JsonDataInputStream(input); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataInputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataInputStream.java new file mode 100644 index 0000000..8277c39 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataInputStream.java @@ -0,0 +1,85 @@ +package cn.promptness.rpt.base.serialize.protostuff; + +import cn.promptness.rpt.base.serialize.api.ObjectInputStream; +import cn.promptness.rpt.base.serialize.protostuff.utils.WrapperUtils; +import io.protostuff.GraphIOUtil; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; + +public class ProtostuffDataInputStream implements ObjectInputStream { + + private final DataInputStream dis; + + public ProtostuffDataInputStream(InputStream inputStream) { + dis = new DataInputStream(inputStream); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public T readObject(Class clazzType) throws IOException, ClassNotFoundException { + int classNameLength = dis.readInt(); + int bytesLength = dis.readInt(); + + if (classNameLength < 0 || bytesLength < 0) { + throw new IOException(); + } + + byte[] classNameBytes = new byte[classNameLength]; + dis.readFully(classNameBytes, 0, classNameLength); + + byte[] bytes = new byte[bytesLength]; + dis.readFully(bytes, 0, bytesLength); + + String className = new String(classNameBytes); + Class clazz = Class.forName(className); + + Object result; + if (WrapperUtils.needWrapper(clazz)) { + Schema schema = RuntimeSchema.getSchema(Wrapper.class); + Wrapper wrapper = schema.newMessage(); + GraphIOUtil.mergeFrom(bytes, wrapper, schema); + result = wrapper.getData(); + } else { + Schema schema = RuntimeSchema.getSchema(clazz); + result = schema.newMessage(); + GraphIOUtil.mergeFrom(bytes, result, schema); + } + + return (T) result; + } + + @Override + public int readInt() throws IOException { + return dis.readInt(); + } + + @Override + public byte readByte() throws IOException { + return dis.readByte(); + } + + @Override + public byte[] readBytes() throws IOException { + int length = dis.readInt(); + byte[] bytes = new byte[length]; + dis.read(bytes, 0, length); + return bytes; + } + + @Override + public String readUTF() throws IOException { + int length = dis.readInt(); + byte[] bytes = new byte[length]; + dis.read(bytes, 0, length); + return new String(bytes); + } + + @Override + public void close() throws IOException { + dis.close(); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataOutputStream.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataOutputStream.java new file mode 100644 index 0000000..d5f7cf5 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffDataOutputStream.java @@ -0,0 +1,82 @@ +package cn.promptness.rpt.base.serialize.protostuff; + +import cn.promptness.rpt.base.serialize.api.ObjectOutputStream; +import cn.promptness.rpt.base.serialize.protostuff.utils.WrapperUtils; +import io.protostuff.GraphIOUtil; +import io.protostuff.LinkedBuffer; +import io.protostuff.Schema; +import io.protostuff.runtime.RuntimeSchema; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +public class ProtostuffDataOutputStream implements ObjectOutputStream { + + private final LinkedBuffer buffer = LinkedBuffer.allocate(); + private final DataOutputStream dos; + + public ProtostuffDataOutputStream(OutputStream outputStream) { + dos = new DataOutputStream(outputStream); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public void writeObject(Object obj) throws IOException { + byte[] bytes; + byte[] classNameBytes; + + try { + if (obj == null || WrapperUtils.needWrapper(obj)) { + Schema schema = RuntimeSchema.getSchema(Wrapper.class); + Wrapper wrapper = new Wrapper(obj); + bytes = GraphIOUtil.toByteArray(wrapper, schema, buffer); + classNameBytes = Wrapper.class.getName().getBytes(); + } else { + Schema schema = RuntimeSchema.getSchema(obj.getClass()); + bytes = GraphIOUtil.toByteArray(obj, schema, buffer); + classNameBytes = obj.getClass().getName().getBytes(); + } + } finally { + buffer.clear(); + } + + dos.writeInt(classNameBytes.length); + dos.writeInt(bytes.length); + dos.write(classNameBytes); + dos.write(bytes); + } + + @Override + public void writeInt(int v) throws IOException { + dos.writeInt(v); + } + + @Override + public void writeByte(byte v) throws IOException { + dos.writeByte(v); + } + + @Override + public void writeBytes(byte[] b) throws IOException { + dos.writeInt(b.length); + dos.write(b); + } + + @Override + public void writeUTF(String v) throws IOException { + byte[] bytes = v.getBytes(); + dos.writeInt(bytes.length); + dos.write(bytes); + } + + @Override + public void flush() throws IOException { + dos.flush(); + } + + @Override + public void close() throws IOException { + dos.close(); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffSerialization.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffSerialization.java new file mode 100644 index 0000000..6e66e10 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/ProtostuffSerialization.java @@ -0,0 +1,28 @@ +package cn.promptness.rpt.base.serialize.protostuff; + +import cn.promptness.rpt.base.serialize.api.ObjectInputStream; +import cn.promptness.rpt.base.serialize.api.ObjectOutputStream; +import cn.promptness.rpt.base.serialize.api.Serialization; +import cn.promptness.rpt.base.serialize.api.SerializationType; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class ProtostuffSerialization implements Serialization { + + @Override + public SerializationType getType() { + return SerializationType.PROTOSTUFF; + } + + @Override + public ObjectOutputStream serialize(OutputStream output) throws IOException { + return new ProtostuffDataOutputStream(output); + } + + @Override + public ObjectInputStream deserialize(InputStream input) throws IOException { + return new ProtostuffDataInputStream(input); + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/Wrapper.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/Wrapper.java new file mode 100644 index 0000000..808e4f0 --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/Wrapper.java @@ -0,0 +1,17 @@ +package cn.promptness.rpt.base.serialize.protostuff; + +/** + * Protostuff can only serialize/deserialize POJOs, for those it can't deal with, use this Wrapper. + */ +public class Wrapper { + + private final T data; + + Wrapper(T data) { + this.data = data; + } + + Object getData() { + return data; + } +} diff --git a/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/delegate/TimeDelegate.java b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/delegate/TimeDelegate.java new file mode 100644 index 0000000..5bf162d --- /dev/null +++ b/rpt-base/src/main/java/cn/promptness/rpt/base/serialize/protostuff/delegate/TimeDelegate.java @@ -0,0 +1,40 @@ +package cn.promptness.rpt.base.serialize.protostuff.delegate; + +import io.protostuff.Input; +import io.protostuff.Output; +import io.protostuff.Pipe; +import io.protostuff.WireFormat; +import io.protostuff.runtime.Delegate; + +import java.io.IOException; +import java.sql.Time; + +/** + * Custom {@link Time} delegate + */ +public class TimeDelegate implements Delegate