diff --git a/core/src/main/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannel.java b/core/src/main/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannel.java index 0127cbd792..48722611e6 100644 --- a/core/src/main/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannel.java +++ b/core/src/main/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannel.java @@ -46,7 +46,7 @@ public void close() throws IOException { tryCloseChannel(); } - protected ReferenceFileRegion readTilEnd(int maxBytes) throws IOException { + public ReferenceFileRegion read(int maxBytes) throws IOException { while(true){ @@ -70,7 +70,7 @@ protected ReferenceFileRegion readTilEnd(int maxBytes) throws IOException { } public ReferenceFileRegion readTilEnd() throws IOException { - return readTilEnd(-1); + return read(-1); } private void increase() { diff --git a/core/src/test/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannelTest.java b/core/src/test/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannelTest.java index 8ac19edce7..e15b68a37a 100644 --- a/core/src/test/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannelTest.java +++ b/core/src/test/java/com/ctrip/xpipe/netty/filechannel/ReferenceFileChannelTest.java @@ -84,7 +84,7 @@ protected void doRun() throws Exception { while (true) { count++; - ReferenceFileRegion referenceFileRegion = referenceFileChannel.readTilEnd(1); + ReferenceFileRegion referenceFileRegion = referenceFileChannel.read(1); fileRegions.offer(referenceFileRegion); if(count > totalFileLen){ logger.info("{}", referenceFileRegion); diff --git a/redis/package/redis-keeper-package/src/main/scripts/redis/template/master.conf b/redis/package/redis-keeper-package/src/main/scripts/redis/template/master.conf index 24ff27f838..c9c8c5bfcd 100755 --- a/redis/package/redis-keeper-package/src/main/scripts/redis/template/master.conf +++ b/redis/package/redis-keeper-package/src/main/scripts/redis/template/master.conf @@ -16,7 +16,7 @@ client-output-buffer-limit slave 0 0 0 repl-diskless-sync yes repl-diskless-sync-delay 3 #protected-mode no -maxmemory-policy volatile-lru +maxmemory-policy allkeys-lru protected-mode no hz 1 diff --git a/redis/package/redis-keeper-package/src/main/scripts/startup.sh b/redis/package/redis-keeper-package/src/main/scripts/startup.sh index 8924434744..3c7e30af94 100644 --- a/redis/package/redis-keeper-package/src/main/scripts/startup.sh +++ b/redis/package/redis-keeper-package/src/main/scripts/startup.sh @@ -163,22 +163,15 @@ else IDC=`getIdc` total=`getTotalMem` if ([ $IDC = "PTJQ" ] || [ $IDC = "PTOY" ]) && ([ "$total" -gt 60 ]);then - #MB - USED_MEM=30720 - XMN=11520 - MAX_DIRECT=5120 - elif [ $IDC = "UAT-AWS" ]; then - #MB - USED_MEM=6144 - XMN=2400 - MAX_DIRECT=300 + USED_MEM=30 + XMN=12 + MAX_DIRECT=5 else - #MB - USED_MEM=1600 - XMN=600 - MAX_DIRECT=100 + USED_MEM=`getSafeXmx` + XMN=`getSafeXmn $USED_MEM` + MAX_DIRECT=1 fi - JAVA_OPTS="$JAVA_OPTS -Xms${USED_MEM}m -Xmx${USED_MEM}m -Xmn${XMN}m -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=${MAX_DIRECT}m" + JAVA_OPTS="$JAVA_OPTS -Xms${USED_MEM}g -Xmx${USED_MEM}g -Xmn${XMN}g -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=${MAX_DIRECT}g" fi export JAVA_OPTS="$JAVA_OPTS -Dio.netty.maxDirectMemory=0 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:+UseParNewGC -XX:MaxTenuringThreshold=2 -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:+ScavengeBeforeFullGC -XX:+UseCMSCompactAtFullCollection -XX:+CMSParallelRemarkEnabled -XX:CMSFullGCsBeforeCompaction=9 -XX:CMSInitiatingOccupancyFraction=60 -XX:-CMSClassUnloadingEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:-ReduceInitialCardMarks -XX:+CMSPermGenSweepingEnabled -XX:CMSInitiatingPermOccupancyFraction=70 -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationConcurrentTime -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -XX:-OmitStackTraceInFastThrow -Duser.timezone=Asia/Shanghai -Dclient.encoding.override=UTF-8 -Dfile.encoding=UTF-8 -Xloggc:$LOG_DIR/heap_trace.txt -XX:HeapDumpPath=$LOG_DIR/HeapDumpOnOutOfMemoryError/ -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${IP} -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Djava.security.egd=file:/dev/./urandom" diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java index dc368ee99b..9df88ece8f 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/protocal/cmd/Replconf.java @@ -50,7 +50,7 @@ protected boolean hasResponse() { public enum ReplConfType { - LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"); + LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"), IDC("idc"); private String command; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java index 91e08a7d07..1a36620ca0 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandReaderWriterFactory.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -16,7 +17,7 @@ public interface CommandReaderWriterFactory { CommandWriter createCmdWriter(CommandStore cmdStore, int maxFileSize, Logger delayTraceLogger) throws IOException; CommandReader createCmdReader(OffsetReplicationProgress replProgress, CommandStore cmdStore, - OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException; + OffsetNotifier offsetNotifier, ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException; CommandReader createCmdReader(GtidSetReplicationProgress replProgress, CommandStore cmdStore, OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException; diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java index 72104a633d..08ac7554ec 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/CommandsListener.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.core.store; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import io.netty.channel.ChannelFuture; /** @@ -7,7 +8,7 @@ * * 2016年4月19日 下午4:59:45 */ -public interface CommandsListener { +public interface CommandsListener extends ReplDelayConfig { boolean isOpen(); @@ -17,4 +18,5 @@ public interface CommandsListener { void beforeCommand(); Long processedOffset(); + } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/RdbFileListener.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/RdbFileListener.java index 53c4cfc24f..ab35de7e22 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/RdbFileListener.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/RdbFileListener.java @@ -2,6 +2,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.protocal.protocal.EofType; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import java.io.IOException; @@ -10,7 +11,7 @@ * * 2016年5月9日 下午5:28:47 */ -public interface RdbFileListener { +public interface RdbFileListener extends ReplDelayConfig { void setRdbFileInfo(EofType eofType, ReplicationProgress rdbProgress); diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java new file mode 100644 index 0000000000..e7009612bd --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/store/ratelimit/ReplDelayConfig.java @@ -0,0 +1,13 @@ +package com.ctrip.xpipe.redis.core.store.ratelimit; + +public interface ReplDelayConfig { + + default long getDelayMilli() { + return 0; + } + + default int getLimitBytesPerSecond() { + return -1; + } + +} diff --git a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java index e81c303869..68b8d6b157 100644 --- a/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java +++ b/redis/redis-integration-test/src/test/java/com/ctrip/xpipe/redis/integratedtest/AbstractIntegratedTest.java @@ -15,10 +15,7 @@ import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParserManager; import com.ctrip.xpipe.redis.core.redis.operation.parser.*; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; -import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperConfig; -import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperResourceManager; -import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; -import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.*; import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; import com.ctrip.xpipe.redis.keeper.monitor.KeepersMonitorManager; import com.ctrip.xpipe.redis.keeper.monitor.impl.NoneKeepersMonitorManager; @@ -191,7 +188,7 @@ protected RedisKeeperServer createGtidRedisKeeperServer(KeeperMeta keeperMeta, F Long replId = keeperMeta.parent().getDbId(); return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir, - leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser); + leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser, new ReplDelayConfigCache()); } protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig, diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java index 33edbf51f5..fa47a518e3 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/RedisClient.java @@ -4,6 +4,7 @@ import com.ctrip.xpipe.api.lifecycle.Releasable; import com.ctrip.xpipe.api.observer.Observable; import com.ctrip.xpipe.redis.core.protocal.CAPA; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; @@ -15,7 +16,7 @@ * * 2016年4月22日 上午11:25:07 */ -public interface RedisClient extends Observable, Infoable, Closeable, RedisRole, Releasable, Keeperable{ +public interface RedisClient extends Observable, Infoable, Closeable, RedisRole, ReplDelayConfig, Releasable, Keeperable{ public static enum CLIENT_ROLE{ NORMAL, @@ -33,6 +34,10 @@ public static enum CLIENT_ROLE{ int getSlaveListeningPort(); + void setIdc(String idc); + + String getIdc(); + void setClientIpAddress(String host); String getClientIpAddress(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java new file mode 100644 index 0000000000..ae88b02ab1 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/DefaultKeeperCommonConfig.java @@ -0,0 +1,41 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import com.ctrip.xpipe.api.codec.Codec; +import com.ctrip.xpipe.api.codec.GenericTypeReference; +import com.ctrip.xpipe.api.config.Config; +import com.ctrip.xpipe.api.config.ConfigProvider; +import com.ctrip.xpipe.config.CompositeConfig; +import com.ctrip.xpipe.config.DefaultFileConfig; +import com.ctrip.xpipe.config.DefaultPropertyConfig; +import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig; + +import java.util.List; +import java.util.Map; + +public class DefaultKeeperCommonConfig extends AbstractCoreConfig implements KeeperCommonConfig { + + private static String KEY_KEEPER_REPL_DELAY_CONFIG = "keeper.repl.delay.config"; + private static String KEY_REDIS_REPL_DELAY_CONFIG = "redis.repl.delay.config"; + + private final GenericTypeReference> keeperReplDelayConfigListType = new GenericTypeReference>() {}; + private final GenericTypeReference> redisReplDelayConfigMapType = new GenericTypeReference>() {}; + + public DefaultKeeperCommonConfig(){ + CompositeConfig compositeConfig = new CompositeConfig(); + compositeConfig.addConfig(ConfigProvider.DEFAULT.getOrCreateConfig(ConfigProvider.COMMON_CONFIG)); + compositeConfig.addConfig(new DefaultPropertyConfig()); + setConfig(compositeConfig); + } + + @Override + public List getKeeperReplDelayConfigs() { + String replDelayConfigInfo = getProperty(KEY_KEEPER_REPL_DELAY_CONFIG, "[]"); + return Codec.DEFAULT.decode(replDelayConfigInfo, keeperReplDelayConfigListType); + } + + @Override + public Map getRedisReplDelayConfigs() { + String replDelayConfigInfo = getProperty(KEY_REDIS_REPL_DELAY_CONFIG, "{}"); + return Codec.DEFAULT.decode(replDelayConfigInfo, redisReplDelayConfigMapType); + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java new file mode 100644 index 0000000000..2ccca389d2 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperCommonConfig.java @@ -0,0 +1,12 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import java.util.List; +import java.util.Map; + +public interface KeeperCommonConfig { + + List getKeeperReplDelayConfigs(); + + Map getRedisReplDelayConfigs(); + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java index 7e5759e0f3..5ec1a700c2 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperConfig.java @@ -2,6 +2,9 @@ import com.ctrip.xpipe.redis.core.config.CoreConfig; +import java.util.List; +import java.util.Map; + /** * @author marsqing * diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java new file mode 100644 index 0000000000..7a9c877367 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/KeeperReplDelayConfig.java @@ -0,0 +1,35 @@ +package com.ctrip.xpipe.redis.keeper.config; + +public class KeeperReplDelayConfig { + + private String srcDc; + + private String destDc; + + private long delayMilli; + + public String getSrcDc() { + return srcDc; + } + + public void setSrcDc(String srcDc) { + this.srcDc = srcDc; + } + + public String getDestDc() { + return destDc; + } + + public void setDestDc(String destDc) { + this.destDc = destDc; + } + + public long getDelayMilli() { + return delayMilli; + } + + public void setDelayMilli(long delayMilli) { + this.delayMilli = delayMilli; + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/RedisReplDelayConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/RedisReplDelayConfig.java new file mode 100644 index 0000000000..5c2eba2739 --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/RedisReplDelayConfig.java @@ -0,0 +1,14 @@ +package com.ctrip.xpipe.redis.keeper.config; + +public class RedisReplDelayConfig { + + private int bytesLimitPerSecond = -1; + + public int getBytesLimitPerSecond() { + return bytesLimitPerSecond; + } + + public void setBytesLimitPerSecond(int bytesLimitPerSecond) { + this.bytesLimitPerSecond = bytesLimitPerSecond; + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java new file mode 100644 index 0000000000..5ba285a52b --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/ReplDelayConfigCache.java @@ -0,0 +1,111 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import com.ctrip.xpipe.api.foundation.FoundationService; +import com.ctrip.xpipe.api.lifecycle.TopElement; +import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.lifecycle.AbstractLifecycle; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.lang.Nullable; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Component +public class ReplDelayConfigCache extends AbstractLifecycle implements TopElement { + + private KeeperCommonConfig keeperConfig; + + private ScheduledExecutorService scheduled; + + private ScheduledFuture future; + + private Map keeperReplDelayConfigMap; + + private RedisReplDelayConfig redisReplDelayConfig; + + public ReplDelayConfigCache() { + this(null); + } + + @Autowired + public ReplDelayConfigCache(KeeperCommonConfig keeperConfig) { + this.keeperConfig = keeperConfig; + this.redisReplDelayConfig = null; + this.keeperReplDelayConfigMap = new HashMap<>(); + } + + private void refresh() { + logger.debug("[refresh]"); + List keeperReplDelayConfigs = keeperConfig.getKeeperReplDelayConfigs(); + Map redisReplDelayConfigs = keeperConfig.getRedisReplDelayConfigs(); + String currentDc = FoundationService.DEFAULT.getDataCenter(); + + if (redisReplDelayConfigs.containsKey(currentDc)) { + this.redisReplDelayConfig = redisReplDelayConfigs.get(currentDc); + } else { + this.redisReplDelayConfig = redisReplDelayConfigs.getOrDefault("DEFAULT", null); + } + + Map localReplDelayConfigMap = new HashMap<>(); + for (KeeperReplDelayConfig config: keeperReplDelayConfigs) { + if (currentDc.equalsIgnoreCase(config.getSrcDc())) { + localReplDelayConfigMap.put(config.getDestDc().toUpperCase(), config); + } + } + this.keeperReplDelayConfigMap = localReplDelayConfigMap; + } + + @Nullable + public KeeperReplDelayConfig getKeeperReplDelayConfig(String destIdc) { + if (null == destIdc || this.keeperReplDelayConfigMap.isEmpty()) return null; + return this.keeperReplDelayConfigMap.get(destIdc); + } + + @Nullable + public RedisReplDelayConfig getRedisReplDelayConfig() { + return this.redisReplDelayConfig; + } + + @Override + protected void doInitialize() throws Exception { + super.doInitialize(); + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ReplDelayConfigCache")); + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + this.future = scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() { + @Override + protected void doRun() throws Exception { + ReplDelayConfigCache.this.refresh(); + } + }, 5, 10, TimeUnit.SECONDS); + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + if (null != future) { + this.future.cancel(false); + this.future = null; + } + } + + @Override + protected void doDispose() throws Exception { + super.doDispose(); + if (null != scheduled) { + this.scheduled.shutdownNow(); + this.scheduled = null; + } + } + +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java new file mode 100644 index 0000000000..8eaba0cfad --- /dev/null +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperCommonConfig.java @@ -0,0 +1,18 @@ +package com.ctrip.xpipe.redis.keeper.config; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class TestKeeperCommonConfig implements KeeperCommonConfig { + + @Override + public List getKeeperReplDelayConfigs() { + return Collections.emptyList(); + } + + @Override + public Map getRedisReplDelayConfigs() { + return Collections.emptyMap(); + } +} diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java index 8f5171ea9c..ee162d182a 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/config/TestKeeperConfig.java @@ -3,6 +3,9 @@ import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig; import com.ctrip.xpipe.redis.keeper.store.DefaultCommandStore; +import java.util.Collections; +import java.util.List; + /** * @author wenchao.meng * diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java index fc104d9605..ff2b543eec 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/container/KeeperContainerService.java @@ -16,6 +16,7 @@ import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperContainerConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.exception.RedisKeeperRuntimeException; import com.ctrip.xpipe.redis.keeper.health.DiskHealthChecker; import com.ctrip.xpipe.redis.keeper.health.HealthState; @@ -61,6 +62,8 @@ public class KeeperContainerService extends AbstractLifecycle implements TopElem private DiskHealthChecker diskHealthChecker; @Autowired private SyncRateManager syncRateManager; + @Autowired + private ReplDelayConfigCache replDelayConfigCache; private Map redisKeeperServers = Maps.newConcurrentMap(); @@ -110,7 +113,8 @@ public RedisKeeperServer add(KeeperTransMeta keeperTransMeta) { File baseDir = getReplicationStoreDir(keeperMeta); RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(keeperTransMeta.getReplId(), keeperMeta, - keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser); + keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, + redisOpParser, replDelayConfigCache); try { register(redisKeeperServer); @@ -313,7 +317,8 @@ private RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper File baseDir) throws Exception { RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(replId, keeper, keeperConfig, - baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser); + baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, + redisOpParser, replDelayConfigCache); register(redisKeeperServer); return redisKeeperServer; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java index 75917ba98f..ed61cf6d24 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/handler/keeper/ReplconfHandler.java @@ -1,6 +1,7 @@ package com.ctrip.xpipe.redis.keeper.handler.keeper; import com.ctrip.xpipe.redis.core.protocal.CAPA; +import com.ctrip.xpipe.redis.core.protocal.cmd.Replconf; import com.ctrip.xpipe.redis.core.protocal.protocal.RedisErrorParser; import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.redis.keeper.RedisClient; @@ -49,7 +50,9 @@ protected void doHandle(String[] args, RedisClient redisClient) { throw new IllegalStateException("[doHandle][getack not supported]" ); }else if("keeper".equalsIgnoreCase(option)){//extends by keeper redisClient.setKeeper(); - }else{ + }else if(Replconf.ReplConfType.IDC.name().equalsIgnoreCase(option)) { + redisClient.setIdc(args[1].toUpperCase()); + } else{ logger.error("[doHandler][unkonwn command]" + StringUtil.join(" ", args)); redisClient.sendMessage(new RedisErrorParser("unknown replconf command " + StringUtil.join(" ", args)).format()); return; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java index 006fa35868..75da6e7148 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/AbstractRedisMasterReplication.java @@ -1,9 +1,11 @@ package com.ctrip.xpipe.redis.keeper.impl; +import com.ctrip.framework.foundation.Foundation; import com.ctrip.xpipe.api.command.Command; import com.ctrip.xpipe.api.command.CommandFuture; import com.ctrip.xpipe.api.command.CommandFutureListener; import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.api.monitor.EventMonitor; import com.ctrip.xpipe.api.pool.SimpleObjectPool; import com.ctrip.xpipe.command.CommandExecutionException; @@ -272,6 +274,7 @@ public void masterConnected(Channel channel) { Replconf crdt = new Replconf(clientPool, ReplConfType.CRDT, scheduled, commandTimeoutMilli, REPL_CONF_CRDT_MARK); chain.add(new FailSafeCommandWrapper<>(crdt)); + chain.add(new FailSafeCommandWrapper<>(keeperIdcCommand())); try { executeCommand(chain).addListener(new CommandFutureListener() { @@ -357,6 +360,11 @@ protected CommandFuture executeCommand(Command command) { return null; } + private Replconf keeperIdcCommand() { + return new Replconf(clientPool, ReplConfType.IDC, scheduled, commandTimeoutMilli, + FoundationService.DEFAULT.getDataCenter()); + } + private Replconf listeningPortCommand() { Replconf replconf = new Replconf(clientPool, ReplConfType.LISTENING_PORT, scheduled, commandTimeoutMilli, diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java index 62daa6f0bb..2db414360b 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/ApplierRedisClient.java @@ -100,4 +100,13 @@ protected Logger getLogger() { return logger; } + @Override + public void setIdc(String idc) { + } + + @Override + public String getIdc() { + return null; + } + } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java index 00b5c5ce92..7234c720fe 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisClient.java @@ -1,12 +1,17 @@ package com.ctrip.xpipe.redis.keeper.impl; import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.redis.core.protocal.CAPA; import com.ctrip.xpipe.redis.keeper.RedisClient; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisSlave; +import com.ctrip.xpipe.redis.keeper.config.KeeperReplDelayConfig; +import com.ctrip.xpipe.redis.keeper.config.RedisReplDelayConfig; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.utils.ChannelUtil; import com.ctrip.xpipe.utils.IpUtils; +import com.ctrip.xpipe.utils.StringUtil; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; @@ -29,6 +34,8 @@ public class DefaultRedisClient extends AbstractRedisClient i private Set capas = new HashSet(); private int slaveListeningPort; + + private String idc = null; private AtomicBoolean isKeeper = new AtomicBoolean(false); @@ -38,6 +45,10 @@ public class DefaultRedisClient extends AbstractRedisClient i private Endpoint endpoint; + private ReplDelayConfigCache replDelayConfigCache; + + private final String CURRENT_DC = FoundationService.DEFAULT.getDataCenter(); + public DefaultRedisClient(Channel channel, RedisKeeperServer redisKeeperServer) { super(channel, redisKeeperServer); String remoteIpLocalPort = ChannelUtil.getRemoteAddr(channel); @@ -60,6 +71,38 @@ public void setSlaveListeningPort(int port) { this.slaveListeningPort = port; } + @Override + public void setIdc(String _idc) { + if(logger.isInfoEnabled()){ + logger.info("[setIdc][{}] {}", this, _idc); + } + this.idc = _idc; + } + + @Override + public String getIdc() { + return this.idc; + } + + public void setReplDelayConfigCache(ReplDelayConfigCache replDelayConfigCache) { + this.replDelayConfigCache = replDelayConfigCache; + } + + @Override + public long getDelayMilli() { + if (null == replDelayConfigCache || !isKeeper() || StringUtil.isEmpty(idc)) return super.getDelayMilli(); + KeeperReplDelayConfig replDelayConfig = replDelayConfigCache.getKeeperReplDelayConfig(idc); + if (null == replDelayConfig) return super.getDelayMilli(); + else return replDelayConfig.getDelayMilli(); + } + + public int getLimitBytesPerSecond() { + if (null == replDelayConfigCache || isKeeper()) return super.getLimitBytesPerSecond(); + RedisReplDelayConfig replDelayConfig = replDelayConfigCache.getRedisReplDelayConfig(); + if (null == replDelayConfig) return super.getLimitBytesPerSecond(); + else return replDelayConfig.getBytesLimitPerSecond(); + } + @Override public void capa(CAPA capa) { logger.info("[capa]{}, {}", capa, this); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java index a810516e8a..023c25dfb0 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisKeeperServer.java @@ -35,7 +35,9 @@ import com.ctrip.xpipe.redis.core.store.*; import com.ctrip.xpipe.redis.keeper.*; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; +import com.ctrip.xpipe.redis.keeper.config.KeeperReplDelayConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.exception.RedisSlavePromotionException; import com.ctrip.xpipe.redis.keeper.handler.CommandHandlerManager; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; @@ -162,18 +164,20 @@ public class DefaultRedisKeeperServer extends AbstractRedisServer implements Red private RedisOpParser redisOpParser; + private ReplDelayConfigCache replDelayConfigCache; + public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager, SyncRateManager syncRateManager){ - this(replId, currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, null); + this(replId, currentKeeperMeta, keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, null, null); } public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager, KeepersMonitorManager keepersMonitorManager, KeeperResourceManager resourceManager, - SyncRateManager syncRateManager, RedisOpParser redisOpParser){ + SyncRateManager syncRateManager, RedisOpParser redisOpParser, ReplDelayConfigCache replDelayConfigCache){ this.clusterId = ClusterId.from(((ClusterMeta) currentKeeperMeta.parent().parent()).getDbId()); this.shardId = ShardId.from(currentKeeperMeta.parent().getDbId()); @@ -188,6 +192,7 @@ public DefaultRedisKeeperServer(Long replId, KeeperMeta currentKeeperMeta, Keepe this.redisOpParser = redisOpParser; this.crossRegion = new AtomicBoolean(false); this.syncRateManager = syncRateManager; + this.replDelayConfigCache = replDelayConfigCache; } protected ReplicationStoreManager createReplicationStoreManager(KeeperConfig keeperConfig, ClusterId clusterId, ShardId shardId, ReplId replId, @@ -507,8 +512,9 @@ public void initChannel(SocketChannel ch) throws Exception { @Override public RedisClient clientConnected(Channel channel) { - - RedisClient redisClient = new DefaultRedisClient(channel, this); + + DefaultRedisClient redisClient = new DefaultRedisClient(channel, this); + redisClient.setReplDelayConfigCache(replDelayConfigCache); redisClients.put(channel, redisClient); redisClient.addObserver(new Observer() { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java index 5f1e4e486c..21f67cff07 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/impl/DefaultRedisSlave.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.redis.keeper.RedisClient; import com.ctrip.xpipe.redis.keeper.RedisKeeperServer; import com.ctrip.xpipe.redis.keeper.RedisSlave; @@ -20,6 +21,7 @@ import com.ctrip.xpipe.redis.keeper.util.KeeperReplIdAwareThreadFactory; import com.ctrip.xpipe.utils.*; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.RateLimiter; import com.google.common.util.concurrent.SettableFuture; import io.netty.buffer.ByteBuf; import io.netty.channel.*; @@ -98,7 +100,6 @@ public void operationComplete(ChannelFuture future) throws Exception { public DefaultRedisSlave(RedisClient redisClient){ this.redisClient = redisClient; - this.setSlaveListeningPort(redisClient.getSlaveListeningPort()); this.redisClient.addChannelCloseReleaseResources(this); initExecutor(((DefaultRedisClient)redisClient).channel); } @@ -527,6 +528,26 @@ public int getSlaveListeningPort() { return redisClient.getSlaveListeningPort(); } + @Override + public void setIdc(String idc) { + redisClient.setIdc(idc); + } + + @Override + public String getIdc() { + return redisClient.getIdc(); + } + + @Override + public long getDelayMilli() { + return redisClient.getDelayMilli(); + } + + @Override + public int getLimitBytesPerSecond() { + return redisClient.getLimitBytesPerSecond(); + } + @Override public void setClientIpAddress(String host) { redisClient.setClientIpAddress(host); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/CommandStoreDelay.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/CommandStoreDelay.java index c60f3cd08c..5f829cfa82 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/CommandStoreDelay.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/CommandStoreDelay.java @@ -13,6 +13,8 @@ public interface CommandStoreDelay { void endWrite(long offset); + void endRead(final CommandsListener commandsListener, final long offset); + void beginSend(CommandsListener commandsListener, long offset); void flushSucceed(CommandsListener commandsListener, long offset); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultCommandStoreDelay.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultCommandStoreDelay.java index d49366b118..06940885b1 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultCommandStoreDelay.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/DefaultCommandStoreDelay.java @@ -87,6 +87,22 @@ public void run() { }); } + public void endRead(final CommandsListener commandsListener, final long offset) { + exceptionLogWrapper.execute(new Runnable() { + + @Override + public void run() { + updateLastActionTime(); + + OffsetDelay offsetDelay = getOffsetDelay(offset); + if(offsetDelay != null){ + offsetDelay.endRead(commandsListener); + } + } + + }); + } + @Override public void beginSend(final CommandsListener commandsListener, final long offset){ @@ -188,6 +204,13 @@ public void endWrite(long offset){ logIfShould(beginWriteTime.get(), endWriteTime.get(), "[endWrite]"); } + public void endRead(CommandsListener commandsListener) { + ListenerDelay listenerDelay = getOrAddListenerDelay(commandsListener); + if(listenerDelay != null){ + listenerDelay.endRead(); + } + } + public void beginSend(CommandsListener commandsListener, long offset){ ListenerDelay listenerDelay = getOrAddListenerDelay(commandsListener); @@ -259,15 +282,20 @@ public class ListenerDelay{ private CommandsListener commandsListener; + private AtomicLong endReadTime = new AtomicLong(); + private AtomicLong beginSendTime = new AtomicLong(); private AtomicLong endSendTime = new AtomicLong(); + public void endRead() { + this.endReadTime.set(System.nanoTime()); + logIfShould(endWriteTime.get(), endReadTime.get(), "[readOut]"); + } + public void beginSend(){ this.beginSendTime.set(System.nanoTime()); - logIfShould(endWriteTime.get(), beginSendTime.get(), "[beginSend]"); - logger.trace("[beginSend]{}", offset); } @@ -279,6 +307,7 @@ public void reset() { }finally{ beginSendTime.set(0); endSendTime.set(0); + endReadTime.set(0); this.commandsListener = null; } } @@ -306,7 +335,7 @@ protected boolean logIfShould(long begin, long end, String message) { logger.info("{}, {}, {}, delay:{} micro", message, begin, end, delayMicro); return true; } - return false; + return false; } } diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/NoneCommandStoreDelay.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/NoneCommandStoreDelay.java index 276af08599..8a165dc0a4 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/NoneCommandStoreDelay.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/monitor/impl/NoneCommandStoreDelay.java @@ -20,6 +20,11 @@ public void endWrite(long offset) { } + @Override + public void endRead(CommandsListener commandsListener, long offset) { + + } + @Override public void beginSend(CommandsListener commandsListener, long offset) { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java index f85d10f07a..dacff89b37 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/spring/Production.java @@ -40,6 +40,11 @@ public KeeperContainerConfig getKeeperContainerConfig(){ public KeeperConfig getKeeperConfig(){ return new DefaultKeeperConfig(); } + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new DefaultKeeperCommonConfig(); + } @Bean public KeepersMonitorManager getKeeperMonitorManager(){ diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java index 88b26e5a55..898cbfcf61 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultCommandStore.java @@ -2,8 +2,10 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.redis.keeper.monitor.KeeperMonitor; import com.ctrip.xpipe.utils.CloseState; +import com.google.common.util.concurrent.RateLimiter; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import org.slf4j.Logger; @@ -11,6 +13,7 @@ import java.io.File; import java.io.IOException; +import java.util.concurrent.TimeUnit; import java.util.function.IntSupplier; /** @@ -35,12 +38,12 @@ public DefaultCommandStore(File file, int maxFileSize, IntSupplier maxTimeSecond commandReaderFlyingThreshold, cmdReaderWriterFactory, keeperMonitor); } - private CommandReader beginRead(OffsetReplicationProgress replicationProgress) throws IOException { + private CommandReader beginRead(OffsetReplicationProgress replicationProgress, ReplDelayConfig replDelayConfig) throws IOException { makeSureOpen(); CommandReader reader = cmdReaderWriterFactory.createCmdReader(replicationProgress, this, - offsetNotifier, commandReaderFlyingThreshold); + offsetNotifier, replDelayConfig, commandReaderFlyingThreshold); addReader(reader); return reader; } @@ -56,9 +59,10 @@ public void addCommandsListener(ReplicationProgress progress, final CommandsL logger.info("[addCommandsListener][begin] from offset {}, {}", progress, listener); CommandReader cmdReader = null; + RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE); try { - cmdReader = beginRead((OffsetReplicationProgress) progress); + cmdReader = beginRead((OffsetReplicationProgress) progress, listener); } finally { // ensure beforeCommand() is always called listener.beforeCommand(); @@ -73,6 +77,8 @@ public void addCommandsListener(ReplicationProgress progress, final CommandsL if (null == referenceFileRegion) continue; logger.debug("[addCommandsListener] {}", referenceFileRegion); + getCommandStoreDelay().endRead(listener, referenceFileRegion.getTotalPos()); + sleepForDelay(rateLimiter, referenceFileRegion, listener); if(getDelayTraceLogger().isDebugEnabled()){ getDelayTraceLogger().debug("[write][begin]{}, {}", listener, referenceFileRegion.getTotalPos()); @@ -119,6 +125,28 @@ public void operationComplete(ChannelFuture future) throws Exception { logger.info("[addCommandsListener][end] from {}, {}", progress, listener); } + private void sleepForDelay(RateLimiter rateLimiter, final ReferenceFileRegion referenceFileRegion, final CommandsListener listener) { + long delayMilli = listener.getDelayMilli(); + int limitBytes = listener.getLimitBytesPerSecond(); + logger.debug("[sleepForDelay]{},{}", delayMilli,limitBytes); + + if (limitBytes > 0 && referenceFileRegion.count() > 0) { + long start = System.nanoTime(); + if (((int)rateLimiter.getRate()) != limitBytes) rateLimiter.setRate(limitBytes); + int readBytes = (int)Math.min(referenceFileRegion.count(), limitBytes); + rateLimiter.acquire(readBytes); + delayMilli = delayMilli - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + } + + if (delayMilli > 0) { + try { + Thread.sleep(delayMilli); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @Override public Logger getLogger() { return logger; diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultFullSyncListener.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultFullSyncListener.java index a9d40c4e45..c210d1de88 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultFullSyncListener.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultFullSyncListener.java @@ -100,6 +100,16 @@ public Long processedOffset() { return redisSlave.getAck(); } + @Override + public long getDelayMilli() { + return redisSlave.getDelayMilli(); + } + + @Override + public int getLimitBytesPerSecond() { + return redisSlave.getLimitBytesPerSecond(); + } + @Override public String toString() { return String.format("%s:%s", getClass().getSimpleName(), redisSlave); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultRdbStore.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultRdbStore.java index 4a234fa172..e72af519e4 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultRdbStore.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/DefaultRdbStore.java @@ -12,9 +12,11 @@ import com.ctrip.xpipe.redis.core.store.RdbStore; import com.ctrip.xpipe.redis.core.store.RdbStoreListener; import com.ctrip.xpipe.redis.core.store.OffsetReplicationProgress; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.redis.core.store.ratelimit.SyncRateLimiter; import com.ctrip.xpipe.utils.DefaultControllableFile; import com.ctrip.xpipe.utils.SizeControllableFile; +import com.google.common.util.concurrent.RateLimiter; import io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,6 +27,7 @@ import java.nio.channels.FileChannel; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; @@ -252,10 +255,16 @@ protected void doReadRdbFileInfo(RdbFileListener rdbFileListener) { protected void doReadRdbFile(RdbFileListener rdbFileListener, ReferenceFileChannel referenceFileChannel) throws IOException { long lastLogTime = System.currentTimeMillis(); + RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE); while (rdbFileListener.isOpen() && (isRdbWriting(status.get()) || (status.get() == Status.Success && referenceFileChannel.hasAnythingToRead()))) { - - ReferenceFileRegion referenceFileRegion = referenceFileChannel.readTilEnd(); + int limitBytes = rdbFileListener.getLimitBytesPerSecond(); + ReferenceFileRegion referenceFileRegion = referenceFileChannel.read(limitBytes); + if (limitBytes > 0 && referenceFileRegion.count() > 0) { + if (((int)rateLimiter.getRate()) != limitBytes) rateLimiter.setRate(limitBytes); + int readBytes = (int)Math.min(referenceFileRegion.count(), limitBytes); + rateLimiter.acquire(readBytes); + } rdbFileListener.onFileData(referenceFileRegion); if(referenceFileRegion.count() <= 0) { try { diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java index 3610679198..e939777bb5 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReader.java @@ -5,6 +5,7 @@ import com.ctrip.xpipe.redis.core.store.CommandFile; import com.ctrip.xpipe.redis.core.store.CommandReader; import com.ctrip.xpipe.redis.core.store.CommandStore; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.DefaultControllableFile; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -28,15 +29,18 @@ public class OffsetCommandReader extends AbstractFlyingThresholdCommandReader= 0) { + offsetNotifier.await(curPosition, milliSeconds); + } else { + offsetNotifier.await(curPosition); + } readNextFileIfNecessary(); } catch (InterruptedException e) { logger.info("[read]", e); @@ -54,7 +61,7 @@ public ReferenceFileRegion doRead(long milliSeconds) throws IOException { } if (!referenceFileChannel.hasAnythingToRead()) return null; - ReferenceFileRegion referenceFileRegion = referenceFileChannel.readTilEnd(); + ReferenceFileRegion referenceFileRegion = referenceFileChannel.read(replDelayConfig.getLimitBytesPerSecond()); curPosition += referenceFileRegion.count(); diff --git a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java index f829ec414c..23224026eb 100644 --- a/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java +++ b/redis/redis-keeper/src/main/java/com/ctrip/xpipe/redis/keeper/store/cmd/OffsetCommandReaderWriterFactory.java @@ -3,6 +3,7 @@ import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion; import com.ctrip.xpipe.redis.core.redis.operation.RedisOp; import com.ctrip.xpipe.redis.core.store.*; +import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig; import com.ctrip.xpipe.utils.OffsetNotifier; import org.slf4j.Logger; @@ -22,8 +23,8 @@ public CommandWriter createCmdWriter(CommandStore cmdStore, @Override public CommandReader createCmdReader(OffsetReplicationProgress replProgress, - CommandStore cmdStore, - OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException { + CommandStore cmdStore, OffsetNotifier offsetNotifier, + ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException { long currentOffset = replProgress.getProgress(); cmdStore.rotateFileIfNecessary(); CommandFile commandFile = cmdStore.findFileForOffset(currentOffset); @@ -32,7 +33,7 @@ public CommandReader createCmdReader(OffsetReplicationProgr } return new OffsetCommandReader(commandFile, currentOffset, currentOffset - commandFile.getStartOffset(), - cmdStore, offsetNotifier, commandReaderFlyingThreshold); + cmdStore, offsetNotifier, replDelayConfig, commandReaderFlyingThreshold); } @Override diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java index eb8fbfabca..133a3263fb 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/AbstractRedisKeeperContextTest.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.store.ReplId; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer; import com.ctrip.xpipe.redis.keeper.ratelimit.SyncRateManager; import com.ctrip.xpipe.redis.keeper.spring.KeeperContextConfig; @@ -125,7 +126,7 @@ protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keep protected RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper, KeeperConfig keeperConfig, File baseDir, LeaderElectorManager leaderElectorManager) { return new DefaultRedisKeeperServer(replId, keeper, keeperConfig, baseDir, leaderElectorManager, - createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), createRedisOpParser()); + createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), createRedisOpParser(), new ReplDelayConfigCache()); } protected RedisOpParser createRedisOpParser() { diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java index b15aaa7848..a23b14b506 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/impl/GtidRedisKeeperServerTest.java @@ -16,6 +16,7 @@ import com.ctrip.xpipe.redis.core.store.ReplicationStore; import com.ctrip.xpipe.redis.keeper.AbstractFakeRedisTest; import com.ctrip.xpipe.redis.keeper.config.KeeperConfig; +import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache; import com.ctrip.xpipe.redis.keeper.ratelimit.SyncRateManager; import io.netty.buffer.ByteBuf; import org.junit.After; @@ -59,7 +60,7 @@ public void testPsyncAndXsync() throws Exception { KeeperMeta keeperMeta = createKeeperMeta(); DefaultRedisKeeperServer keeperServer = new DefaultRedisKeeperServer(getReplId().id(), keeperMeta, keeperConfig, getReplicationStoreManagerBaseDir(keeperMeta), getRegistry().getComponent(LeaderElectorManager.class), - createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), parser); + createkeepersMonitorManager(), getResourceManager(), Mockito.mock(SyncRateManager.class), parser, new ReplDelayConfigCache()); keeperServer.initialize(); keeperServer.start(); diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java index 6e35b840c6..d194583732 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestProfile.java @@ -46,6 +46,11 @@ public ZkClient getZkClient(ZkTestServer zkTestServer) throws Exception{ public KeeperConfig getKeeperConfig(){ return new TestKeeperConfig(1024, 2, 1024, 2000); } + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new TestKeeperCommonConfig(); + } @Bean public KeeperContainerConfig getKeeperContainerConfig(){ diff --git a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java index 2148770926..fc2fb5b02c 100644 --- a/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java +++ b/redis/redis-keeper/src/test/java/com/ctrip/xpipe/redis/keeper/spring/TestWithoutZkProfile.java @@ -43,7 +43,12 @@ public ZkClient getZkClient(KeeperConfig keeperConfig){ public KeeperConfig getKeeperConfig(){ return new TestKeeperConfig(1024, 2, 1024, 2000); } - + + @Bean + public KeeperCommonConfig getKeeperCommonConfig() { + return new TestKeeperCommonConfig(); + } + @Bean public KeeperContainerConfig getKeeperContainerConfig(){ return new TestKeeperContainerConfig(AbstractTest.getUserHome() + "/rsd");