Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/keeper fsync sleep #921

Merged
merged 11 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public void close() throws IOException {
tryCloseChannel();
}

protected ReferenceFileRegion readTilEnd(int maxBytes) throws IOException {
public ReferenceFileRegion read(int maxBytes) throws IOException {

while(true){

Expand All @@ -70,7 +70,7 @@ protected ReferenceFileRegion readTilEnd(int maxBytes) throws IOException {
}

public ReferenceFileRegion readTilEnd() throws IOException {
return readTilEnd(-1);
return read(-1);
}

private void increase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ protected void doRun() throws Exception {
while (true) {

count++;
ReferenceFileRegion referenceFileRegion = referenceFileChannel.readTilEnd(1);
ReferenceFileRegion referenceFileRegion = referenceFileChannel.read(1);
fileRegions.offer(referenceFileRegion);
if(count > totalFileLen){
logger.info("{}", referenceFileRegion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ client-output-buffer-limit slave 0 0 0
repl-diskless-sync yes
repl-diskless-sync-delay 3
#protected-mode no
maxmemory-policy volatile-lru
maxmemory-policy allkeys-lru
protected-mode no

hz 1
21 changes: 7 additions & 14 deletions redis/package/redis-keeper-package/src/main/scripts/startup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -163,22 +163,15 @@ else
IDC=`getIdc`
total=`getTotalMem`
if ([ $IDC = "PTJQ" ] || [ $IDC = "PTOY" ]) && ([ "$total" -gt 60 ]);then
#MB
USED_MEM=30720
XMN=11520
MAX_DIRECT=5120
elif [ $IDC = "UAT-AWS" ]; then
#MB
USED_MEM=6144
XMN=2400
MAX_DIRECT=300
USED_MEM=30
XMN=12
MAX_DIRECT=5
else
#MB
USED_MEM=1600
XMN=600
MAX_DIRECT=100
USED_MEM=`getSafeXmx`
XMN=`getSafeXmn $USED_MEM`
MAX_DIRECT=1
fi
JAVA_OPTS="$JAVA_OPTS -Xms${USED_MEM}m -Xmx${USED_MEM}m -Xmn${XMN}m -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=${MAX_DIRECT}m"
JAVA_OPTS="$JAVA_OPTS -Xms${USED_MEM}g -Xmx${USED_MEM}g -Xmn${XMN}g -XX:+AlwaysPreTouch -XX:MaxDirectMemorySize=${MAX_DIRECT}g"
fi
export JAVA_OPTS="$JAVA_OPTS -Dio.netty.maxDirectMemory=0 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -XX:+UseParNewGC -XX:MaxTenuringThreshold=2 -XX:+UseConcMarkSweepGC -XX:+UseCMSInitiatingOccupancyOnly -XX:+ScavengeBeforeFullGC -XX:+UseCMSCompactAtFullCollection -XX:+CMSParallelRemarkEnabled -XX:CMSFullGCsBeforeCompaction=9 -XX:CMSInitiatingOccupancyFraction=60 -XX:-CMSClassUnloadingEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:-ReduceInitialCardMarks -XX:+CMSPermGenSweepingEnabled -XX:CMSInitiatingPermOccupancyFraction=70 -XX:+ExplicitGCInvokesConcurrent -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationConcurrentTime -XX:+PrintHeapAtGC -XX:+HeapDumpOnOutOfMemoryError -XX:-OmitStackTraceInFastThrow -Duser.timezone=Asia/Shanghai -Dclient.encoding.override=UTF-8 -Dfile.encoding=UTF-8 -Xloggc:$LOG_DIR/heap_trace.txt -XX:HeapDumpPath=$LOG_DIR/HeapDumpOnOutOfMemoryError/ -Dcom.sun.management.jmxremote.port=$JMX_PORT -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=${IP} -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -Djava.security.egd=file:/dev/./urandom"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected boolean hasResponse() {

public enum ReplConfType {

LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper");
LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"), IDC("idc");

private String command;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOp;
import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import com.ctrip.xpipe.utils.OffsetNotifier;
import org.slf4j.Logger;

Expand All @@ -16,7 +17,7 @@ public interface CommandReaderWriterFactory {
CommandWriter createCmdWriter(CommandStore cmdStore, int maxFileSize, Logger delayTraceLogger) throws IOException;

CommandReader<ReferenceFileRegion> createCmdReader(OffsetReplicationProgress replProgress, CommandStore cmdStore,
OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException;
OffsetNotifier offsetNotifier, ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException;

CommandReader<RedisOp> createCmdReader(GtidSetReplicationProgress replProgress, CommandStore cmdStore,
OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.ctrip.xpipe.redis.core.store;

import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import io.netty.channel.ChannelFuture;

/**
* @author wenchao.meng
*
* 2016年4月19日 下午4:59:45
*/
public interface CommandsListener {
public interface CommandsListener extends ReplDelayConfig {

boolean isOpen();

Expand All @@ -17,4 +18,5 @@ public interface CommandsListener {
void beforeCommand();

Long processedOffset();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion;
import com.ctrip.xpipe.redis.core.protocal.protocal.EofType;
import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;

import java.io.IOException;

Expand All @@ -10,7 +11,7 @@
*
* 2016年5月9日 下午5:28:47
*/
public interface RdbFileListener {
public interface RdbFileListener extends ReplDelayConfig {

void setRdbFileInfo(EofType eofType, ReplicationProgress<?> rdbProgress);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.ctrip.xpipe.redis.core.store.ratelimit;

public interface ReplDelayConfig {

default long getDelayMilli() {
return 0;
}

default int getLimitBytesPerSecond() {
return -1;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParserManager;
import com.ctrip.xpipe.redis.core.redis.operation.parser.*;
import com.ctrip.xpipe.redis.keeper.RedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperConfig;
import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.config.KeeperConfig;
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.config.*;
import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.monitor.KeepersMonitorManager;
import com.ctrip.xpipe.redis.keeper.monitor.impl.NoneKeepersMonitorManager;
Expand Down Expand Up @@ -191,7 +188,7 @@ protected RedisKeeperServer createGtidRedisKeeperServer(KeeperMeta keeperMeta, F

Long replId = keeperMeta.parent().getDbId();
return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir,
leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser);
leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser, new ReplDelayConfigCache());
}

protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.lifecycle.Releasable;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.redis.core.protocal.CAPA;
import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

Expand All @@ -15,7 +16,7 @@
*
* 2016年4月22日 上午11:25:07
*/
public interface RedisClient<T extends RedisServer> extends Observable, Infoable, Closeable, RedisRole, Releasable, Keeperable{
public interface RedisClient<T extends RedisServer> extends Observable, Infoable, Closeable, RedisRole, ReplDelayConfig, Releasable, Keeperable{

public static enum CLIENT_ROLE{
NORMAL,
Expand All @@ -33,6 +34,10 @@ public static enum CLIENT_ROLE{

int getSlaveListeningPort();

void setIdc(String idc);

String getIdc();

void setClientIpAddress(String host);

String getClientIpAddress();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.ctrip.xpipe.redis.keeper.config;

import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.api.codec.GenericTypeReference;
import com.ctrip.xpipe.api.config.Config;
import com.ctrip.xpipe.api.config.ConfigProvider;
import com.ctrip.xpipe.config.CompositeConfig;
import com.ctrip.xpipe.config.DefaultFileConfig;
import com.ctrip.xpipe.config.DefaultPropertyConfig;
import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig;

import java.util.List;
import java.util.Map;

public class DefaultKeeperCommonConfig extends AbstractCoreConfig implements KeeperCommonConfig {

private static String KEY_KEEPER_REPL_DELAY_CONFIG = "keeper.repl.delay.config";
private static String KEY_REDIS_REPL_DELAY_CONFIG = "redis.repl.delay.config";

private final GenericTypeReference<List<KeeperReplDelayConfig>> keeperReplDelayConfigListType = new GenericTypeReference<List<KeeperReplDelayConfig>>() {};
private final GenericTypeReference<Map<String, RedisReplDelayConfig>> redisReplDelayConfigMapType = new GenericTypeReference<Map<String, RedisReplDelayConfig>>() {};

public DefaultKeeperCommonConfig(){
CompositeConfig compositeConfig = new CompositeConfig();
compositeConfig.addConfig(ConfigProvider.DEFAULT.getOrCreateConfig(ConfigProvider.COMMON_CONFIG));
compositeConfig.addConfig(new DefaultPropertyConfig());
setConfig(compositeConfig);
}

@Override
public List<KeeperReplDelayConfig> getKeeperReplDelayConfigs() {
String replDelayConfigInfo = getProperty(KEY_KEEPER_REPL_DELAY_CONFIG, "[]");
return Codec.DEFAULT.decode(replDelayConfigInfo, keeperReplDelayConfigListType);
}

@Override
public Map<String, RedisReplDelayConfig> getRedisReplDelayConfigs() {
String replDelayConfigInfo = getProperty(KEY_REDIS_REPL_DELAY_CONFIG, "{}");
return Codec.DEFAULT.decode(replDelayConfigInfo, redisReplDelayConfigMapType);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ctrip.xpipe.redis.keeper.config;

import java.util.List;
import java.util.Map;

public interface KeeperCommonConfig {

List<KeeperReplDelayConfig> getKeeperReplDelayConfigs();

Map<String, RedisReplDelayConfig> getRedisReplDelayConfigs();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.ctrip.xpipe.redis.core.config.CoreConfig;

import java.util.List;
import java.util.Map;

/**
* @author marsqing
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.ctrip.xpipe.redis.keeper.config;

public class KeeperReplDelayConfig {

private String srcDc;

private String destDc;

private long delayMilli;

public String getSrcDc() {
return srcDc;
}

public void setSrcDc(String srcDc) {
this.srcDc = srcDc;
}

public String getDestDc() {
return destDc;
}

public void setDestDc(String destDc) {
this.destDc = destDc;
}

public long getDelayMilli() {
return delayMilli;
}

public void setDelayMilli(long delayMilli) {
this.delayMilli = delayMilli;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.ctrip.xpipe.redis.keeper.config;

public class RedisReplDelayConfig {

private int bytesLimitPerSecond = -1;

public int getBytesLimitPerSecond() {
return bytesLimitPerSecond;
}

public void setBytesLimitPerSecond(int bytesLimitPerSecond) {
this.bytesLimitPerSecond = bytesLimitPerSecond;
}
}
Loading
Loading