diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/pom.xml b/dubbo-serialization-extensions/dubbo-serialization-jdk/pom.xml
new file mode 100644
index 000000000..43525f81d
--- /dev/null
+++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/pom.xml
@@ -0,0 +1,54 @@
+
+
+
+ * e.g. <dubbo:protocol serialization="compactedjava" /> + *+ */ +public class CompactedJavaSerialization implements Serialization { + + @Override + public byte getContentTypeId() { + return COMPACTED_JAVA_SERIALIZATION_ID; + } + + @Override + public String getContentType() { + return "x-application/compactedjava"; + } + + @Override + public ObjectOutput serialize(URL url, OutputStream out) throws IOException { + return new JavaObjectOutput(out, true); + } + + @Override + public ObjectInput deserialize(URL url, InputStream is) throws IOException { + return new JavaObjectInput(is, true); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectInputStream.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectInputStream.java new file mode 100644 index 000000000..045fec3b1 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectInputStream.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.java; + +import org.apache.dubbo.common.utils.ClassUtils; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectStreamClass; +import java.io.StreamCorruptedException; + +/** + * Compacted java object input implementation + */ +public class CompactedObjectInputStream extends ObjectInputStream { + private ClassLoader mClassLoader; + + public CompactedObjectInputStream(InputStream in) throws IOException { + this(in, Thread.currentThread().getContextClassLoader()); + } + + public CompactedObjectInputStream(InputStream in, ClassLoader cl) throws IOException { + super(in); + mClassLoader = cl == null ? ClassUtils.getClassLoader() : cl; + } + + @Override + protected ObjectStreamClass readClassDescriptor() throws IOException, ClassNotFoundException { + int type = read(); + if (type < 0) { + throw new EOFException(); + } + switch (type) { + case 0: + return super.readClassDescriptor(); + case 1: + Class> clazz = loadClass(readUTF()); + return ObjectStreamClass.lookup(clazz); + default: + throw new StreamCorruptedException("Unexpected class descriptor type: " + type); + } + } + + private Class> loadClass(String className) throws ClassNotFoundException { + return mClassLoader.loadClass(className); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectOutputStream.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectOutputStream.java new file mode 100644 index 000000000..d93108311 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/CompactedObjectOutputStream.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.java; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; +import java.io.OutputStream; + +/** + * Compacted java object output implementation + */ +public class CompactedObjectOutputStream extends ObjectOutputStream { + public CompactedObjectOutputStream(OutputStream out) throws IOException { + super(out); + } + + @Override + protected void writeClassDescriptor(ObjectStreamClass desc) throws IOException { + Class> clazz = desc.forClass(); + if (clazz.isPrimitive() || clazz.isArray()) { + write(0); + super.writeClassDescriptor(desc); + } else { + write(1); + writeUTF(desc.getName()); + } + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/JavaObjectInput.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/JavaObjectInput.java new file mode 100644 index 000000000..bbdd02ffa --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/java/JavaObjectInput.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.java; + +import org.apache.dubbo.common.serialize.nativejava.NativeJavaObjectInput; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.lang.reflect.Type; + +/** + * Java object input implementation + */ +public class JavaObjectInput extends NativeJavaObjectInput { + public static final int MAX_BYTE_ARRAY_LENGTH = 8 * 1024 * 1024; + + public JavaObjectInput(InputStream is) throws IOException { + super(new ObjectInputStream(is)); + } + + public JavaObjectInput(InputStream is, boolean compacted) throws IOException { + super(compacted ? new CompactedObjectInputStream(is) : new ObjectInputStream(is)); + } + + @Override + public byte[] readBytes() throws IOException { + int len = getObjectInputStream().readInt(); + if (len < 0) { + return null; + } + if (len == 0) { + return new byte[0]; + } + if (len > MAX_BYTE_ARRAY_LENGTH) { + throw new IOException("Byte array length too large. " + len); + } + + byte[] b = new byte[len]; + getObjectInputStream().readFully(b); + return b; + } + + @Override + public String readUTF() throws IOException { + int len = getObjectInputStream().readInt(); + if (len < 0) { + return null; + } + + return getObjectInputStream().readUTF(); + } + + @Override + public Object readObject() throws IOException, ClassNotFoundException { + byte b = getObjectInputStream().readByte(); + if (b == 0) { + return null; + } + + return getObjectInputStream().readObject(); + } + + @Override + @SuppressWarnings("unchecked") + public
+ * e.g. <dubbo:protocol serialization="java" /> + *+ */ +public class JavaSerialization implements Serialization { + private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(JavaSerialization.class); + private static final AtomicBoolean warn = new AtomicBoolean(false); + + @Override + public byte getContentTypeId() { + return JAVA_SERIALIZATION_ID; + } + + @Override + public String getContentType() { + return "x-application/java"; + } + + @Override + public ObjectOutput serialize(URL url, OutputStream out) throws IOException { + if (warn.compareAndSet(false, true)) { + logger.error( + PROTOCOL_UNSAFE_SERIALIZATION, + "", + "", + "Java serialization is unsafe. Dubbo Team do not recommend anyone to use it." + + "If you still want to use it, please follow [JEP 290](https://openjdk.java.net/jeps/290)" + + "to set serialization filter to prevent deserialization leak."); + } + return new JavaObjectOutput(out); + } + + @Override + public ObjectInput deserialize(URL url, InputStream is) throws IOException { + if (warn.compareAndSet(false, true)) { + logger.error( + PROTOCOL_UNSAFE_SERIALIZATION, + "", + "", + "Java serialization is unsafe. Dubbo Team do not recommend anyone to use it." + + "If you still want to use it, please follow [JEP 290](https://openjdk.java.net/jeps/290)" + + "to set serialization filter to prevent deserialization leak."); + } + return new JavaObjectInput(is); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/nativejava/NativeJavaObjectInput.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/nativejava/NativeJavaObjectInput.java new file mode 100644 index 000000000..74431832f --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/java/org/apache/dubbo/common/serialize/nativejava/NativeJavaObjectInput.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.nativejava; + +import org.apache.dubbo.common.serialize.ObjectInput; +import org.apache.dubbo.common.utils.Assert; + +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.lang.reflect.Type; + +/** + * Native java object input implementation + */ +public class NativeJavaObjectInput implements ObjectInput { + + private final ObjectInputStream inputStream; + + public NativeJavaObjectInput(InputStream is) throws IOException { + this(new ObjectInputStream(is)); + } + + protected NativeJavaObjectInput(ObjectInputStream is) { + Assert.notNull(is, "input == null"); + inputStream = is; + } + + protected ObjectInputStream getObjectInputStream() { + return inputStream; + } + + @Override + public Object readObject() throws IOException, ClassNotFoundException { + return inputStream.readObject(); + } + + @Override + @SuppressWarnings("unchecked") + public
+ * e.g. <dubbo:protocol serialization="nativejava" /> + *+ */ +public class NativeJavaSerialization implements Serialization { + private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(JavaSerialization.class); + private static final AtomicBoolean warn = new AtomicBoolean(false); + + @Override + public byte getContentTypeId() { + return NATIVE_JAVA_SERIALIZATION_ID; + } + + @Override + public String getContentType() { + return "x-application/nativejava"; + } + + @Override + public ObjectOutput serialize(URL url, OutputStream output) throws IOException { + if (warn.compareAndSet(false, true)) { + logger.error( + PROTOCOL_UNSAFE_SERIALIZATION, + "", + "", + "Java serialization is unsafe. Dubbo Team do not recommend anyone to use it." + + "If you still want to use it, please follow [JEP 290](https://openjdk.java.net/jeps/290)" + + "to set serialization filter to prevent deserialization leak."); + } + return new NativeJavaObjectOutput(output); + } + + @Override + public ObjectInput deserialize(URL url, InputStream input) throws IOException { + if (warn.compareAndSet(false, true)) { + logger.error( + PROTOCOL_UNSAFE_SERIALIZATION, + "", + "", + "Java serialization is unsafe. Dubbo Team do not recommend anyone to use it." + + "If you still want to use it, please follow [JEP 290](https://openjdk.java.net/jeps/290)" + + "to set serialization filter to prevent deserialization leak."); + } + return new NativeJavaObjectInput(input); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization new file mode 100644 index 000000000..bc1e4ef00 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.common.serialize.Serialization @@ -0,0 +1,3 @@ +java=org.apache.dubbo.common.serialize.java.JavaSerialization +compactedjava=org.apache.dubbo.common.serialize.java.CompactedJavaSerialization +nativejava=org.apache.dubbo.common.serialize.nativejava.NativeJavaSerialization \ No newline at end of file diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/Image.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/Image.java new file mode 100644 index 000000000..b6a1b5453 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/Image.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.jdk; + + +public class Image implements java.io.Serializable { + private static final long serialVersionUID = 1L; + public String uri; + public String title; // Can be null + public int width; + public int height; + public Size size; + + public Image() { + } + + public Image(String uri, String title, int width, int height, Size size) { + this.height = height; + this.title = title; + this.uri = uri; + this.width = width; + this.size = size; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Image image = (Image) o; + + if (height != image.height) return false; + if (width != image.width) return false; + if (size != image.size) return false; + if (title != null ? !title.equals(image.title) : image.title != null) return false; + if (uri != null ? !uri.equals(image.uri) : image.uri != null) return false; + + return true; + } + + @Override + public int hashCode() { + int result = uri != null ? uri.hashCode() : 0; + result = 31 * result + (title != null ? title.hashCode() : 0); + result = 31 * result + width; + result = 31 * result + height; + result = 31 * result + (size != null ? size.hashCode() : 0); + return result; + } + + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("[Image "); + sb.append("uri=").append(uri); + sb.append(", title=").append(title); + sb.append(", width=").append(width); + sb.append(", height=").append(height); + sb.append(", size=").append(size); + sb.append("]"); + return sb.toString(); + } + + public String getUri() { + return uri; + } + + public void setUri(String uri) { + this.uri = uri; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public int getWidth() { + return width; + } + + public void setWidth(int width) { + this.width = width; + } + + public int getHeight() { + return height; + } + + public void setHeight(int height) { + this.height = height; + } + + public Size getSize() { + return size; + } + + public void setSize(Size size) { + this.size = size; + } + + public enum Size { + SMALL, LARGE + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkObjectOutputTest.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkObjectOutputTest.java new file mode 100644 index 000000000..4043c0720 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkObjectOutputTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.jdk; + +import org.apache.dubbo.common.serialize.java.JavaObjectInput; +import org.apache.dubbo.common.serialize.java.JavaObjectOutput; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * {@link JavaObjectOutput} Unit Test + */ +public class JdkObjectOutputTest { + + private JavaObjectOutput javaObjectOutput; + private JavaObjectInput jdkObjectInput; + private ByteArrayOutputStream byteArrayOutputStream; + private ByteArrayInputStream byteArrayInputStream; + + @BeforeEach + public void setUp() throws Exception { + this.byteArrayOutputStream = new ByteArrayOutputStream(); + this.javaObjectOutput = new JavaObjectOutput(byteArrayOutputStream); + } + + @Test + public void testWriteBool() throws IOException { + this.javaObjectOutput.writeBool(true); + this.flushToInput(); + + assertThat(jdkObjectInput.readBool(), is(true)); + } + + @Test + public void testWriteShort() throws IOException { + this.javaObjectOutput.writeShort((short) 2); + this.flushToInput(); + + assertThat(jdkObjectInput.readShort(), is((short) 2)); + } + + @Test + public void testWriteInt() throws IOException { + this.javaObjectOutput.writeInt(1); + this.flushToInput(); + + assertThat(jdkObjectInput.readInt(), is(1)); + } + + @Test + public void testWriteLong() throws IOException { + this.javaObjectOutput.writeLong(1000L); + this.flushToInput(); + + assertThat(jdkObjectInput.readLong(), is(1000L)); + } + + @Test + public void testWriteUTF() throws IOException { + this.javaObjectOutput.writeUTF("Pace Hasîtî 和平 Мир"); + this.flushToInput(); + + assertThat(jdkObjectInput.readUTF(), is("Pace Hasîtî 和平 Мир")); + } + + + @Test + public void testWriteFloat() throws IOException { + this.javaObjectOutput.writeFloat(1.88f); + this.flushToInput(); + + assertThat(this.jdkObjectInput.readFloat(), is(1.88f)); + } + + @Test + public void testWriteDouble() throws IOException { + this.javaObjectOutput.writeDouble(1.66d); + this.flushToInput(); + + assertThat(this.jdkObjectInput.readDouble(), is(1.66d)); + } + + @Test + public void testWriteBytes() throws IOException { + this.javaObjectOutput.writeBytes("hello".getBytes()); + this.flushToInput(); + + assertThat(this.jdkObjectInput.readBytes(), is("hello".getBytes())); + } + + @Test + public void testWriteBytesWithSubLength() throws IOException { + this.javaObjectOutput.writeBytes("hello".getBytes(), 2, 2); + this.flushToInput(); + + assertThat(this.jdkObjectInput.readBytes(), is("ll".getBytes())); + } + + @Test + public void testWriteByte() throws IOException { + this.javaObjectOutput.writeByte((byte) 123); + this.flushToInput(); + + assertThat(this.jdkObjectInput.readByte(), is((byte) 123)); + } + + @Test + public void testWriteObject() throws IOException, ClassNotFoundException { + Image image = new Image("http://dubbo.apache.org/img/dubbo_white.png", "logo", 300, 480, Image.Size.SMALL); + this.javaObjectOutput.writeObject(image); + this.flushToInput(); + + Image readObjectForImage = jdkObjectInput.readObject(Image.class); + assertThat(readObjectForImage, not(nullValue())); + assertThat(readObjectForImage, is(image)); + } + + private void flushToInput() throws IOException { + this.javaObjectOutput.flushBuffer(); + this.byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray()); + this.jdkObjectInput = new JavaObjectInput(byteArrayInputStream); + } +} diff --git a/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkSerializationTest.java b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkSerializationTest.java new file mode 100644 index 000000000..aa0ed6796 --- /dev/null +++ b/dubbo-serialization-extensions/dubbo-serialization-jdk/src/test/java/org/apache/dubbo/common/serialize/jdk/JdkSerializationTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.common.serialize.jdk; + +import org.apache.dubbo.common.serialize.ObjectOutput; +import org.apache.dubbo.common.serialize.java.JavaObjectOutput; +import org.apache.dubbo.common.serialize.java.JavaSerialization; + +import org.hamcrest.MatcherAssert; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.OutputStream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; + +/** + * {@link JavaSerialization} Unit Test + */ +public class JdkSerializationTest { + + private JavaSerialization JavaSerialization; + + @BeforeEach + public void setUp() { + this.JavaSerialization = new JavaSerialization(); + } + + @Test + public void testContentTypeId() { + MatcherAssert.assertThat(JavaSerialization.getContentTypeId(), is((byte) 3)); + } + + @Test + public void testContentType() { + MatcherAssert.assertThat(JavaSerialization.getContentType(), is("x-application/java")); + } + + @Test + public void testObjectOutput() throws IOException { + ObjectOutput objectOutput = JavaSerialization.serialize(null, mock(OutputStream.class)); + assertThat(objectOutput, Matchers.instanceOf(JavaObjectOutput.class)); + } + +} diff --git a/dubbo-serialization-extensions/pom.xml b/dubbo-serialization-extensions/pom.xml index 2a7eef1b4..cec14402b 100644 --- a/dubbo-serialization-extensions/pom.xml +++ b/dubbo-serialization-extensions/pom.xml @@ -44,6 +44,7 @@
+ * Helper class to start an embedded instance of standalone (non clustered) ZooKeeper. + *
+ * NOTE: at least an external standalone server (if not an ensemble) are recommended, even for + * {@link org.springframework.xd.dirt.server.singlenode.SingleNodeApplication} + * + * @author Patrick Peralta + * @author Mark Fisher + * @author David Turanski + */ +public class EmbeddedZooKeeper implements SmartLifecycle { + + /** + * Logger. + */ + private static final Logger logger = LoggerFactory.getLogger(EmbeddedZooKeeper.class); + + /** + * ZooKeeper client port. This will be determined dynamically upon startup. + */ + private final int clientPort; + + /** + * Whether to auto-start. Default is true. + */ + private boolean autoStartup = true; + + /** + * Lifecycle phase. Default is 0. + */ + private int phase = 0; + + /** + * Thread for running the ZooKeeper server. + */ + private volatile Thread zkServerThread; + + /** + * ZooKeeper server. + */ + private volatile ZooKeeperServerMain zkServer; + + /** + * {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. + */ + private ErrorHandler errorHandler; + + private boolean daemon = true; + + /** + * Construct an EmbeddedZooKeeper with a random port. + */ + public EmbeddedZooKeeper() { + clientPort = SocketUtils.findAvailableTcpPort(); + } + + /** + * Construct an EmbeddedZooKeeper with the provided port. + * + * @param clientPort port for ZooKeeper server to bind to + */ + public EmbeddedZooKeeper(int clientPort, boolean daemon) { + this.clientPort = clientPort; + this.daemon = daemon; + } + + /** + * Returns the port that clients should use to connect to this embedded server. + * + * @return dynamically determined client port + */ + public int getClientPort() { + return this.clientPort; + } + + /** + * Specify whether to start automatically. Default is true. + * + * @param autoStartup whether to start automatically + */ + public void setAutoStartup(boolean autoStartup) { + this.autoStartup = autoStartup; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isAutoStartup() { + return this.autoStartup; + } + + /** + * Specify the lifecycle phase for the embedded server. + * + * @param phase the lifecycle phase + */ + public void setPhase(int phase) { + this.phase = phase; + } + + /** + * {@inheritDoc} + */ + @Override + public int getPhase() { + return this.phase; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isRunning() { + return (zkServerThread != null); + } + + /** + * Start the ZooKeeper server in a background thread. + *
+ * Register an error handler via {@link #setErrorHandler} in order to handle
+ * any exceptions thrown during startup or execution.
+ */
+ @Override
+ public synchronized void start() {
+ if (zkServerThread == null) {
+ zkServerThread = new Thread(new ServerRunnable(), "ZooKeeper Server Starter");
+ zkServerThread.setDaemon(daemon);
+ zkServerThread.start();
+ }
+ }
+
+ /**
+ * Shutdown the ZooKeeper server.
+ */
+ @Override
+ public synchronized void stop() {
+ if (zkServerThread != null) {
+ // The shutdown method is protected...thus this hack to invoke it.
+ // This will log an exception on shutdown; see
+ // https://issues.apache.org/jira/browse/ZOOKEEPER-1873 for details.
+ try {
+ Method shutdown = ZooKeeperServerMain.class.getDeclaredMethod("shutdown");
+ shutdown.setAccessible(true);
+ shutdown.invoke(zkServer);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ // It is expected that the thread will exit after
+ // the server is shutdown; this will block until
+ // the shutdown is complete.
+ try {
+ zkServerThread.join(5000);
+ zkServerThread = null;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.warn("Interrupted while waiting for embedded ZooKeeper to exit");
+ // abandoning zk thread
+ zkServerThread = null;
+ }
+ }
+ }
+
+ /**
+ * Stop the server if running and invoke the callback when complete.
+ */
+ @Override
+ public void stop(Runnable callback) {
+ stop();
+ callback.run();
+ }
+
+ /**
+ * Provide an {@link ErrorHandler} to be invoked if an Exception is thrown from the ZooKeeper server thread. If none
+ * is provided, only error-level logging will occur.
+ *
+ * @param errorHandler the {@link ErrorHandler} to be invoked
+ */
+ public void setErrorHandler(ErrorHandler errorHandler) {
+ this.errorHandler = errorHandler;
+ }
+
+ /**
+ * Runnable implementation that starts the ZooKeeper server.
+ */
+ private class ServerRunnable implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ Properties properties = new Properties();
+ File file = new File(System.getProperty("java.io.tmpdir")
+ + File.separator + UUID.randomUUID());
+ file.deleteOnExit();
+ properties.setProperty("dataDir", file.getAbsolutePath());
+ properties.setProperty("clientPort", String.valueOf(clientPort));
+
+ QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
+ quorumPeerConfig.parseProperties(properties);
+
+ zkServer = new ZooKeeperServerMain();
+ ServerConfig configuration = new ServerConfig();
+ configuration.readFrom(quorumPeerConfig);
+
+ zkServer.runFromConfig(configuration);
+ } catch (Exception e) {
+ if (errorHandler != null) {
+ errorHandler.handleError(e);
+ } else {
+ logger.error("Exception running embedded ZooKeeper", e);
+ }
+ }
+ }
+ }
+
+}
diff --git a/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/java/org/apache/dubbo/test/serialization/jdk/JdkProvider.java b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/java/org/apache/dubbo/test/serialization/jdk/JdkProvider.java
new file mode 100644
index 000000000..9d1a1c502
--- /dev/null
+++ b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/java/org/apache/dubbo/test/serialization/jdk/JdkProvider.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.dubbo.test.serialization.jdk;
+
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+import java.util.concurrent.CountDownLatch;
+
+public class JdkProvider {
+
+ public static void main(String[] args) throws Exception {
+ new EmbeddedZooKeeper(2181, false).start();
+ // wait for embedded zookeeper start completely.
+ Thread.sleep(1000);
+
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-demo-provider.xml");
+ context.start();
+
+ System.out.println("dubbo service started");
+ new CountDownLatch(1).await();
+ }
+}
diff --git a/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/log4j.properties b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/log4j.properties
new file mode 100644
index 000000000..1f8fe1ece
--- /dev/null
+++ b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/log4j.properties
@@ -0,0 +1,25 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+###set log levels###
+log4j.rootLogger=info, stdout
+###output to the console###
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d{dd/MM/yy hh:mm:ss:sss z}] %t %5p %c{2}: %m%n
diff --git a/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/spring/dubbo-demo-consumer.xml b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/spring/dubbo-demo-consumer.xml
new file mode 100644
index 000000000..62422373c
--- /dev/null
+++ b/test/scenarios/scenarios-dubbo-serialization/dubbo-serialization-jdk-test/src/main/resources/spring/dubbo-demo-consumer.xml
@@ -0,0 +1,33 @@
+
+
+
+