Skip to content

Commit

Permalink
Merge pull request #893 from wangqifan/feature/metacache_long_pull_im…
Browse files Browse the repository at this point in the history
…prove

improve for metacache load
  • Loading branch information
LanternLee authored Oct 10, 2024
2 parents 7811da3 + c2a1ea4 commit af4d3ee
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.console.AbstractSiteLeaderIntervalAction;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.spring.AbstractProfile;
import com.ctrip.xpipe.utils.VisibleForTesting;
Expand Down Expand Up @@ -39,8 +40,16 @@ protected List<ALERT_TYPE> alertTypes() {
return alertType;
}

private int getMetaCacheUpdateThreshold() {
if(consoleConfig.disableDb()) {
return META_CACHE_UPDATE_THREASHOLD * 2;
} else {
return META_CACHE_UPDATE_THREASHOLD;
}
}

private boolean isMetaCacheOverDue() {
return System.currentTimeMillis() - metaCache.getLastUpdateTime() > META_CACHE_UPDATE_THREASHOLD;
return System.currentTimeMillis() - metaCache.getLastUpdateTime() > getMetaCacheUpdateThreshold();
}

@VisibleForTesting
Expand All @@ -54,4 +63,10 @@ protected MetaCacheCheck setAlertManager(AlertManager alertManager) {
this.alertManager = alertManager;
return this;
}

@VisibleForTesting
protected MetaCacheCheck setConfig(ConsoleConfig consoleConfig) {
this.consoleConfig = consoleConfig;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public void go() {
try {
XpipeMeta xpipeMeta = consolePortalService.getXpipeAllMeta(getVersion());
checkMeta(xpipeMeta, config.maxRemovedDcsCnt(), config.maxRemovedClustersPercent());
refreshMetaParts();
refreshMetaParts(xpipeMeta);
refreshMeta(xpipeMeta);
} catch (Throwable th) {
logger.error("[MetaCacheApi][load]", th);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import com.ctrip.xpipe.redis.console.service.KeeperContainerService;
import com.ctrip.xpipe.redis.console.service.RedisCheckRuleService;
import com.ctrip.xpipe.redis.console.service.meta.DcMetaService;
import com.ctrip.xpipe.redis.core.entity.DcMeta;
import com.ctrip.xpipe.redis.core.entity.RedisCheckRuleMeta;
import com.ctrip.xpipe.redis.core.entity.RouteMeta;
import com.ctrip.xpipe.redis.core.entity.XpipeMeta;
import com.ctrip.xpipe.redis.core.entity.*;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import com.ctrip.xpipe.redis.core.meta.XpipeMetaManager;
import com.ctrip.xpipe.redis.core.route.RouteChooseStrategy;
Expand Down Expand Up @@ -180,8 +177,8 @@ public void go() throws Exception {
}

synchronized (this) {
refreshMetaParts();
XpipeMeta xpipeMeta = createXpipeMeta(dcMetas, redisCheckRuleMetas);
refreshMetaParts(xpipeMeta);
refreshMeta(xpipeMeta);
}
}
Expand All @@ -193,17 +190,44 @@ public Map getData() {
});
}

