Skip to content

Commit

Permalink
add redis client name
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Nov 20, 2024
1 parent 012be9a commit 2712c51
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.ctrip.xpipe.redis.core.protocal.protocal.RequestStringParser;
import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser;
import com.ctrip.xpipe.utils.ChannelUtil;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Transaction;
import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -40,6 +41,8 @@ public class RedisAsyncNettyClient extends AsyncNettyClient implements RedisNett

private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660;

private boolean doAfterConnectedSuccess = false;

protected final AtomicReference<Boolean> doAfterConnectedOver = new AtomicReference<>(false);

public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName, AsyncConnectionCondition asyncConnectionCondition) {
Expand All @@ -64,50 +67,60 @@ protected void doAfterConnected() {
RequestStringParser requestString = new RequestStringParser(CLIENT_SET_NAME, clientName);
SimpleStringParser simpleStringParser = new SimpleStringParser();
Transaction transaction = Cat.newTransaction("netty.client.setName", clientName);
RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() {
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
try{
transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress()));
transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill());
RedisClientProtocol<String> payload = simpleStringParser.read(byteBuf);
String result = null;
if(payload != null){
result = payload.getPayload();
}
if(result == null){
return RECEIVER_RESULT.CONTINUE;
try {
RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() {
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
try{
transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress()));
transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill());
RedisClientProtocol<String> payload = simpleStringParser.read(byteBuf);
String result = null;
if(payload != null){
result = payload.getPayload();
}
if(result == null){
return RECEIVER_RESULT.CONTINUE;
}
if (EXPECT_RESP.equalsIgnoreCase(result)){
transaction.setStatus(Transaction.SUCCESS);
doAfterConnectedSuccess = true;
logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result);
} else {
doAfterConnectedSuccess = false;
transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result)));
logger.warn("[redisAsync][clientSetName][err-result][{}] {}", desc, result);
}
transaction.complete();
}catch(Throwable th){
doAfterConnectedSuccess = false;
transaction.setStatus(th);
transaction.complete();
logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th);
throw th;
}
if (EXPECT_RESP.equalsIgnoreCase(result)){
transaction.setStatus(Transaction.SUCCESS);
logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result);
} else {
transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result)));
logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result);
}
}catch(Throwable th){
transaction.setStatus(th);
logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th);
}finally{
transaction.complete();
return RECEIVER_RESULT.SUCCESS;
}
return RECEIVER_RESULT.SUCCESS;
}

@Override
public void clientClosed(NettyClient nettyClient) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel());
transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel())));
transaction.complete();
}
@Override
public void clientClosed(NettyClient nettyClient) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel());
transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel())));
transaction.complete();
}

@Override
public void clientClosed(NettyClient nettyClient, Throwable th) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th);
transaction.setStatus(th);
transaction.complete();
}
});
@Override
public void clientClosed(NettyClient nettyClient, Throwable th) {
logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th);
transaction.setStatus(th);
transaction.complete();
}
});
} catch (Exception e) {
transaction.setStatus(e);
transaction.complete();
logger.error("[redisAsync][clientSetName] err", e);
}

}

Expand All @@ -121,4 +134,9 @@ public int getAfterConnectCommandTimeoutMill() {
return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI;
}

@VisibleForTesting
public boolean getDoAfterConnectedSuccess() {
return doAfterConnectedSuccess;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,60 +77,85 @@ public void clientClosed(NettyClient nettyClient, Throwable th) {
sleep(1000);
String str = sb.toString();
Assert.assertTrue(minReadAbleBytes[0] > 0);
Assert.assertTrue(client.getDoAfterConnectedSuccess());
Assert.assertEquals(str, expected.toString());
}

@Test
public void testUnpacking() throws Exception {
server = startUnpackingServer(randomPort(), "+O");
RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()),
new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true);
client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client);

StringBuffer sb = new StringBuffer();

StringBuilder expected = new StringBuilder();
String message = "K\r\nOK1\r\n";
client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() {

int N = 100;
final int[] maxProtocolTime = {0};
for(int i = 0; i < N; i++) {
String message = "+" + i + "\r\n";
client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() {

private RedisClientProtocol<String> parser = new SimpleStringParser();
int protocolTime = 0;
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
RedisClientProtocol<String> clientProtocol = parser.read(byteBuf);
if(clientProtocol != null) {
sb.append(clientProtocol.getPayload());
protocolTime++;
if (protocolTime > maxProtocolTime[0]) {
maxProtocolTime[0] = protocolTime;
}
return RECEIVER_RESULT.SUCCESS;
}
protocolTime++;
return RECEIVER_RESULT.CONTINUE;
private RedisClientProtocol<String> parser = new SimpleStringParser();
@Override
public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) {
RedisClientProtocol<String> clientProtocol = parser.read(byteBuf);
if(clientProtocol != null) {
sb.append(clientProtocol.getPayload());
return RECEIVER_RESULT.SUCCESS;
}
return RECEIVER_RESULT.CONTINUE;
}

@Override
public void clientClosed(NettyClient nettyClient) {
@Override
public void clientClosed(NettyClient nettyClient) {

}
}

@Override
public void clientClosed(NettyClient nettyClient, Throwable th) {
@Override
public void clientClosed(NettyClient nettyClient, Throwable th) {

}
});
expected.append(prefix).append("+");
expected.append(i);
}
}
});
expected.append("OK1");
waitConditionUntilTimeOut(()->client.channel().isActive(), 1000);
sleep(1000);
String str = sb.toString();
Assert.assertTrue(client.getDoAfterConnectedSuccess());
Assert.assertEquals(str, expected.toString());
Assert.assertTrue(maxProtocolTime[0] >= 2);
}


protected Server startUnpackingServer(int port, String prefix) throws Exception {
return startServer(port, new IoActionFactory() {

boolean sended = false;

@Override
public IoAction createIoAction(Socket socket) {
return new AbstractIoAction(socket) {

private String line;

@Override
protected Object doRead(InputStream ins) throws IOException {
line = readLine(ins);
return line;
}

@Override
protected void doWrite(OutputStream ous, Object readResult) throws IOException {
if (prefix != null && !sended) {
ous.write(prefix.getBytes());
sended = true;
}
if (!line.contains("CLIENT")) {
ous.write(line.getBytes());
}
sleepIgnoreInterrupt(1);
ous.flush();
}
};
}
});
}

@Test
Expand Down

0 comments on commit 2712c51

Please sign in to comment.