diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java index 43ad91869..9fa1c5fa7 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.protocal.protocal.RequestStringParser; import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.utils.ChannelUtil; +import com.ctrip.xpipe.utils.VisibleForTesting; import com.dianping.cat.Cat; import com.dianping.cat.message.Transaction; import io.netty.buffer.ByteBuf; @@ -40,6 +41,8 @@ public class RedisAsyncNettyClient extends AsyncNettyClient implements RedisNett private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660; + private boolean doAfterConnectedSuccess = false; + protected final AtomicReference doAfterConnectedOver = new AtomicReference<>(false); public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName, AsyncConnectionCondition asyncConnectionCondition) { @@ -64,50 +67,60 @@ protected void doAfterConnected() { RequestStringParser requestString = new RequestStringParser(CLIENT_SET_NAME, clientName); SimpleStringParser simpleStringParser = new SimpleStringParser(); Transaction transaction = Cat.newTransaction("netty.client.setName", clientName); - RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() { - @Override - public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { - try{ - transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress())); - transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill()); - RedisClientProtocol payload = simpleStringParser.read(byteBuf); - String result = null; - if(payload != null){ - result = payload.getPayload(); - } - if(result == null){ - return RECEIVER_RESULT.CONTINUE; + try { + RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() { + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + try{ + transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress())); + transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill()); + RedisClientProtocol payload = simpleStringParser.read(byteBuf); + String result = null; + if(payload != null){ + result = payload.getPayload(); + } + if(result == null){ + return RECEIVER_RESULT.CONTINUE; + } + if (EXPECT_RESP.equalsIgnoreCase(result)){ + transaction.setStatus(Transaction.SUCCESS); + doAfterConnectedSuccess = true; + logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); + } else { + doAfterConnectedSuccess = false; + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result))); + logger.warn("[redisAsync][clientSetName][err-result][{}] {}", desc, result); + } + transaction.complete(); + }catch(Throwable th){ + doAfterConnectedSuccess = false; + transaction.setStatus(th); + transaction.complete(); + logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th); + throw th; } - if (EXPECT_RESP.equalsIgnoreCase(result)){ - transaction.setStatus(Transaction.SUCCESS); - logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); - } else { - transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result))); - logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result); - } - }catch(Throwable th){ - transaction.setStatus(th); - logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th); - }finally{ - transaction.complete(); + return RECEIVER_RESULT.SUCCESS; } - return RECEIVER_RESULT.SUCCESS; - } - @Override - public void clientClosed(NettyClient nettyClient) { - logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel()); - transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel()))); - transaction.complete(); - } + @Override + public void clientClosed(NettyClient nettyClient) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel()); + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel()))); + transaction.complete(); + } - @Override - public void clientClosed(NettyClient nettyClient, Throwable th) { - logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th); - transaction.setStatus(th); - transaction.complete(); - } - }); + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th); + transaction.setStatus(th); + transaction.complete(); + } + }); + } catch (Exception e) { + transaction.setStatus(e); + transaction.complete(); + logger.error("[redisAsync][clientSetName] err", e); + } } @@ -121,4 +134,9 @@ public int getAfterConnectCommandTimeoutMill() { return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI; } + @VisibleForTesting + public boolean getDoAfterConnectedSuccess() { + return doAfterConnectedSuccess; + } + } diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java index 4f88a47dc..8394d9621 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java @@ -77,11 +77,13 @@ public void clientClosed(NettyClient nettyClient, Throwable th) { sleep(1000); String str = sb.toString(); Assert.assertTrue(minReadAbleBytes[0] > 0); + Assert.assertTrue(client.getDoAfterConnectedSuccess()); Assert.assertEquals(str, expected.toString()); } @Test public void testUnpacking() throws Exception { + server = startUnpackingServer(randomPort(), "+O"); RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); @@ -89,48 +91,71 @@ public void testUnpacking() throws Exception { StringBuffer sb = new StringBuffer(); StringBuilder expected = new StringBuilder(); + String message = "K\r\nOK1\r\n"; + client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { - int N = 100; - final int[] maxProtocolTime = {0}; - for(int i = 0; i < N; i++) { - String message = "+" + i + "\r\n"; - client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { - - private RedisClientProtocol parser = new SimpleStringParser(); - int protocolTime = 0; - @Override - public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { - RedisClientProtocol clientProtocol = parser.read(byteBuf); - if(clientProtocol != null) { - sb.append(clientProtocol.getPayload()); - protocolTime++; - if (protocolTime > maxProtocolTime[0]) { - maxProtocolTime[0] = protocolTime; - } - return RECEIVER_RESULT.SUCCESS; - } - protocolTime++; - return RECEIVER_RESULT.CONTINUE; + private RedisClientProtocol parser = new SimpleStringParser(); + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + RedisClientProtocol clientProtocol = parser.read(byteBuf); + if(clientProtocol != null) { + sb.append(clientProtocol.getPayload()); + return RECEIVER_RESULT.SUCCESS; } + return RECEIVER_RESULT.CONTINUE; + } - @Override - public void clientClosed(NettyClient nettyClient) { + @Override + public void clientClosed(NettyClient nettyClient) { - } + } - @Override - public void clientClosed(NettyClient nettyClient, Throwable th) { + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { - } - }); - expected.append(prefix).append("+"); - expected.append(i); - } + } + }); + expected.append("OK1"); waitConditionUntilTimeOut(()->client.channel().isActive(), 1000); sleep(1000); String str = sb.toString(); + Assert.assertTrue(client.getDoAfterConnectedSuccess()); Assert.assertEquals(str, expected.toString()); - Assert.assertTrue(maxProtocolTime[0] >= 2); + } + + + protected Server startUnpackingServer(int port, String prefix) throws Exception { + return startServer(port, new IoActionFactory() { + + boolean sended = false; + + @Override + public IoAction createIoAction(Socket socket) { + return new AbstractIoAction(socket) { + + private String line; + + @Override + protected Object doRead(InputStream ins) throws IOException { + line = readLine(ins); + return line; + } + + @Override + protected void doWrite(OutputStream ous, Object readResult) throws IOException { + if (prefix != null && !sended) { + ous.write(prefix.getBytes()); + sended = true; + } + if (!line.contains("CLIENT")) { + ous.write(line.getBytes()); + } + sleepIgnoreInterrupt(1); + ous.flush(); + } + }; + } + }); } @Test