Skip to content

Commit

Permalink
beacon checker
Browse files Browse the repository at this point in the history
  • Loading branch information
qifanwang committed Nov 21, 2024
1 parent 97e9741 commit e7bc1c1
Show file tree
Hide file tree
Showing 20 changed files with 628 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Map;
import java.util.Set;

/**
Expand All @@ -27,6 +28,12 @@ public interface MonitorService {

void registerCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups);

void unregisterCluster(String system, String clusterName);

int getBeaconClusterHash(String system, String clusterName);

Map<String,Set<String>> getAllClusterWithDc(String system);

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,13 @@ public void setExtra(Map<String, String> extra) {
this.extra = extra;
}

@Override
public int hashCode() {
int hash = 0;
for (MonitorGroupMeta meta : nodeGroups) {
hash ^= meta.hashCode();
}
return hash;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ public boolean equals(Object o) {
MonitorGroupMeta that = (MonitorGroupMeta) o;
return Objects.equals(name, that.name) &&
Objects.equals(idc, that.idc) &&
Objects.equals(nodes, that.nodes);
Objects.equals(nodes, that.nodes) &&
Objects.equals(masterGroup, that.masterGroup);
}

@Override
public int hashCode() {
return Objects.hash(name, idc, nodes);
return Objects.hash(name, idc, nodes, masterGroup);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -57,9 +58,24 @@ public void registerCluster(String system, String clusterName, Set<MonitorGroupM
// do nothing
}

@Override
public void updateCluster(String system, String clusterName, Set<MonitorGroupMeta> groups) {

}

@Override
public void unregisterCluster(String system, String clusterName) {
// do nothing
}

@Override
public int getBeaconClusterHash(String system, String clusterName) {
return 0;
}

@Override
public Map<String, Set<String>> getAllClusterWithDc(String system) {
return Collections.emptyMap();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.ctrip.xpipe.api.migration;

import com.ctrip.xpipe.api.migration.auto.data.MonitorClusterMeta;
import com.ctrip.xpipe.api.migration.auto.data.MonitorGroupMeta;
import com.ctrip.xpipe.endpoint.HostPort;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;

import java.util.Set;

public class MonitorClusterMetaTest {

@Test
public void testMonitorClusterMeta(){
Set<MonitorGroupMeta> group1 = getMonitorGroupMeta1();
MonitorClusterMeta monitorClusterMeta1 = new MonitorClusterMeta(group1);

Set<MonitorGroupMeta> group2 = getMonitorGroupMeta2();
MonitorClusterMeta monitorClusterMeta2 = new MonitorClusterMeta(group2);

Set<MonitorGroupMeta> group3 = getMonitorGroupMeta3();
MonitorClusterMeta monitorClusterMeta3 = new MonitorClusterMeta(group3);

Set<MonitorGroupMeta> group4 = getMonitorGroupMeta4();
MonitorClusterMeta monitorClusterMeta4 = new MonitorClusterMeta(group4);

Set<MonitorGroupMeta> group5 = getMonitorGroupMeta5();
MonitorClusterMeta monitorClusterMeta5 = new MonitorClusterMeta(group5);

Assert.assertEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta2.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta3.hashCode());
Assert.assertNotEquals(monitorClusterMeta1.hashCode(), monitorClusterMeta4.hashCode());
Assert.assertNotEquals(monitorClusterMeta4.hashCode(), monitorClusterMeta5.hashCode());
System.out.println(monitorClusterMeta1.hashCode());
System.out.println(monitorClusterMeta2.hashCode());
System.out.println(monitorClusterMeta3.hashCode());
System.out.println(monitorClusterMeta4.hashCode());
}


private Set<MonitorGroupMeta> getMonitorGroupMeta1() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta2() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta3() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), false),
new MonitorGroupMeta("shard2+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), false),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), true),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), true)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta4() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}

private Set<MonitorGroupMeta> getMonitorGroupMeta5() {
return Sets.newHashSet(
new MonitorGroupMeta("shard1+jq", "jq", Sets.newHashSet(HostPort.fromString("127.0.0.1:6379"), HostPort.fromString("127.0.0.1:6382")), true),
new MonitorGroupMeta("shard2+jq", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6381"), HostPort.fromString("127.0.0.1:6380")), true),
new MonitorGroupMeta("shard1+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6383"), HostPort.fromString("127.0.0.1:6384")), false),
new MonitorGroupMeta("shard2+oy", "oy", Sets.newHashSet(HostPort.fromString("127.0.0.1:6385"), HostPort.fromString("127.0.0.1:6386")), false)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,8 @@ public interface BeaconManager {

void registerCluster(String clusterId, ClusterType clusterType, int orgId);

void updateCluster(String clusterId, ClusterType clusterType, int orgId);

boolean checkClusterHash(String clusterId, ClusterType clusterType, int orgId);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.metric.MetricData;
import com.ctrip.xpipe.metric.MetricProxy;
import com.ctrip.xpipe.metric.MetricProxyException;
import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterInstanceInfo;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractLeaderAwareHealthCheckAction;
import com.ctrip.xpipe.utils.ServicesUtil;
import org.slf4j.Logger;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

public class BeaconConsistencyCheckAction extends AbstractLeaderAwareHealthCheckAction<ClusterHealthCheckInstance> {

private BeaconManager beaconManager;

private MetricProxy metricProxy = ServicesUtil.getMetricProxy();

private static long lastSendTime = System.currentTimeMillis();

public BeaconConsistencyCheckAction(ScheduledExecutorService scheduled, ClusterHealthCheckInstance instance, ExecutorService executors, BeaconManager beaconManager) {
super(scheduled, instance, executors);
this.beaconManager = beaconManager;
}

@Override
protected void doTask() {
ClusterInstanceInfo info = instance.getCheckInfo();
String clusterId = info.getClusterId();
int orgId = info.getOrgId();
checkConsistency(clusterId, info.getClusterType(), orgId);
}

@Override
protected Logger getHealthCheckLogger() {
return logger;
}

@Override
protected int getBaseCheckInterval() {
return getActionInstance().getHealthCheckConfig().clusterCheckIntervalMilli();
}

private void checkConsistency(String clusterId, ClusterType clusterType, int orgId) {
Boolean consistency = null;
try {
consistency = beaconManager.checkClusterHash(clusterId, clusterType, orgId);
} catch (Throwable t) {
// cluster not found in beacon
logger.error("[checkConsistency]{}:{}:{}", clusterType, orgId, t.getMessage());
}
handleCheckResult(consistency, clusterId, clusterType, orgId);
}

private void handleCheckResult(Boolean consistency, String clusterId, ClusterType clusterType, int orgId) {
try {
String type = "CONSISTENCY";
if(consistency == null) {
beaconManager.registerCluster(clusterId, clusterType, orgId);
consistency = Boolean.FALSE;
type = "ERROR";
} else if(consistency != null && consistency.booleanValue() == false) {
beaconManager.updateCluster(clusterId, clusterType, orgId);
type = "INCONSISTENCY";
}
sendMetricData(clusterId, consistency, type);
} catch (Throwable t) {
logger.error("[checkPost]{}:{}:{}", clusterType, orgId, t.getMessage());
}
}



private MetricData getMetricData(String clusterId, String consistency, long time) {
MetricData metricData = new MetricData("beacon.checker", null, clusterId, null);
metricData.setTimestampMilli(time);
metricData.setValue(1);
metricData.addTag("consistency", consistency);
return metricData;
}

private void sendMetricData(String trackCluster, boolean consistency, String type) {
long sendTime = System.currentTimeMillis();
if(consistency && sendTime - lastSendTime < getBaseCheckInterval()) {
return;
}
try {
MetricData metricData = getMetricData(trackCluster, type, sendTime);
metricProxy.writeBinMultiDataPoint(metricData);
lastSendTime = sendTime;
} catch (MetricProxyException e) {
logger.error("[sendMetricData]", e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.ctrip.xpipe.redis.checker.healthcheck.clusteractions.beacon;

import com.ctrip.xpipe.redis.checker.BeaconManager;
import com.ctrip.xpipe.redis.checker.alert.ALERT_TYPE;
import com.ctrip.xpipe.redis.checker.healthcheck.BiDirectionSupport;
import com.ctrip.xpipe.redis.checker.healthcheck.ClusterHealthCheckInstance;
import com.ctrip.xpipe.redis.checker.healthcheck.OneWaySupport;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.AbstractClusterLeaderAwareHealthCheckActionFactory;
import com.ctrip.xpipe.redis.checker.healthcheck.leader.SiteLeaderAwareHealthCheckAction;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.List;

@Component
public class BeaconConsistencyCheckActionFactory extends AbstractClusterLeaderAwareHealthCheckActionFactory implements OneWaySupport, BiDirectionSupport {

@Autowired
private BeaconManager beaconManager;

@Autowired
private List<BeaconMetaController> controllers;

@Override
public SiteLeaderAwareHealthCheckAction create(ClusterHealthCheckInstance instance) {
BeaconConsistencyCheckAction action = new BeaconConsistencyCheckAction(scheduled, instance, executors, beaconManager);
action.addControllers(controllers);
return action;
}

@Override
public Class<? extends SiteLeaderAwareHealthCheckAction> support() {
return BeaconConsistencyCheckAction.class;
}

@Override
protected List<ALERT_TYPE> alertTypes() {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,14 @@ public class TestBeaconManager implements BeaconManager {
public void registerCluster(String clusterId, ClusterType clusterType, int orgId) {

}

@Override
public void updateCluster(String clusterId, ClusterType clusterType, int orgId) {

}

@Override
public boolean checkClusterHash(String clusterId, ClusterType clusterType, int orgId) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class AbstractIntervalAction {

@PostConstruct
public void postConstruct(){
logger.info("[postConstruct]{}", this);
logger.info("[postConstruct] {}", this);

for(ALERT_TYPE type : alertTypes()) {
alertPolicyManager.markCheckInterval(type, this::getIntervalMilli);
Expand Down
Loading

0 comments on commit e7bc1c1

Please sign in to comment.