From 41102ecf5525dd85d9585e953923e936f5b5cfdc Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Tue, 5 Nov 2024 12:09:24 +0800 Subject: [PATCH] add redis client name --- .../AbstractNettyRequestResponseCommand.java | 11 +- .../netty/commands/AsyncNettyClient.java | 19 +++ .../commands/NettyRedisPoolClientFactory.java | 25 ++++ .../netty/commands/RedisAsyncNettyClient.java | 113 ++++++++++++++++++ .../redismaster/MasterOverOneMonitor.java | 4 +- .../DefaultSentinelHelloCollector.java | 4 +- .../impl/ProxyConnectedChecker.java | 4 +- .../redis/checker/resource/Resource.java | 10 +- .../AbstractCheckerIntegrationTest.java | 2 +- .../DefaultProxyMonitorCollectorManager.java | 4 +- .../console/redis/DefaultSentinelManager.java | 5 +- .../model/impl/ShardModelServiceImpl.java | 2 +- .../redis/console/spring/ResourceConfig.java | 37 ++++-- 13 files changed, 214 insertions(+), 26 deletions(-) create mode 100644 core/src/main/java/com/ctrip/xpipe/netty/commands/NettyRedisPoolClientFactory.java create mode 100644 core/src/main/java/com/ctrip/xpipe/netty/commands/RedisAsyncNettyClient.java diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java index 1f6b9c5c4b..5ed7af87f0 100644 --- a/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java @@ -56,16 +56,21 @@ protected void doSendRequest(final NettyClient nettyClient, ByteBuf byteBuf) { } if(getCommandTimeoutMilli() > 0 && scheduled != null){ + int commandTimeoutMilli = getCommandTimeoutMilli(); + if (nettyClient instanceof RedisAsyncNettyClient && !((RedisAsyncNettyClient) nettyClient).getDoAfterConnectedOver()) { + commandTimeoutMilli += ((RedisAsyncNettyClient) nettyClient).getAfterConnectCommandTimeoutMill(); + } - getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, getCommandTimeoutMilli()); + getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, commandTimeoutMilli); + int finalCommandTimeoutMilli = commandTimeoutMilli; timeoutFuture = scheduled.schedule(new AbstractExceptionLogTask() { @Override public void doRun() { getLogger().info("[{}][run][timeout]{}", AbstractNettyRequestResponseCommand.this, nettyClient); - future().setFailure(new CommandTimeoutException("timeout " + + getCommandTimeoutMilli())); + future().setFailure(new CommandTimeoutException("timeout " + finalCommandTimeoutMilli)); } - }, getCommandTimeoutMilli(), TimeUnit.MILLISECONDS); + }, commandTimeoutMilli, TimeUnit.MILLISECONDS); future().addListener(new CommandFutureListener() { diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/AsyncNettyClient.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/AsyncNettyClient.java index 5d12521fac..f4a1d953d8 100644 --- a/core/src/main/java/com/ctrip/xpipe/netty/commands/AsyncNettyClient.java +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/AsyncNettyClient.java @@ -3,12 +3,15 @@ import com.ctrip.xpipe.api.endpoint.Endpoint; import com.ctrip.xpipe.api.proxy.ProxyEnabled; import com.ctrip.xpipe.utils.ChannelUtil; +import com.sun.org.apache.xpath.internal.operations.Bool; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicReference; + /** * @author chen.zhu @@ -21,6 +24,8 @@ public class AsyncNettyClient extends DefaultNettyClient { private ChannelFuture future; + protected final AtomicReference doAfterConnectedOver = new AtomicReference<>(false); + public AsyncNettyClient(ChannelFuture future, Endpoint endpoint) { super(future.channel()); this.future = future; @@ -35,6 +40,8 @@ public void operationComplete(ChannelFuture future) { } else { desc.set(ChannelUtil.getDesc(future.channel())); } + doAfterConnected(); + doAfterConnectedOver.set(true); } else { logger.info("[async][connect-fail] endpint: {}", endpoint, future.cause()); } @@ -93,4 +100,16 @@ public String toString() { return super.toString(); } + protected void doAfterConnected() { + + } + + protected boolean getDoAfterConnectedOver() { + return doAfterConnectedOver.get(); + } + + protected int getAfterConnectCommandTimeoutMill() { + return 0; + } + } diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/NettyRedisPoolClientFactory.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/NettyRedisPoolClientFactory.java new file mode 100644 index 0000000000..df08a73695 --- /dev/null +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/NettyRedisPoolClientFactory.java @@ -0,0 +1,25 @@ +package com.ctrip.xpipe.netty.commands; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import io.netty.channel.ChannelFuture; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; + +public class NettyRedisPoolClientFactory extends NettyKeyedPoolClientFactory{ + + private String clientName; + + public NettyRedisPoolClientFactory(int eventLoopThreads, String clientName) { + super(eventLoopThreads); + this.clientName = clientName; + } + + @Override + public PooledObject makeObject(Endpoint key) throws Exception { + ChannelFuture f = b.connect(key.getHost(), key.getPort()); + NettyClient nettyClient = new RedisAsyncNettyClient(f, key, clientName); + f.channel().attr(NettyClientHandler.KEY_CLIENT).set(nettyClient); + return new DefaultPooledObject(nettyClient); + } + +} diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisAsyncNettyClient.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisAsyncNettyClient.java new file mode 100644 index 0000000000..954fb6a984 --- /dev/null +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisAsyncNettyClient.java @@ -0,0 +1,113 @@ +package com.ctrip.xpipe.netty.commands; + +import com.ctrip.xpipe.api.codec.Codec; +import com.ctrip.xpipe.api.endpoint.Endpoint; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; + +public class RedisAsyncNettyClient extends AsyncNettyClient{ + + protected Logger logger = LoggerFactory.getLogger(getClass()); + + private String clientName; + + private static final String CLIENT_SET_NAME = "CLIENT SETNAME "; + + private static final String EXPECT_RESP = "OK"; + + private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660; + + public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName) { + super(future, endpoint); + this.clientName = clientName; + } + + @Override + protected void doAfterConnected() { + String command = CLIENT_SET_NAME + clientName + "\r\n"; + ByteBuf byteBuf = Unpooled.wrappedBuffer(command.getBytes(Codec.defaultCharset)); + sendRequest(byteBuf, new ByteBufReceiver() { + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + String result = doReceiveResponse(byteBuf); + if (result == null) { + logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result); + } + if (EXPECT_RESP.equalsIgnoreCase(result)) { + logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); + } + return RECEIVER_RESULT.SUCCESS; + } + + @Override + public void clientClosed(NettyClient nettyClient) { + logger.warn("[redisAsync][wont-send][{}] {}", desc, nettyClient.channel()); + } + + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + logger.warn("[redisAsync][wont-send][{}] {}", desc, nettyClient.channel(), th); + } + }); + + } + + @Override + protected int getAfterConnectCommandTimeoutMill() { + return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI; + } + + public static String doReceiveResponse(ByteBuf byteBuf) { + ByteArrayOutputStream baous = new ByteArrayOutputStream(); + CRLF_STATE crlfState = CRLF_STATE.CONTENT; + int readable = byteBuf.readableBytes(); + for(int i=0; i < readable ;i++){ + + byte data = byteBuf.readByte(); + baous.write(data); + switch(data){ + case '\r': + crlfState = CRLF_STATE.CR; + break; + case '\n': + if(crlfState == CRLF_STATE.CR){ + crlfState = CRLF_STATE.CRLF; + break; + } + default: + crlfState = CRLF_STATE.CONTENT; + break; + } + + if(crlfState == CRLF_STATE.CRLF){ + break; + } + } + if (baous.toByteArray().length != 0 && crlfState == CRLF_STATE.CRLF) { + String data = new String(baous.toByteArray(), Codec.defaultCharset); + int beginIndex = 0; + int endIndex = data.length(); + if(data.charAt(0) == '+'){ + beginIndex = 1; + } + if(data.charAt(endIndex - 2) == '\r' && data.charAt(endIndex - 1) == '\n'){ + endIndex -= 2; + } + return data.substring(beginIndex, endIndex); + } + return null; + } + + public enum CRLF_STATE{ + CR, + CRLF, + CONTENT + } + +} diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java index 1750b8c8b6..3b13ef061c 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java @@ -33,7 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; /** @@ -49,7 +49,7 @@ public class MasterOverOneMonitor implements RedisMasterActionListener, OneWaySu @Autowired private AlertManager alertManager; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = REDIS_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Resource(name = SCHEDULED_EXECUTOR) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java index f38c41f619..8ab3c2d544 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java @@ -46,7 +46,7 @@ import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.SENTINEL_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.THREAD_POOL_TIME_OUT; /** @@ -77,7 +77,7 @@ public class DefaultSentinelHelloCollector implements SentinelHelloCollector { @Autowired private PersistenceCache persistenceCache; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; private SentinelLeakyBucket leakyBucket; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java index 394184c1ab..c66d7dd782 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java @@ -19,7 +19,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @Component @@ -28,7 +28,7 @@ public class ProxyConnectedChecker implements ProxyChecker { @Autowired private CheckerConfig checkerConfig; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Resource(name = SCHEDULED_EXECUTOR) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java index 51a3e4da40..54794672cd 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java @@ -8,11 +8,15 @@ public class Resource { public static final String REDIS_COMMAND_EXECUTOR = "redisCommandExecutor"; - public static final String KEYED_NETTY_CLIENT_POOL = "keyedClientPool"; + public static final String REDIS_KEYED_NETTY_CLIENT_POOL = "redisKeyedClientPool"; - public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; + public static final String SENTINEL_KEYED_NETTY_CLIENT_POOL = "sentinelKeyedClientPool"; + + public static final String KEEPER_KEYED_NETTY_CLIENT_POOL = "keeperKeyedClientPool"; - public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool"; + public static final String PROXY_KEYED_NETTY_CLIENT_POOL = "proxyKeyedClientPool"; + + public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors"; diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java index e1df04132c..49d218b941 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java @@ -148,7 +148,7 @@ public ScheduledExecutorService getRedisCommandExecutor() { ); } - @Bean(name = KEYED_NETTY_CLIENT_POOL) + @Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception { XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8)); LifecycleHelper.initializeIfPossible(keyedObjectPool); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java index 080da9c581..335856633a 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java @@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @Component @@ -49,7 +49,7 @@ public class DefaultProxyMonitorCollectorManager extends AbstractStartStoppable @Resource(name = SCHEDULED_EXECUTOR) private ScheduledExecutorService scheduled; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Autowired diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java index 37b8832309..228658ca43 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java @@ -36,8 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR; +import static com.ctrip.xpipe.redis.checker.resource.Resource.*; /** * @author chen.zhu @@ -53,7 +52,7 @@ public class DefaultSentinelManager implements SentinelManager, ShardEventHandle private static final int LONG_SENTINEL_COMMAND_TIMEOUT = 2000; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedClientPool; @Resource(name = REDIS_COMMAND_EXECUTOR) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 6623647f51..b024c921f2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -69,7 +69,7 @@ public class ShardModelServiceImpl implements ShardModelService{ @Resource(name = REDIS_COMMAND_EXECUTOR) private ScheduledExecutorService scheduled; - @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) + @Resource(name = KEEPER_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; private RetryCommandFactory retryCommandFactory; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java index 6c802fa636..ce9d33e22d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java @@ -3,11 +3,14 @@ import com.ctrip.xpipe.concurrent.DefaultExecutorFactory; import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory; +import com.ctrip.xpipe.netty.commands.NettyRedisPoolClientFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.console.config.ConsoleConfig; import com.ctrip.xpipe.redis.core.spring.AbstractRedisConfigContext; import com.ctrip.xpipe.utils.OsUtils; import com.ctrip.xpipe.utils.XpipeThreadFactory; import com.google.common.util.concurrent.MoreExecutors; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -26,7 +29,7 @@ public class ResourceConfig extends AbstractRedisConfigContext { private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8")); - private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1")); + private final static String DEFAULT_CLIENT_NAME = "xpipe"; @Bean(name = REDIS_COMMAND_EXECUTOR) public ScheduledExecutorService getRedisCommandExecutor() { @@ -40,9 +43,9 @@ public ScheduledExecutorService getRedisCommandExecutor() { ); } - @Bean(name = KEYED_NETTY_CLIENT_POOL) + @Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -50,15 +53,31 @@ public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Excepti @Bean(name = REDIS_SESSION_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = KEEPER_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getKeeperReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getSentinelReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; } - @Bean(name = MIGRATE_KEEPER_CLIENT_POOL) - public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE)); + @Bean(name = PROXY_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getProxyReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -96,4 +115,8 @@ private NettyKeyedPoolClientFactory getKeyedPoolClientFactory(int eventLoopThrea return new NettyKeyedPoolClientFactory(eventLoopThreads); } + private NettyKeyedPoolClientFactory getRedisPoolClientFactory(int eventLoopThreads) { + return new NettyRedisPoolClientFactory(eventLoopThreads, DEFAULT_CLIENT_NAME); + } + }