Skip to content

Commit

Permalink
keeper replication delay
Browse files Browse the repository at this point in the history
  • Loading branch information
sl_li committed Dec 9, 2024
1 parent 841d61f commit a195c95
Show file tree
Hide file tree
Showing 28 changed files with 382 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ protected boolean hasResponse() {

public enum ReplConfType {

LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper");
LISTENING_PORT("listening-port"), CAPA("capa"), CRDT("crdt"), ACK("ack"), KEEPER("keeper"), IDC("idc");

private String command;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.netty.filechannel.ReferenceFileRegion;
import com.ctrip.xpipe.redis.core.redis.operation.RedisOp;
import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import com.ctrip.xpipe.utils.OffsetNotifier;
import org.slf4j.Logger;

Expand All @@ -16,7 +17,7 @@ public interface CommandReaderWriterFactory {
CommandWriter createCmdWriter(CommandStore cmdStore, int maxFileSize, Logger delayTraceLogger) throws IOException;

CommandReader<ReferenceFileRegion> createCmdReader(OffsetReplicationProgress replProgress, CommandStore cmdStore,
OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException;
OffsetNotifier offsetNotifier, ReplDelayConfig replDelayConfig, long commandReaderFlyingThreshold) throws IOException;

CommandReader<RedisOp> createCmdReader(GtidSetReplicationProgress replProgress, CommandStore cmdStore,
OffsetNotifier offsetNotifier, long commandReaderFlyingThreshold) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.ctrip.xpipe.redis.core.store;

import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import io.netty.channel.ChannelFuture;

/**
Expand All @@ -17,4 +18,9 @@ public interface CommandsListener {
void beforeCommand();

Long processedOffset();

default ReplDelayConfig getReplDelayConfig() {
return null;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ctrip.xpipe.redis.core.store.ratelimit;

public interface ReplDelayConfig {

long getDelayMilli();

long getDelayBytes();

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
import com.ctrip.xpipe.redis.core.redis.operation.RedisOpParserManager;
import com.ctrip.xpipe.redis.core.redis.operation.parser.*;
import com.ctrip.xpipe.redis.keeper.RedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperConfig;
import com.ctrip.xpipe.redis.keeper.config.DefaultKeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.config.KeeperConfig;
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.config.*;
import com.ctrip.xpipe.redis.keeper.impl.DefaultRedisKeeperServer;
import com.ctrip.xpipe.redis.keeper.monitor.KeepersMonitorManager;
import com.ctrip.xpipe.redis.keeper.monitor.impl.NoneKeepersMonitorManager;
Expand Down Expand Up @@ -191,7 +188,7 @@ protected RedisKeeperServer createGtidRedisKeeperServer(KeeperMeta keeperMeta, F

Long replId = keeperMeta.parent().getDbId();
return new DefaultRedisKeeperServer(replId, keeperMeta, keeperConfig, baseDir,
leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser);
leaderElectorManager, keeperMonitorManager, resourceManager, syncRateManager, redisOpParser, new ReplDelayConfigCache());
}

protected RedisKeeperServer createRedisKeeperServer(KeeperMeta keeperMeta, File baseDir, KeeperConfig keeperConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.lifecycle.Releasable;
import com.ctrip.xpipe.api.observer.Observable;
import com.ctrip.xpipe.redis.core.protocal.CAPA;
import com.ctrip.xpipe.redis.core.store.ratelimit.ReplDelayConfig;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

Expand All @@ -15,7 +16,7 @@
*
* 2016年4月22日 上午11:25:07
*/
public interface RedisClient<T extends RedisServer> extends Observable, Infoable, Closeable, RedisRole, Releasable, Keeperable{
public interface RedisClient<T extends RedisServer> extends Observable, Infoable, Closeable, RedisRole, ReplDelayConfig, Releasable, Keeperable{

public static enum CLIENT_ROLE{
NORMAL,
Expand All @@ -33,6 +34,10 @@ public static enum CLIENT_ROLE{

int getSlaveListeningPort();

void setIdc(String idc);

String getIdc();

void setClientIpAddress(String host);

String getClientIpAddress();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.ctrip.xpipe.redis.keeper.config;

import com.ctrip.xpipe.api.codec.Codec;
import com.ctrip.xpipe.api.codec.GenericTypeReference;
import com.ctrip.xpipe.api.config.Config;
import com.ctrip.xpipe.api.config.ConfigProvider;
import com.ctrip.xpipe.config.CompositeConfig;
import com.ctrip.xpipe.config.DefaultFileConfig;
import com.ctrip.xpipe.config.DefaultPropertyConfig;
import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig;

import java.util.List;

public class DefaultKeeperCommonConfig extends AbstractCoreConfig implements KeeperCommonConfig {

private static String KEY_REPL_DELAY_CONFIG = "keeper.repl.delay.config";

private final GenericTypeReference<List<KeeperReplDelayConfig>> replDelayConfigListType = new GenericTypeReference<List<KeeperReplDelayConfig>>() {};

public DefaultKeeperCommonConfig(){
CompositeConfig compositeConfig = new CompositeConfig();
compositeConfig.addConfig(ConfigProvider.DEFAULT.getOrCreateConfig(ConfigProvider.COMMON_CONFIG));
compositeConfig.addConfig(new DefaultPropertyConfig());
setConfig(compositeConfig);
}

@Override
public List<KeeperReplDelayConfig> getReplDelayConfigs() {
String replDelayConfigInfo = getProperty(KEY_REPL_DELAY_CONFIG, "[]");
return Codec.DEFAULT.decode(replDelayConfigInfo, replDelayConfigListType);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ctrip.xpipe.redis.keeper.config;

import java.util.List;

public interface KeeperCommonConfig {

List<KeeperReplDelayConfig> getReplDelayConfigs();

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.ctrip.xpipe.redis.core.config.CoreConfig;

import java.util.List;
import java.util.Map;

/**
* @author marsqing
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.ctrip.xpipe.redis.keeper.config;

public class KeeperReplDelayConfig {

private String srcDc;

private String destDc;

private long delayMilli;

private long delayBytes;

public String getSrcDc() {
return srcDc;
}

public void setSrcDc(String srcDc) {
this.srcDc = srcDc;
}

public String getDestDc() {
return destDc;
}

public void setDestDc(String destDc) {
this.destDc = destDc;
}

public long getDelayMilli() {
return delayMilli;
}

public void setDelayMilli(long delayMilli) {
this.delayMilli = delayMilli;
}

public long getDelayBytes() {
return delayBytes;
}

public void setDelayBytes(long delayBytes) {
this.delayBytes = delayBytes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package com.ctrip.xpipe.redis.keeper.config;

import com.ctrip.xpipe.api.foundation.FoundationService;
import com.ctrip.xpipe.api.lifecycle.TopElement;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.lifecycle.AbstractLifecycle;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Component
public class ReplDelayConfigCache extends AbstractLifecycle implements TopElement {

private KeeperCommonConfig keeperConfig;

private ScheduledExecutorService scheduled;

private ScheduledFuture<?> future;

private Map<String, KeeperReplDelayConfig> keeperReplDelayConfigMap;

public ReplDelayConfigCache() {
this(null);
}

@Autowired
public ReplDelayConfigCache(KeeperCommonConfig keeperConfig) {
this.keeperConfig = keeperConfig;
this.keeperReplDelayConfigMap = new HashMap<>();
}

private void refresh() {
logger.debug("[refresh]");
List<KeeperReplDelayConfig> replDelayConfigs = keeperConfig.getReplDelayConfigs();
Map<String, KeeperReplDelayConfig> localReplDelayConfigMap = new HashMap<>();
String currentDc = FoundationService.DEFAULT.getDataCenter();
for (KeeperReplDelayConfig config: replDelayConfigs) {
if (currentDc.equalsIgnoreCase(config.getSrcDc())) {
localReplDelayConfigMap.put(config.getDestDc().toUpperCase(), config);
}
}
this.keeperReplDelayConfigMap = localReplDelayConfigMap;
}

public KeeperReplDelayConfig getReplDelayConfig(String destIdc) {
if (null == destIdc || this.keeperReplDelayConfigMap.isEmpty()) return null;
return this.keeperReplDelayConfigMap.get(destIdc);
}

@Override
protected void doInitialize() throws Exception {
super.doInitialize();
this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create("ReplDelayConfigCache"));
}

@Override
protected void doStart() throws Exception {
super.doStart();
this.future = scheduled.scheduleWithFixedDelay(new AbstractExceptionLogTask() {
@Override
protected void doRun() throws Exception {
ReplDelayConfigCache.this.refresh();
}
}, 5, 10, TimeUnit.SECONDS);
}

@Override
protected void doStop() throws Exception {
super.doStop();
if (null != future) {
this.future.cancel(false);
this.future = null;
}
}

@Override
protected void doDispose() throws Exception {
super.doDispose();
if (null != scheduled) {
this.scheduled.shutdownNow();
this.scheduled = null;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ctrip.xpipe.redis.keeper.config;

import java.util.Collections;
import java.util.List;

public class TestKeeperCommonConfig implements KeeperCommonConfig {

@Override
public List<KeeperReplDelayConfig> getReplDelayConfigs() {
return Collections.emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.ctrip.xpipe.redis.core.config.AbstractCoreConfig;
import com.ctrip.xpipe.redis.keeper.store.DefaultCommandStore;

import java.util.Collections;
import java.util.List;

/**
* @author wenchao.meng
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.ctrip.xpipe.redis.keeper.config.KeeperConfig;
import com.ctrip.xpipe.redis.keeper.config.KeeperContainerConfig;
import com.ctrip.xpipe.redis.keeper.config.KeeperResourceManager;
import com.ctrip.xpipe.redis.keeper.config.ReplDelayConfigCache;
import com.ctrip.xpipe.redis.keeper.exception.RedisKeeperRuntimeException;
import com.ctrip.xpipe.redis.keeper.health.DiskHealthChecker;
import com.ctrip.xpipe.redis.keeper.health.HealthState;
Expand Down Expand Up @@ -61,6 +62,8 @@ public class KeeperContainerService extends AbstractLifecycle implements TopElem
private DiskHealthChecker diskHealthChecker;
@Autowired
private SyncRateManager syncRateManager;
@Autowired
private ReplDelayConfigCache replDelayConfigCache;

private Map<String, RedisKeeperServer> redisKeeperServers = Maps.newConcurrentMap();

Expand Down Expand Up @@ -110,7 +113,8 @@ public RedisKeeperServer add(KeeperTransMeta keeperTransMeta) {

File baseDir = getReplicationStoreDir(keeperMeta);
RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(keeperTransMeta.getReplId(), keeperMeta,
keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser);
keeperConfig, baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager,
redisOpParser, replDelayConfigCache);

try {
register(redisKeeperServer);
Expand Down Expand Up @@ -313,7 +317,8 @@ private RedisKeeperServer createRedisKeeperServer(Long replId, KeeperMeta keeper
File baseDir) throws Exception {

RedisKeeperServer redisKeeperServer = new DefaultRedisKeeperServer(replId, keeper, keeperConfig,
baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager, redisOpParser);
baseDir, leaderElectorManager, keepersMonitorManager, resourceManager, syncRateManager,
redisOpParser, replDelayConfigCache);

register(redisKeeperServer);
return redisKeeperServer;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.keeper.handler.keeper;

import com.ctrip.xpipe.redis.core.protocal.CAPA;
import com.ctrip.xpipe.redis.core.protocal.cmd.Replconf;
import com.ctrip.xpipe.redis.core.protocal.protocal.RedisErrorParser;
import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser;
import com.ctrip.xpipe.redis.keeper.RedisClient;
Expand Down Expand Up @@ -49,7 +50,9 @@ protected void doHandle(String[] args, RedisClient<?> redisClient) {
throw new IllegalStateException("[doHandle][getack not supported]" );
}else if("keeper".equalsIgnoreCase(option)){//extends by keeper
redisClient.setKeeper();
}else{
}else if(Replconf.ReplConfType.IDC.name().equalsIgnoreCase(option)) {
redisClient.setIdc(args[1].toUpperCase());
} else{
logger.error("[doHandler][unkonwn command]" + StringUtil.join(" ", args));
redisClient.sendMessage(new RedisErrorParser("unknown replconf command " + StringUtil.join(" ", args)).format());
return;
Expand Down
Loading

0 comments on commit a195c95

Please sign in to comment.