diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java index dc368ee99b..9df88ece8f 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java @@ -50,7 +50,7 @@ protected boolean hasResponse() { public enum ReplConfType { - LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"); + LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"), IDC("idc"); private String command; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java index 91e08a7d07..1a36620ca0 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -16,7 +17,7 @@ public interface CommandReaderWriterFactory { CommandWriter createCmdWriter(CommandStore cmdStore, int maxFileSize, Logger delayTraceLogger) throws IOException; CommandReader createCmdReader(OffsetReplicationProgress replProgress, CommandStore cmdStore, - OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException; + OffsetNotifier offsetNotifier, ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException; CommandReader createCmdReader(GtidSetReplicationProgress replProgress, CommandStore cmdStore, OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java index 72104a633d..5884a4fb04 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.core.store; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import io.netty.channel.ChannelFuture; /** @@ -17,4 +18,9 @@ public interface CommandsListener { void beforeCommand(); Long processedOffset(); + + default ReplDelayConfig getReplDelayConfig() { + return null; + } + } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java new file mode 100644 index 0000000000..767979cf75 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java @@ -0,0 +1,9 @@ +package com.ctrip.xpipe.redis.core.store.ratelimit; + +public interface ReplDelayConfig { + + long getDelayMilli(); + + long getDelayBytes(); + +} diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java index e81c303869..68b8d6b157 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java @@ -15,10 +15,7 @@ import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParserManager; import com.ctrip.xpipe.redis.core.redis.operation.parser.*; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; -import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperConfig; -import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperResourceManager; -import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; -import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.*; import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; import com.ctrip.xpipe.redis.keeper.monitor.KeepersMonitorManager; import com.ctrip.xpipe.redis.keeper.monitor.impl.NoneKeepersMonitorManager; @@ -191,7 +188,7 @@ protected RedisKeeperServer createGtidRedisKeeperServer(KeeperMeta keeperMeta, F Long replId = keeperMeta.parent().getDbId(); return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir, - leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser); + leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser, new ReplDelayConfigCache()); } protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java index 33edbf51f5..fa47a518e3 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java @@ -4,6 +4,7 @@ import com.ctrip.xpipe.api.lifecycle.Releasable; import com.ctrip.xpipe.api.observer.Observable; import com.ctrip.xpipe.redis.core.protocal.CAPA; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -15,7 +16,7 @@ * * 2016年4月22日 上午11:25:07 */ -public interface RedisClient extends Observable, Infoable, Closeable, RedisRole, Releasable, Keeperable{ +public interface RedisClient extends Observable, Infoable, Closeable, RedisRole, ReplDelayConfig, Releasable, Keeperable{ public static enum CLIENT_ROLE{ NORMAL, @@ -33,6 +34,10 @@ public static enum CLIENT_ROLE{ int getSlaveListeningPort(); + void setIdc(String idc); + + String getIdc(); + void setClientIpAddress(String host); String getClientIpAddress(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java new file mode 100644 index 0000000000..b7b514d1fc --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java @@ -0,0 +1,33 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import com.ctrip.xpipe.api.codec.Codec; +import com.ctrip.xpipe.api.codec.GenericTypeReference; +import com.ctrip.xpipe.api.config.Config; +import com.ctrip.xpipe.api.config.ConfigProvider; +import com.ctrip.xpipe.config.CompositeConfig; +import com.ctrip.xpipe.config.DefaultFileConfig; +import com.ctrip.xpipe.config.DefaultPropertyConfig; +import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig; + +import java.util.List; + +public class DefaultKeeperCommonConfig extends AbstractCoreConfig implements KeeperCommonConfig { + + private static String KEY_REPL_DELAY_CONFIG = "keeper.repl.delay.config"; + + private final GenericTypeReference> replDelayConfigListType = new GenericTypeReference>() {}; + + public DefaultKeeperCommonConfig(){ + CompositeConfig compositeConfig = new CompositeConfig(); + compositeConfig.addConfig(ConfigProvider.DEFAULT.getOrCreateConfig(ConfigProvider.COMMON_CONFIG)); + compositeConfig.addConfig(new DefaultPropertyConfig()); + setConfig(compositeConfig); + } + + @Override + public List getReplDelayConfigs() { + String replDelayConfigInfo = getProperty(KEY_REPL_DELAY_CONFIG, "[]"); + return Codec.DEFAULT.decode(replDelayConfigInfo, replDelayConfigListType); + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java new file mode 100644 index 0000000000..49c9edd889 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java @@ -0,0 +1,9 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import java.util.List; + +public interface KeeperCommonConfig { + + List getReplDelayConfigs(); + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java index 7e5759e0f3..5ec1a700c2 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java @@ -2,6 +2,9 @@ import com.ctrip.xpipe.redis.core.config.CoreConfig; +import java.util.List; +import java.util.Map; + /** * @author marsqing * diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java new file mode 100644 index 0000000000..c17a123969 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java @@ -0,0 +1,44 @@ +package com.ctrip.xpipe.redis.keeper.config; + +public class KeeperReplDelayConfig { + + private String srcDc; + + private String destDc; + + private long delayMilli; + + private long delayBytes; + + public String getSrcDc() { + return srcDc; + } + + public void setSrcDc(String srcDc) { + this.srcDc = srcDc; + } + + public String getDestDc() { + return destDc; + } + + public void setDestDc(String destDc) { + this.destDc = destDc; + } + + public long getDelayMilli() { + return delayMilli; + } + + public void setDelayMilli(long delayMilli) { + this.delayMilli = delayMilli; + } + + public long getDelayBytes() { + return delayBytes; + } + + public void setDelayBytes(long delayBytes) { + this.delayBytes = delayBytes; + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java new file mode 100644 index 0000000000..b22ae83b5e --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java @@ -0,0 +1,93 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.lifecycle.TopElement; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.lifecycle.AbstractLifecycle; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Component +public class ReplDelayConfigCache extends AbstractLifecycle implements TopElement { + + private KeeperCommonConfig keeperConfig; + + private ScheduledExecutorService scheduled; + + private ScheduledFuture future; + + private Map keeperReplDelayConfigMap; + + public ReplDelayConfigCache() { + this(null); + } + + @Autowired + public ReplDelayConfigCache(KeeperCommonConfig keeperConfig) { + this.keeperConfig = keeperConfig; + this.keeperReplDelayConfigMap = new HashMap<>(); + } + + private void refresh() { + logger.debug("[refresh]"); + List replDelayConfigs = keeperConfig.getReplDelayConfigs(); + Map localReplDelayConfigMap = new HashMap<>(); + String currentDc = FoundationService.DEFAULT.getDataCenter(); + for (KeeperReplDelayConfig config: replDelayConfigs) { + if (currentDc.equalsIgnoreCase(config.getSrcDc())) { + localReplDelayConfigMap.put(config.getDestDc().toUpperCase(), config); + } + } + this.keeperReplDelayConfigMap = localReplDelayConfigMap; + } + + public KeeperReplDelayConfig getReplDelayConfig(String destIdc) { + if (null == destIdc || this.keeperReplDelayConfigMap.isEmpty()) return null; + return this.keeperReplDelayConfigMap.get(destIdc); + } + + @Override + protected void doInitialize() throws Exception { + super.doInitialize(); + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ReplDelayConfigCache")); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + this.future = scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + ReplDelayConfigCache.this.refresh(); + } + }, 5, 10, TimeUnit.SECONDS); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (null != future) { + this.future.cancel(false); + this.future = null; + } + } + + @Override + protected void doDispose() throws Exception { + super.doDispose(); + if (null != scheduled) { + this.scheduled.shutdownNow(); + this.scheduled = null; + } + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java new file mode 100644 index 0000000000..246784ddc4 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java @@ -0,0 +1,12 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import java.util.Collections; +import java.util.List; + +public class TestKeeperCommonConfig implements KeeperCommonConfig { + + @Override + public List getReplDelayConfigs() { + return Collections.emptyList(); + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java index 8f5171ea9c..ee162d182a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java @@ -3,6 +3,9 @@ import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig; import com.ctrip.xpipe.redis.keeper.store.DefaultCommandStore; +import java.util.Collections; +import java.util.List; + /** * @author wenchao.meng * diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java index fc104d9605..ff2b543eec 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java @@ -16,6 +16,7 @@ import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperContainerConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.exception.RedisKeeperRuntimeException; import com.ctrip.xpipe.redis.keeper.health.DiskHealthChecker; import com.ctrip.xpipe.redis.keeper.health.HealthState; @@ -61,6 +62,8 @@ public class KeeperContainerService extends AbstractLifecycle implements TopElem private DiskHealthChecker diskHealthChecker; @Autowired private SyncRateManager syncRateManager; + @Autowired + private ReplDelayConfigCache replDelayConfigCache; private Map redisKeeperServers = Maps.newConcurrentMap(); @@ -110,7 +113,8 @@ public RedisKeeperServer add(KeeperTransMeta keeperTransMeta) { File baseDir = getReplicationStoreDir(keeperMeta); RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(keeperTransMeta.getReplId(), keeperMeta, - keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser); + keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, + redisOpParser, replDelayConfigCache); try { register(redisKeeperServer); @@ -313,7 +317,8 @@ private RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper File baseDir) throws Exception { RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(replId, keeper, keeperConfig, - baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser); + baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, + redisOpParser, replDelayConfigCache); register(redisKeeperServer); return redisKeeperServer; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java index 75917ba98f..ed61cf6d24 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.keeper.handler.keeper; import com.ctrip.xpipe.redis.core.protocal.CAPA; +import com.ctrip.xpipe.redis.core.protocal.cmd.Replconf; import com.ctrip.xpipe.redis.core.protocal.protocal.RedisErrorParser; import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.redis.keeper.RedisClient; @@ -49,7 +50,9 @@ protected void doHandle(String[] args, RedisClient redisClient) { throw new IllegalStateException("[doHandle][getack not supported]" ); }else if("keeper".equalsIgnoreCase(option)){//extends by keeper redisClient.setKeeper(); - }else{ + }else if(Replconf.ReplConfType.IDC.name().equalsIgnoreCase(option)) { + redisClient.setIdc(args[1].toUpperCase()); + } else{ logger.error("[doHandler][unkonwn command]" + StringUtil.join(" ", args)); redisClient.sendMessage(new RedisErrorParser("unknown replconf command " + StringUtil.join(" ", args)).format()); return; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java index 006fa35868..75da6e7148 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java @@ -1,9 +1,11 @@ package com.ctrip.xpipe.redis.keeper.impl; +import com.ctrip.framework.foundation.Foundation; import com.ctrip.xpipe.api.command.Command; import com.ctrip.xpipe.api.command.CommandFuture; import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.api.monitor.EventMonitor; import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.command.CommandExecutionException; @@ -272,6 +274,7 @@ public void masterConnected(Channel channel) { Replconf crdt = new Replconf(clientPool, ReplConfType.CRDT, scheduled, commandTimeoutMilli, REPL_CONF_CRDT_MARK); chain.add(new FailSafeCommandWrapper<>(crdt)); + chain.add(new FailSafeCommandWrapper<>(keeperIdcCommand())); try { executeCommand(chain).addListener(new CommandFutureListener() { @@ -357,6 +360,11 @@ protected CommandFuture executeCommand(Command command) { return null; } + private Replconf keeperIdcCommand() { + return new Replconf(clientPool, ReplConfType.IDC, scheduled, commandTimeoutMilli, + FoundationService.DEFAULT.getDataCenter()); + } + private Replconf listeningPortCommand() { Replconf replconf = new Replconf(clientPool, ReplConfType.LISTENING_PORT, scheduled, commandTimeoutMilli, diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java index 62daa6f0bb..a6ff341a40 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java @@ -100,4 +100,22 @@ protected Logger getLogger() { return logger; } + @Override + public void setIdc(String idc) { + } + + @Override + public String getIdc() { + return null; + } + + @Override + public long getDelayMilli() { + return 0; + } + + @Override + public long getDelayBytes() { + return 0; + } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java index 00b5c5ce92..f7db4867f5 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java @@ -1,12 +1,16 @@ package com.ctrip.xpipe.redis.keeper.impl; import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.redis.core.protocal.CAPA; import com.ctrip.xpipe.redis.keeper.RedisClient; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisSlave; +import com.ctrip.xpipe.redis.keeper.config.KeeperReplDelayConfig; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.utils.ChannelUtil; import com.ctrip.xpipe.utils.IpUtils; +import com.ctrip.xpipe.utils.StringUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -29,6 +33,8 @@ public class DefaultRedisClient extends AbstractRedisClient i private Set capas = new HashSet(); private int slaveListeningPort; + + private String idc = null; private AtomicBoolean isKeeper = new AtomicBoolean(false); @@ -38,6 +44,10 @@ public class DefaultRedisClient extends AbstractRedisClient i private Endpoint endpoint; + private ReplDelayConfigCache replDelayConfigCache; + + private final String CURRENT_DC = FoundationService.DEFAULT.getDataCenter(); + public DefaultRedisClient(Channel channel, RedisKeeperServer redisKeeperServer) { super(channel, redisKeeperServer); String remoteIpLocalPort = ChannelUtil.getRemoteAddr(channel); @@ -60,6 +70,39 @@ public void setSlaveListeningPort(int port) { this.slaveListeningPort = port; } + @Override + public void setIdc(String _idc) { + if(logger.isInfoEnabled()){ + logger.info("[setIdc][{}] {}", this, _idc); + } + this.idc = _idc; + } + + @Override + public String getIdc() { + return this.idc; + } + + public void setReplDelayConfigCache(ReplDelayConfigCache replDelayConfigCache) { + this.replDelayConfigCache = replDelayConfigCache; + } + + @Override + public long getDelayMilli() { + if (null == replDelayConfigCache || StringUtil.isEmpty(idc)) return 0; + KeeperReplDelayConfig replDelayConfig = replDelayConfigCache.getReplDelayConfig(idc); + if (null == replDelayConfig) return 0; + else return replDelayConfig.getDelayMilli(); + } + + @Override + public long getDelayBytes() { + if (null == replDelayConfigCache || StringUtil.isEmpty(idc)) return 0; + KeeperReplDelayConfig replDelayConfig = replDelayConfigCache.getReplDelayConfig(idc); + if (null == replDelayConfig) return 0; + else return replDelayConfig.getDelayBytes(); + } + @Override public void capa(CAPA capa) { logger.info("[capa]{}, {}", capa, this); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java index a810516e8a..023c25dfb0 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java @@ -35,7 +35,9 @@ import com.ctrip.xpipe.redis.core.store.*; import com.ctrip.xpipe.redis.keeper.*; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; +import com.ctrip.xpipe.redis.keeper.config.KeeperReplDelayConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.exception.RedisSlavePromotionException; import com.ctrip.xpipe.redis.keeper.handler.CommandHandlerManager; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; @@ -162,18 +164,20 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red private RedisOpParser redisOpParser; + private ReplDelayConfigCache replDelayConfigCache; + public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager, SyncRateManager syncRateManager){ - this(replId, currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, null); + this(replId, currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, null, null); } public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager, - SyncRateManager syncRateManager, RedisOpParser redisOpParser){ + SyncRateManager syncRateManager, RedisOpParser redisOpParser, ReplDelayConfigCache replDelayConfigCache){ this.clusterId = ClusterId.from(((ClusterMeta) currentKeeperMeta.parent().parent()).getDbId()); this.shardId = ShardId.from(currentKeeperMeta.parent().getDbId()); @@ -188,6 +192,7 @@ public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, Keepe this.redisOpParser = redisOpParser; this.crossRegion = new AtomicBoolean(false); this.syncRateManager = syncRateManager; + this.replDelayConfigCache = replDelayConfigCache; } protected ReplicationStoreManager createReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, ReplId replId, @@ -507,8 +512,9 @@ public void initChannel(SocketChannel ch) throws Exception { @Override public RedisClient clientConnected(Channel channel) { - - RedisClient redisClient = new DefaultRedisClient(channel, this); + + DefaultRedisClient redisClient = new DefaultRedisClient(channel, this); + redisClient.setReplDelayConfigCache(replDelayConfigCache); redisClients.put(channel, redisClient); redisClient.addObserver(new Observer() { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java index 5f1e4e486c..fabd3c3218 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.redis.keeper.RedisClient; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisSlave; @@ -98,7 +99,6 @@ public void operationComplete(ChannelFuture future) throws Exception { public DefaultRedisSlave(RedisClient redisClient){ this.redisClient = redisClient; - this.setSlaveListeningPort(redisClient.getSlaveListeningPort()); this.redisClient.addChannelCloseReleaseResources(this); initExecutor(((DefaultRedisClient)redisClient).channel); } @@ -527,6 +527,31 @@ public int getSlaveListeningPort() { return redisClient.getSlaveListeningPort(); } + @Override + public void setIdc(String idc) { + redisClient.setIdc(idc); + } + + @Override + public String getIdc() { + return redisClient.getIdc(); + } + + @Override + public long getDelayMilli() { + return redisClient.getDelayMilli(); + } + + @Override + public long getDelayBytes() { + return redisClient.getDelayBytes(); + } + + @Override + public ReplDelayConfig getReplDelayConfig() { + return this; + } + @Override public void setClientIpAddress(String host) { redisClient.setClientIpAddress(host); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java index f85d10f07a..dacff89b37 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java @@ -40,6 +40,11 @@ public KeeperContainerConfig getKeeperContainerConfig(){ public KeeperConfig getKeeperConfig(){ return new DefaultKeeperConfig(); } + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new DefaultKeeperCommonConfig(); + } @Bean public KeepersMonitorManager getKeeperMonitorManager(){ diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java index 88b26e5a55..f23edfe5a8 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; import com.ctrip.xpipe.utils.CloseState; import io.netty.channel.ChannelFuture; @@ -35,12 +36,12 @@ public DefaultCommandStore(File file, int maxFileSize, IntSupplier maxTimeSecond commandReaderFlyingThreshold, cmdReaderWriterFactory, keeperMonitor); } - private CommandReader beginRead(OffsetReplicationProgress replicationProgress) throws IOException { + private CommandReader beginRead(OffsetReplicationProgress replicationProgress, ReplDelayConfig replDelayConfig) throws IOException { makeSureOpen(); CommandReader reader = cmdReaderWriterFactory.createCmdReader(replicationProgress, this, - offsetNotifier, commandReaderFlyingThreshold); + offsetNotifier, replDelayConfig, commandReaderFlyingThreshold); addReader(reader); return reader; } @@ -58,7 +59,7 @@ public void addCommandsListener(ReplicationProgress progress, final CommandsL CommandReader cmdReader = null; try { - cmdReader = beginRead((OffsetReplicationProgress) progress); + cmdReader = beginRead((OffsetReplicationProgress) progress, listener.getReplDelayConfig()); } finally { // ensure beforeCommand() is always called listener.beforeCommand(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java index 5f184c8a73..3e3d96e20b 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java @@ -5,6 +5,7 @@ import com.ctrip.xpipe.redis.core.store.CommandFile; import com.ctrip.xpipe.redis.core.store.CommandReader; import com.ctrip.xpipe.redis.core.store.CommandStore; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.DefaultControllableFile; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -28,15 +29,18 @@ public class OffsetCommandReader extends AbstractFlyingThresholdCommandReader 0 && (delayMilli = replDelayConfig.getDelayMilli()) > 0) { + logger.debug("[readDelay]{}:{}", delayBytes, delayMilli); + offsetNotifier.await(curPosition + delayBytes, delayMilli); + } else if (milliSeconds >= 0) { + offsetNotifier.await(curPosition, milliSeconds); + } else { + offsetNotifier.await(curPosition); + } readNextFileIfNecessary(); } catch (InterruptedException e) { logger.info("[read]", e); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java index f829ec414c..23224026eb 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java @@ -3,6 +3,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -22,8 +23,8 @@ public CommandWriter createCmdWriter(CommandStore cmdStore, @Override public CommandReader createCmdReader(OffsetReplicationProgress replProgress, - CommandStore cmdStore, - OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException { + CommandStore cmdStore, OffsetNotifier offsetNotifier, + ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException { long currentOffset = replProgress.getProgress(); cmdStore.rotateFileIfNecessary(); CommandFile commandFile = cmdStore.findFileForOffset(currentOffset); @@ -32,7 +33,7 @@ public CommandReader createCmdReader(OffsetReplicationProgr } return new OffsetCommandReader(commandFile, currentOffset, currentOffset - commandFile.getStartOffset(), - cmdStore, offsetNotifier, commandReaderFlyingThreshold); + cmdStore, offsetNotifier, replDelayConfig, commandReaderFlyingThreshold); } @Override diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java index eb8fbfabca..133a3263fb 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; import com.ctrip.xpipe.redis.keeper.ratelimit.SyncRateManager; import com.ctrip.xpipe.redis.keeper.spring.KeeperContextConfig; @@ -125,7 +126,7 @@ protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keep protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager) { return new DefaultRedisKeeperServer(replId, keeper, keeperConfig, baseDir, leaderElectorManager, - createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), createRedisOpParser()); + createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), createRedisOpParser(), new ReplDelayConfigCache()); } protected RedisOpParser createRedisOpParser() { diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java index b15aaa7848..a23b14b506 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java @@ -16,6 +16,7 @@ import com.ctrip.xpipe.redis.core.store.ReplicationStore; import com.ctrip.xpipe.redis.keeper.AbstractFakeRedisTest; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.ratelimit.SyncRateManager; import io.netty.buffer.ByteBuf; import org.junit.After; @@ -59,7 +60,7 @@ public void testPsyncAndXsync() throws Exception { KeeperMeta keeperMeta = createKeeperMeta(); DefaultRedisKeeperServer keeperServer = new DefaultRedisKeeperServer(getReplId().id(), keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta), getRegistry().getComponent(LeaderElectorManager.class), - createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), parser); + createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), parser, new ReplDelayConfigCache()); keeperServer.initialize(); keeperServer.start(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java index 6e35b840c6..d194583732 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java @@ -46,6 +46,11 @@ public ZkClient getZkClient(ZkTestServer zkTestServer) throws Exception{ public KeeperConfig getKeeperConfig(){ return new TestKeeperConfig(1024, 2, 1024, 2000); } + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new TestKeeperCommonConfig(); + } @Bean public KeeperContainerConfig getKeeperContainerConfig(){ diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java index 2148770926..fc2fb5b02c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java @@ -43,7 +43,12 @@ public ZkClient getZkClient(KeeperConfig keeperConfig){ public KeeperConfig getKeeperConfig(){ return new TestKeeperConfig(1024, 2, 1024, 2000); } - + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new TestKeeperCommonConfig(); + } + @Bean public KeeperContainerConfig getKeeperContainerConfig(){ return new TestKeeperContainerConfig(AbstractTest.getUserHome() + "/rsd");