protected void refreshMetaParts() {
protected List<Set<String>> divideClusters(int partsCnt, XpipeMeta xpipeMeta) {

List<Set<String>> parts = new ArrayList<>(partsCnt);
IntStream.range(0, partsCnt).forEach(i -> parts.add(new HashSet<>()));

for(DcMeta dcMeta : xpipeMeta.getDcs().values()) {
for(ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
parts.get((int) (clusterMeta.getDbId() % partsCnt))
.add(clusterMeta.getId());
}
}
return parts;
}

List<Set<Long>> divideKeeperContainers(int partsCount, XpipeMeta xpipeMeta) {
List<Set<Long>> result = new ArrayList<>(partsCount);
IntStream.range(0, partsCount).forEach(i -> result.add(new HashSet<>()));
for(DcMeta dcMeta : xpipeMeta.getDcs().values()) {
for(KeeperContainerMeta keeperContainerMeta : dcMeta.getKeeperContainers()) {
result.get((int) (keeperContainerMeta.getId() % partsCount)).add(
keeperContainerMeta.getId()
);
}
}
return result;
}

protected void refreshMetaParts(XpipeMeta xpipeMeta) {
try {
int parts = Math.max(1, consoleConfig.getClusterDividedParts());
logger.debug("[refreshClusterParts] start parts {}", parts);

List<Set<String>> newClusterParts = clusterService.divideClusters(parts);
List<Set<String>> newClusterParts = divideClusters(parts, xpipeMeta);
if (newClusterParts.size() < parts) {
logger.info("[refreshClusterParts] skip for parts miss, expect {}, actual {}", parts, newClusterParts.size());
return;
}
List<Set<Long>> newKeeperContainerParts = keeperContainerService.divideKeeperContainers(parts);
List<Set<Long>> newKeeperContainerParts = divideKeeperContainers(parts, xpipeMeta);
if (newKeeperContainerParts.size() < parts) {
logger.info("[refreshKeeperContainerParts] skip for parts miss, expect {}, actual {}",
parts, newKeeperContainerParts.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,6 @@ public List<Set<Long>> divideKeeperContainers(int partsCount) {
return result;
}


@Override
public List<KeeperContainerInfoModel> findAllInfos() {
List<KeepercontainerTbl> baseInfos = findContainerBaseInfos();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ctrip.xpipe.redis.console.healthcheck.nonredis.metacache;

import com.ctrip.xpipe.redis.checker.alert.AlertManager;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.core.meta.MetaCache;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -17,6 +18,9 @@ public class MetaCacheCheckTest {
@Mock
private AlertManager alertManager;

@Mock
private ConsoleConfig consoleConfig;

@Mock
private MetaCache metaCache;

Expand All @@ -26,6 +30,7 @@ public void beforeMetaCacheCheckTest() {
metaCacheCheck = new MetaCacheCheck();
metaCacheCheck.setAlertManager(alertManager);
metaCacheCheck.setMetaCache(metaCache);
metaCacheCheck.setConfig(consoleConfig);
}

@Test
Expand All @@ -42,5 +47,9 @@ public void doCheckWithAlert() {
when(metaCache.getLastUpdateTime()).thenReturn(System.currentTimeMillis() - 15 * 1000);
metaCacheCheck.doAction();
verify(alertManager, times(1)).alert(any(), any(), any(), any(), any());

when(consoleConfig.disableDb()).thenReturn(true);
metaCacheCheck.doAction();
verify(alertManager, never()).alert(any(), any(), any());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.mockito.MockitoAnnotations;
import org.unidal.tuple.Triple;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -143,6 +144,36 @@ public void testGetAllKeepers() {
new HostPort("127.0.0.2", 6101)), allKeepers);
}

@Test
public void testDivideKeeperAndCluster() {
when(consoleConfig.getClusterDividedParts()).thenReturn(3);

metaCache.refreshMetaParts(metaCache.getXpipeMeta());

Set<HostPort> allKeepers = new HashSet<>();
Set<Long> allCluster = new HashSet<>();
XpipeMeta meta = metaCache.getDividedXpipeMeta(0);

for(DcMeta dcMeta : meta.getDcs().values()) {
for(KeeperContainerMeta keeperContainerMeta : dcMeta.getKeeperContainers()) {
allKeepers.add(new HostPort(keeperContainerMeta.getIp(), keeperContainerMeta.getPort()));
}
for(ClusterMeta clusterMeta : dcMeta.getClusters().values()) {
allCluster.add(clusterMeta.getDbId());
}
}

Assert.assertEquals(1, allKeepers.size());
Assert.assertEquals(Sets.newHashSet(new HostPort("1.1.1.3", 8080)), allKeepers);

Assert.assertEquals(1, allCluster.size());
Assert.assertEquals(3, allCluster.stream().findFirst().get().intValue());


}



@Test
public void testGetAllKeeperContainersDcMap() {
Map<String, String> allKeeperContainersDcMap = metaCache.getAllKeeperContainersDcMap();
Expand Down

0 comments on commit af4d3ee

Please sign in to comment.