From c6636e2fe369365cbd2b4c6cf5eecd58630e59dd Mon Sep 17 00:00:00 2001 From: hebelala Date: Thu, 16 Nov 2017 17:52:26 +0800 Subject: [PATCH] #290 Extraction and recovery of executor's traffic --- .../job/console/domain/ServerBriefInfo.java | 65 +- .../service/ServerDimensionService.java | 3 + .../job/console/utils/ExecutorNodePath.java | 4 + .../console/controller/ServerController.java | 33 +- .../impl/ServerDimensionServiceImpl.java | 15 + .../src/main/resources/static/js/overview.js | 53 +- .../main/resources/templates/overview.html | 1 + .../com/vip/saturn/it/SaturnAutoBasic.java | 23 + .../saturn/it/impl/ShardingWithTrafficIT.java | 201 ++++++ .../sharding/NamespaceShardingManager.java | 8 +- .../saturn/job/sharding/entity/Executor.java | 9 + ...xecutorTrafficTriggerShardingListener.java | 44 ++ .../sharding/node/SaturnExecutorsNode.java | 33 +- .../service/NamespaceShardingService.java | 578 ++++++++++++------ 14 files changed, 795 insertions(+), 275 deletions(-) create mode 100644 saturn-it/src/test/java/com/vip/saturn/it/impl/ShardingWithTrafficIT.java create mode 100644 saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/ExecutorTrafficTriggerShardingListener.java diff --git a/saturn-console-core/src/main/java/com/vip/saturn/job/console/domain/ServerBriefInfo.java b/saturn-console-core/src/main/java/com/vip/saturn/job/console/domain/ServerBriefInfo.java index 6ff5a22ee..af05d7a34 100644 --- a/saturn-console-core/src/main/java/com/vip/saturn/job/console/domain/ServerBriefInfo.java +++ b/saturn-console-core/src/main/java/com/vip/saturn/job/console/domain/ServerBriefInfo.java @@ -40,6 +40,12 @@ public final class ServerBriefInfo implements Serializable { private String version; + private boolean noTraffic; + + public ServerBriefInfo(String executorName) { + this.executorName = executorName; + } + public String getServerIp() { return this.serverIp; } @@ -100,62 +106,11 @@ public void setVersion(String version) { this.version = version; } - public boolean equals(Object o) { - if (o == this) - return true; - if (!(o instanceof ServerBriefInfo)) - return false; - ServerBriefInfo other = (ServerBriefInfo) o; - Object this$executorName = getExecutorName(); - Object other$executorName = other.getExecutorName(); - if (this$executorName == null ? other$executorName != null : !this$executorName.equals(other$executorName)) - return false; - Object this$totalLoadLevel = getTotalLoadLevel(); - Object other$totalLoadLevel = other.getTotalLoadLevel(); - if (this$totalLoadLevel == null ? other$totalLoadLevel != null - : !this$totalLoadLevel.equals(other$totalLoadLevel)) - return false; - Object this$sharding = getSharding(); - Object other$sharding = other.getSharding(); - if (this$sharding == null ? other$sharding != null : !this$sharding.equals(other$sharding)) - return false; - Object this$hasSharding = getHasSharding(); - Object other$hasSharding = other.getHasSharding(); - if (this$hasSharding == null ? other$hasSharding != null : !this$hasSharding.equals(other$hasSharding)) - return false; - Object this$lastBeginTime = getLastBeginTime(); - Object other$lastBeginTime = other.getLastBeginTime(); - if (this$lastBeginTime == null ? other$lastBeginTime != null : !this$lastBeginTime.equals(other$lastBeginTime)) - return false; - Object this$status = getStatus(); - Object other$status = other.getStatus(); - if (this$status == null ? other$status != null : !this$status.equals(other$status)) - return false; - Object this$version = getVersion(); - Object other$version = other.getVersion(); - return this$version == null ? other$version == null : this$version.equals(other$version); - } - - public int hashCode() { - int PRIME = 59; - int result = 1; - Object $executorName = getExecutorName(); - result = result * 59 + ($executorName == null ? 43 : $executorName.hashCode()); - Object $totalLoadLevel = getTotalLoadLevel(); - result = result * 59 + ($totalLoadLevel == null ? 43 : $totalLoadLevel.hashCode()); - Object $sharding = getSharding(); - result = result * 59 + ($sharding == null ? 43 : $sharding.hashCode()); - Object $hasSharding = getHasSharding(); - result = result * 59 + ($hasSharding == null ? 43 : $hasSharding.hashCode()); - Object $lastBeginTime = getLastBeginTime(); - result = result * 59 + ($lastBeginTime == null ? 43 : $lastBeginTime.hashCode()); - Object $status = getStatus(); - result = result * 59 + ($status == null ? 43 : $status.hashCode()); - Object $version = getVersion(); - return result * 59 + ($version == null ? 43 : $version.hashCode()); + public boolean isNoTraffic() { + return noTraffic; } - public ServerBriefInfo(String executorName) { - this.executorName = executorName; + public void setNoTraffic(boolean noTraffic) { + this.noTraffic = noTraffic; } } diff --git a/saturn-console-core/src/main/java/com/vip/saturn/job/console/service/ServerDimensionService.java b/saturn-console-core/src/main/java/com/vip/saturn/job/console/service/ServerDimensionService.java index 962193d91..d685137ad 100644 --- a/saturn-console-core/src/main/java/com/vip/saturn/job/console/service/ServerDimensionService.java +++ b/saturn-console-core/src/main/java/com/vip/saturn/job/console/service/ServerDimensionService.java @@ -17,6 +17,7 @@ import java.util.Map; import com.vip.saturn.job.console.domain.ServerStatus; +import com.vip.saturn.job.console.exception.SaturnJobConsoleException; public interface ServerDimensionService { @@ -30,4 +31,6 @@ public interface ServerDimensionService { boolean isReady(String jobName, String executor); + void traffic(String executorName, boolean extract) throws SaturnJobConsoleException; + } diff --git a/saturn-console-core/src/main/java/com/vip/saturn/job/console/utils/ExecutorNodePath.java b/saturn-console-core/src/main/java/com/vip/saturn/job/console/utils/ExecutorNodePath.java index 15ebdd927..729d58955 100644 --- a/saturn-console-core/src/main/java/com/vip/saturn/job/console/utils/ExecutorNodePath.java +++ b/saturn-console-core/src/main/java/com/vip/saturn/job/console/utils/ExecutorNodePath.java @@ -42,6 +42,10 @@ public static String getExecutorTaskNodePath(final String executorName) { return getExecutorNodePath(executorName, "task"); } + public static String getExecutorNoTrafficNodePath(final String executorName) { + return getExecutorNodePath(executorName, "noTraffic"); + } + public static String getExecutorIpNodePath(final String executorName) { return getExecutorNodePath(executorName, "ip"); } diff --git a/saturn-console/src/main/java/com/vip/saturn/job/console/controller/ServerController.java b/saturn-console/src/main/java/com/vip/saturn/job/console/controller/ServerController.java index b22535f1a..393741f6a 100644 --- a/saturn-console/src/main/java/com/vip/saturn/job/console/controller/ServerController.java +++ b/saturn-console/src/main/java/com/vip/saturn/job/console/controller/ServerController.java @@ -19,13 +19,14 @@ import javax.annotation.Resource; import javax.servlet.http.HttpServletRequest; +import com.vip.saturn.job.console.domain.RequestResult; +import com.vip.saturn.job.console.exception.SaturnJobConsoleException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; -import com.vip.saturn.job.console.service.JobOperationService; import com.vip.saturn.job.console.service.ServerDimensionService; @RestController @@ -37,12 +38,36 @@ public class ServerController extends AbstractController { @Resource private ServerDimensionService serverDimensionService; - @Resource - private JobOperationService jobOperationService; - @RequestMapping(value = "servers", method = RequestMethod.GET) public Map getAllServersBriefInfo(final HttpServletRequest request) { return serverDimensionService.getAllServersBriefInfo(); } + @RequestMapping(value = "traffic", method = RequestMethod.POST) + public RequestResult traffic(final HttpServletRequest request, String executorName, String operation) { + RequestResult requestResult = new RequestResult(); + try { + if(executorName == null || executorName.trim().isEmpty()) { + throw new SaturnJobConsoleException("The parameter executorName cannot be null or empty"); + } + boolean extract; + if("extract".equals(operation)) { + extract = true; + } else if("recover".equals(operation)) { + extract = false; + } else { + throw new SaturnJobConsoleException("The operation " + operation + " is not supported"); + } + serverDimensionService.traffic(executorName, extract); + requestResult.setSuccess(true); + } catch (SaturnJobConsoleException e) { + requestResult.setSuccess(false); + requestResult.setMessage(e.getMessage()); + } catch (Exception e) { + requestResult.setSuccess(false); + requestResult.setMessage(e.toString()); + } + return requestResult; + } + } diff --git a/saturn-console/src/main/java/com/vip/saturn/job/console/service/impl/ServerDimensionServiceImpl.java b/saturn-console/src/main/java/com/vip/saturn/job/console/service/impl/ServerDimensionServiceImpl.java index 1f3e275d4..1e97f43af 100644 --- a/saturn-console/src/main/java/com/vip/saturn/job/console/service/impl/ServerDimensionServiceImpl.java +++ b/saturn-console/src/main/java/com/vip/saturn/job/console/service/impl/ServerDimensionServiceImpl.java @@ -68,6 +68,7 @@ public Map getAllServersBriefInfo() { ServerBriefInfo sbf = new ServerBriefInfo(executor); String ip = curatorFrameworkOp.getData(ExecutorNodePath.getExecutorNodePath(executor, "ip")); sbf.setServerIp(ip); + sbf.setNoTraffic(curatorFrameworkOp.checkExists(ExecutorNodePath.getExecutorNodePath(executor, "noTraffic"))); String lastBeginTime = curatorFrameworkOp .getData(ExecutorNodePath.getExecutorNodePath(sbf.getExecutorName(), "lastBeginTime")); sbf.setLastBeginTime( @@ -192,4 +193,18 @@ public boolean isReady(String jobName, String executor) { return false; } + @Override + public void traffic(String executorName, boolean extract) throws SaturnJobConsoleException { + CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = curatorRepository.inSessionClient(); + if (!curatorFrameworkOp.checkExists(ExecutorNodePath.getExecutorNodePath(executorName))) { + throw new SaturnJobConsoleException("The executorName(" + executorName + ") is not existing"); + } + String executorNoTrafficNodePath = ExecutorNodePath.getExecutorNoTrafficNodePath(executorName); + if (extract) { + curatorFrameworkOp.create(executorNoTrafficNodePath); + } else { + curatorFrameworkOp.deleteRecursive(executorNoTrafficNodePath); + } + } + } diff --git a/saturn-console/src/main/resources/static/js/overview.js b/saturn-console/src/main/resources/static/js/overview.js index 5fe840e5c..2db9e7f69 100644 --- a/saturn-console/src/main/resources/static/js/overview.js +++ b/saturn-console/src/main/resources/static/js/overview.js @@ -252,6 +252,25 @@ $(function() { return false; }); }); + + $("#traffic-executor-confirm-dialog").on("shown.bs.modal", function (event) { + var button = $(event.relatedTarget); + var executor = button.data('executor'); + var operation = button.data('operation'); + $("#traffic-executor-confirm-dialog-confirm-btn").unbind('click').click(function() { + var $btn = $(this).button('loading'); + $.post("server/traffic", {executorName: executor, operation: operation, nns: regName}, function (data) { + $("#traffic-executor-confirm-dialog").modal("hide"); + if(data.success) { + showSuccessDialogWithCallback(function(){location.reload(true);}); + } else { + $("#executor-failure-dialog .fail-reason").text(data.message); + showFailureDialog("executor-failure-dialog"); + } + }).always(function() { $btn.button('reset'); }); + return false; + }); + }); $("#remove-executor-confirm-dialog").on("shown.bs.modal", function (event) { // 批量删除和单击删除公用一个confirm-dialog,如果取到relatedTarget为字符串,则为批量删除 @@ -1408,6 +1427,19 @@ $(function() { $("#change-jobStatus-confirm-dialog").modal("show", obj); } + function showTrafficConfirmDialog(obj) { + var executor = $(obj).data('executor'); + var operation = $(obj).data('operation'); + var confirmReason = null; + if(operation == "extract") { + confirmReason = "确认要 摘取 Executor(" + executor + ") 的流量吗?"; + } else { + confirmReason = "确认要 恢复 Executor(" + executor + ") 的流量吗?"; + } + $("#traffic-executor-confirm-dialog .confirm-reason").text(confirmReason); + $("#traffic-executor-confirm-dialog").modal("show", obj); + } + function showRemoveExecutorConfirmDialog(obj) { var confirmReason = "确认要删除Executor:(" + $(obj).data('executor') + ")吗?"; $("#remove-executor-confirm-dialog .confirm-reason").text(confirmReason); @@ -1928,30 +1960,37 @@ $(function() { var loadLevels= [], exeNames = [], serverInfos = data["serverInfos"], lv = data["jobShardLoadLevels"]; if(serverInfos){ for (var i = 0;i < serverInfos.length;i++) { - var status = serverInfos[i].status,jobStatus = serverInfos[i].jobStatus,sharding = serverInfos[i].sharding, lastBeginTime = serverInfos[i].lastBeginTime,executorName = serverInfos[i].executorName,trClass = "",removeBtnClass = "",removeBtnTitle=""; - loadLevels.push(serverInfos[i].totalLoadLevel),hasSharding = serverInfos[i].hasSharding; + var serverInfo = serverInfos[i]; + var status = serverInfo.status,jobStatus = serverInfo.jobStatus,sharding = serverInfo.sharding, lastBeginTime = serverInfo.lastBeginTime,executorName = serverInfo.executorName,trClass = "",removeBtnClass = "",removeBtnTitle=""; + loadLevels.push(serverInfo.totalLoadLevel),hasSharding = serverInfo.hasSharding, noTraffic = serverInfo.noTraffic; if ("ONLINE" === status) { trClass = "success"; onlines ++; removeBtnClass = "disabled"; removeBtnTitle="无法删除ONLINE的Executor"; - exeNames.push(serverInfos[i].executorName); + exeNames.push(serverInfo.executorName); } else { trClass = "warning"; offlines ++; lastBeginTime = ""; removeBtnTitle="点击进行删除该Executor"; } + var trafficButton = ""; + if(noTraffic == false) { // 可以摘取流量 + trafficButton = ""; + } else { + trafficButton = ""; + } var removeButton = ""; var baseTd = "" + "" + executorName + "" - + "" + serverInfos[i].serverIp + "" - + "" + serverInfos[i].totalLoadLevel + "" + + "" + serverInfo.serverIp + "" + + "" + serverInfo.totalLoadLevel + "" + "" + sharding + "" + "" + status + "" + "" + lastBeginTime + "" - + "" + serverInfos[i].version + "" - + "" + removeButton + ""; + + "" + serverInfo.version + "" + + "" + trafficButton + removeButton + ""; $("#servers-overview-tbl tbody").append("" + baseTd + ""); } } diff --git a/saturn-console/src/main/resources/templates/overview.html b/saturn-console/src/main/resources/templates/overview.html index 93e9b5488..d769ed214 100644 --- a/saturn-console/src/main/resources/templates/overview.html +++ b/saturn-console/src/main/resources/templates/overview.html @@ -745,6 +745,7 @@
+
diff --git a/saturn-it/src/test/java/com/vip/saturn/it/SaturnAutoBasic.java b/saturn-it/src/test/java/com/vip/saturn/it/SaturnAutoBasic.java index 60054602c..0268feee5 100644 --- a/saturn-it/src/test/java/com/vip/saturn/it/SaturnAutoBasic.java +++ b/saturn-it/src/test/java/com/vip/saturn/it/SaturnAutoBasic.java @@ -5,6 +5,7 @@ import com.vip.saturn.job.console.SaturnEnvProperties; import com.vip.saturn.job.console.domain.RequestResult; import com.vip.saturn.job.console.springboot.SaturnConsoleApp; +import com.vip.saturn.job.console.utils.ExecutorNodePath; import com.vip.saturn.job.executor.Main; import com.vip.saturn.job.executor.SaturnExecutor; import com.vip.saturn.job.internal.config.ConfigurationNode; @@ -349,6 +350,28 @@ public static void forceStopJob(String jobName) { } } + public static void runAtOnceAndWaitShardingCompleted(final JobConfiguration jobConfiguration) throws Exception { + runAtOnce(jobConfiguration.getJobName()); + Thread.sleep(1000L); + + waitForFinish(new FinishCheck() { + + @Override + public boolean docheck() { + return !isNeedSharding(jobConfiguration); + } + + }, 10); + } + + public static void extractTraffic(String executorName) { + regCenter.persist(ExecutorNodePath.getExecutorNoTrafficNodePath(executorName), ""); + } + + public static void recoverTraffic(String executorName) { + regCenter.remove(ExecutorNodePath.getExecutorNoTrafficNodePath(executorName)); + } + protected static void configJob(String jobName, String configPath, Object value) { JobConfiguration jobConfiguration = new JobConfiguration(jobName); JobNodeStorage jobNodeStorage = new JobNodeStorage(regCenter, jobConfiguration); diff --git a/saturn-it/src/test/java/com/vip/saturn/it/impl/ShardingWithTrafficIT.java b/saturn-it/src/test/java/com/vip/saturn/it/impl/ShardingWithTrafficIT.java new file mode 100644 index 000000000..cc1b26089 --- /dev/null +++ b/saturn-it/src/test/java/com/vip/saturn/it/impl/ShardingWithTrafficIT.java @@ -0,0 +1,201 @@ +package com.vip.saturn.it.impl; + +import com.vip.saturn.it.AbstractSaturnIT; +import com.vip.saturn.it.JobType; +import com.vip.saturn.it.job.SimpleJavaJob; +import com.vip.saturn.job.executor.Main; +import com.vip.saturn.job.internal.config.JobConfiguration; +import com.vip.saturn.job.internal.sharding.ShardingNode; +import com.vip.saturn.job.internal.storage.JobNodePath; +import com.vip.saturn.job.utils.ItemUtils; +import org.assertj.core.api.Condition; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author hebelala + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class ShardingWithTrafficIT extends AbstractSaturnIT { + + @BeforeClass + public static void setUp() throws Exception { + startSaturnConsoleList(1); + } + + @AfterClass + public static void tearDown() throws Exception { + stopExecutorList(); + stopSaturnConsoleList(); + } + + /** + * 一般流量摘取和恢复流程:
+ * 两个启用状态的作业,两台机A、B;
+ * 摘取B的流量,结果B的分片被分配到A;
+ * 下线B,分片分配依然不变;
+ * 上线B,分配分配依然不变;
+ * 恢复B的流量,结果平均分配分片到A、B。 + */ + @Test + public void test_A_NormalFlow() throws Exception { + String jobName = "test_A_Extract"; + String jobName2 = "test_A_Extract2"; + + final JobConfiguration jobConfiguration = new JobConfiguration(jobName); + jobConfiguration.setCron("* * * * * ? 2099"); + jobConfiguration.setJobType(JobType.JAVA_JOB.toString()); + jobConfiguration.setJobClass(SimpleJavaJob.class.getCanonicalName()); + jobConfiguration.setShardingTotalCount(2); + jobConfiguration.setShardingItemParameters("0=0,1=1"); + + final JobConfiguration jobConfiguration2 = new JobConfiguration(jobName2); + jobConfiguration2.setCron("* * * * * ? 2099"); + jobConfiguration2.setJobType(JobType.JAVA_JOB.toString()); + jobConfiguration2.setJobClass(SimpleJavaJob.class.getCanonicalName()); + jobConfiguration2.setShardingTotalCount(2); + jobConfiguration2.setShardingItemParameters("0=0,1=1"); + + addJob(jobConfiguration); + Thread.sleep(1000L); + + addJob(jobConfiguration2); + Thread.sleep(1000L); + + enableJob(jobName); + Thread.sleep(1000L); + + enableJob(jobName2); + Thread.sleep(1000L); + + Main executor1 = startOneNewExecutorList(); + String executorName1 = executor1.getExecutorName(); + + Main executor2 = startOneNewExecutorList(); + String executorName2 = executor2.getExecutorName(); + + runAtOnceAndWaitShardingCompleted(jobConfiguration); + runAtOnceAndWaitShardingCompleted(jobConfiguration2); + isItemsBalanceOk(jobName, jobName2, executorName1, executorName2); + + extractTraffic(executorName2); + Thread.sleep(1000L); + + runAtOnceAndWaitShardingCompleted(jobConfiguration); + runAtOnceAndWaitShardingCompleted(jobConfiguration2); + isItemsToExecutor1(jobName, jobName2, executorName1, executorName2); + + stopExecutor(1); + Thread.sleep(1000L); + + runAtOnceAndWaitShardingCompleted(jobConfiguration); + runAtOnceAndWaitShardingCompleted(jobConfiguration2); + isItemsToExecutor1(jobName, jobName2, executorName1, executorName2); + + executor2 = startExecutor(1); + executorName2 = executor2.getExecutorName(); + + runAtOnceAndWaitShardingCompleted(jobConfiguration); + runAtOnceAndWaitShardingCompleted(jobConfiguration2); + isItemsToExecutor1(jobName, jobName2, executorName1, executorName2); + + recoverTraffic(executorName2); + Thread.sleep(1000L); + + runAtOnceAndWaitShardingCompleted(jobConfiguration); + runAtOnceAndWaitShardingCompleted(jobConfiguration2); + isItemsBalanceOk(jobName, jobName2, executorName1, executorName2); + + // 清理,不影响其他Test + disableJob(jobName); + disableJob(jobName2); + Thread.sleep(1000L); + removeJob(jobName); + removeJob(jobName2); + Thread.sleep(1000L); + stopExecutorList(); + Thread.sleep(2000L); + forceRemoveJob(jobName); + forceRemoveJob(jobName2); + } + + private void isItemsBalanceOk(String jobName, String jobName2, String executorName1, String executorName2) throws Exception { + List itemsJ1E1 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(executorName1)))); + List itemsJ1E2 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(executorName2)))); + List itemsJ2E1 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName2, ShardingNode.getShardingNode(executorName1)))); + List itemsJ2E2 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName2, ShardingNode.getShardingNode(executorName2)))); + + List allItems = new ArrayList<>(); + allItems.addAll(itemsJ1E1); + allItems.addAll(itemsJ2E1); + assertThat(allItems).hasSize(2); + allItems.clear(); + allItems.addAll(itemsJ1E2); + allItems.addAll(itemsJ2E2); + assertThat(allItems).hasSize(2); + allItems.clear(); + allItems.addAll(itemsJ1E1); + allItems.addAll(itemsJ2E1); + allItems.addAll(itemsJ1E2); + allItems.addAll(itemsJ2E2); + assertThat(allItems) + .hasSize(4) + .haveExactly(2, new Condition() { + @Override + public boolean matches(Integer value) { + return value == 0; + } + }) + .haveExactly(2, new Condition() { + @Override + public boolean matches(Integer value) { + return value == 1; + } + }); + } + + private void isItemsToExecutor1(String jobName, String jobName2, String executorName1, String executorName2) throws Exception { + List itemsJ1E1 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(executorName1)))); + List itemsJ1E2 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName, ShardingNode.getShardingNode(executorName2)))); + List itemsJ2E1 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName2, ShardingNode.getShardingNode(executorName1)))); + List itemsJ2E2 = ItemUtils.toItemList(regCenter.getDirectly( + JobNodePath.getNodeFullPath(jobName2, ShardingNode.getShardingNode(executorName2)))); + + List allItems = new ArrayList<>(); + allItems.addAll(itemsJ1E1); + allItems.addAll(itemsJ2E1); + assertThat(allItems) + .hasSize(4) + .haveExactly(2, new Condition() { + @Override + public boolean matches(Integer value) { + return value == 0; + } + }) + .haveExactly(2, new Condition() { + @Override + public boolean matches(Integer value) { + return value == 1; + } + }); + allItems.clear(); + allItems.addAll(itemsJ1E2); + allItems.addAll(itemsJ2E2); + assertThat(allItems).isEmpty(); + } +} diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/NamespaceShardingManager.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/NamespaceShardingManager.java index 6c2e96ebc..c72be36f8 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/NamespaceShardingManager.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/NamespaceShardingManager.java @@ -1,16 +1,12 @@ package com.vip.saturn.job.sharding; +import com.vip.saturn.job.sharding.listener.*; import org.apache.curator.framework.CuratorFramework; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.vip.saturn.job.integrate.service.ReportAlarmService; import com.vip.saturn.job.integrate.service.UpdateJobConfigService; -import com.vip.saturn.job.sharding.listener.AbstractConnectionListener; -import com.vip.saturn.job.sharding.listener.AddOrRemoveJobListener; -import com.vip.saturn.job.sharding.listener.ExecutorOnlineOfflineTriggerShardingListener; -import com.vip.saturn.job.sharding.listener.LeadershipElectionListener; -import com.vip.saturn.job.sharding.listener.SaturnExecutorsShardingTriggerShardingListener; import com.vip.saturn.job.sharding.node.SaturnExecutorsNode; import com.vip.saturn.job.sharding.service.AddJobListenersService; import com.vip.saturn.job.sharding.service.ExecutorCleanService; @@ -108,6 +104,8 @@ private void addOnlineOfflineListener() throws Exception { shardingTreeCacheService.addTreeCacheIfAbsent(path, depth); shardingTreeCacheService.addTreeCacheListenerIfAbsent(path, depth, new ExecutorOnlineOfflineTriggerShardingListener(namespaceShardingService, executorCleanService)); + shardingTreeCacheService.addTreeCacheListenerIfAbsent(path, depth, + new ExecutorTrafficTriggerShardingListener(namespaceShardingService)); } /** diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/entity/Executor.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/entity/Executor.java index a4f8d8b3b..f8d397026 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/entity/Executor.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/entity/Executor.java @@ -9,6 +9,7 @@ public class Executor { private String executorName; private String ip; + private boolean noTraffic; private List jobNameList; // the job list supported by the executor private List shardList; private int totalLoadLevel; @@ -29,6 +30,14 @@ public void setIp(String ip) { this.ip = ip; } + public boolean isNoTraffic() { + return noTraffic; + } + + public void setNoTraffic(boolean noTraffic) { + this.noTraffic = noTraffic; + } + public List getJobNameList() { return jobNameList; } diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/ExecutorTrafficTriggerShardingListener.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/ExecutorTrafficTriggerShardingListener.java new file mode 100644 index 000000000..51e43ddbb --- /dev/null +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/ExecutorTrafficTriggerShardingListener.java @@ -0,0 +1,44 @@ +package com.vip.saturn.job.sharding.listener; + +import com.vip.saturn.job.sharding.node.SaturnExecutorsNode; +import com.vip.saturn.job.sharding.service.NamespaceShardingService; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author hebelala + */ +public class ExecutorTrafficTriggerShardingListener extends AbstractTreeCacheListener { + private static final Logger LOGGER = LoggerFactory.getLogger(ExecutorTrafficTriggerShardingListener.class); + + private NamespaceShardingService namespaceShardingService; + + public ExecutorTrafficTriggerShardingListener(NamespaceShardingService namespaceShardingService) { + this.namespaceShardingService = namespaceShardingService; + } + + @Override + public void childEvent(Type type, String path, String nodeData) throws Exception { + try { + if (isExecutorNoTraffic(type, path)) { + String executorName = SaturnExecutorsNode.getExecutorNameByNoTrafficPath(path); + namespaceShardingService.asyncShardingWhenExecutorExtractTraffic(executorName); + } else if (isExecutorTraffic(type, path)) { + String executorName = SaturnExecutorsNode.getExecutorNameByNoTrafficPath(path); + namespaceShardingService.asyncShardingWhenRecoverExecutorTraffic(executorName); + } + } catch (Exception e) { + LOGGER.error(e.getMessage(), e); + } + } + + private boolean isExecutorNoTraffic(Type type, String path) { + return type == Type.NODE_ADDED && path.matches(SaturnExecutorsNode.EXECUTOR_NO_TRAFFIC_NODE_PATH_REGEX); + } + + public boolean isExecutorTraffic(Type type, String path) { + return type == Type.NODE_REMOVED && path.matches(SaturnExecutorsNode.EXECUTOR_NO_TRAFFIC_NODE_PATH_REGEX); + } + +} diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/node/SaturnExecutorsNode.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/node/SaturnExecutorsNode.java index ff9eab3ab..213cdacc6 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/node/SaturnExecutorsNode.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/node/SaturnExecutorsNode.java @@ -13,6 +13,7 @@ public class SaturnExecutorsNode { private static final String SHARDING = "sharding"; private static final String CONTENT = "content"; public static final String $JOBS = "$Jobs"; + private static final String NO_TRAFFIC = "noTraffic"; private static final String IP = "ip"; private static final String CLEAN = "clean"; private static final String TASK = "task"; @@ -29,6 +30,8 @@ public class SaturnExecutorsNode { + "enabled"; public static final String EXECUTOR_IPNODE_PATH_REGEX = "/\\" + $SATURNEXECUTORS + "/" + EXECUTORS + "/" + "[^/]*" + "/" + IP; + public static final String EXECUTOR_NO_TRAFFIC_NODE_PATH_REGEX = "/\\" + $SATURNEXECUTORS + "/" + EXECUTORS + "/" + "[^/]*" + + "/" + NO_TRAFFIC; public static final String CONFIG_VERSION_PATH = "/" + $SATURNEXECUTORS + "/config/version"; public static final String $JOBSNODE_PATH = "/" + $JOBS; @@ -50,6 +53,14 @@ public static String getExecutorsNodePath() { return "/" + $SATURNEXECUTORS + "/" + EXECUTORS; } + /** + * 获取noTraffic结点名称 + * @return + */ + public static String getNoTrafficNodeName() { + return NO_TRAFFIC; + } + /** * 获取ip结点名称 * @return @@ -76,6 +87,15 @@ public static String getExecutorIpNodePath(String executor) { return "/" + $SATURNEXECUTORS + "/" + EXECUTORS + "/" + executor + "/" + IP; } + /** + * 获取$SaturnExecutors/executors/xx/noTraffic结点完整路径 + * @param executor + * @return + */ + public static String getExecutorNoTrafficNodePath(String executor) { + return "/" + $SATURNEXECUTORS + "/" + EXECUTORS + "/" + executor + "/" + NO_TRAFFIC; + } + /** * 获取$SaturnExecutors/executors/xx/clean结点完整路径 * @param executor @@ -100,7 +120,18 @@ public static String getExecutorTaskNodePath(String executor) { * @return */ public static String getExecutorNameByIpPath(String path) { - int lastIndexOf = path.lastIndexOf("/" + SaturnExecutorsNode.getIpNodeName()); + return getExecutorNameByPath(path, getIpNodeName()); + } + + /** + * 从路径中抽取出executorName + */ + public static String getExecutorNameByNoTrafficPath(String path) { + return getExecutorNameByPath(path, getNoTrafficNodeName()); + } + + private static String getExecutorNameByPath(String path, String nodeName) { + int lastIndexOf = path.lastIndexOf("/" + nodeName); String substring = path.substring(0, lastIndexOf); int lastIndexOf2 = substring.lastIndexOf('/'); return substring.substring(lastIndexOf2 + 1); diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java index 52f73f04a..205c2f1f8 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java @@ -30,7 +30,7 @@ * @author hebelala */ public class NamespaceShardingService { - private static final Logger log = LoggerFactory.getLogger(NamespaceShardingService.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NamespaceShardingService.class); private static final int LOAD_LEVEL_DEFAULT = 1; @@ -110,7 +110,7 @@ public void run() { // 如果需要全量分片,且当前线程不是全量分片线程,则直接返回,没必要做分片 if (needAllSharding.get() && !isAllShardingTask) { - log.info("the {} will be ignored, because there will be {}", this.getClass().getSimpleName(), + LOGGER.info("the {} will be ignored, because there will be {}", this.getClass().getSimpleName(), ExecuteAllShardingTask.class.getSimpleName()); return; } @@ -121,11 +121,12 @@ public void run() { List customLastOnlineExecutorList = customLastOnlineExecutorList(); List lastOnlineExecutorList = customLastOnlineExecutorList == null ? copyOnlineExecutorList(oldOnlineExecutorList) : customLastOnlineExecutorList; + List lastOnlineTrafficExecutorList = getTrafficExecutorList(lastOnlineExecutorList); List shardList = new ArrayList<>(); // 摘取 - if (pick(allJobs, allEnableJobs, shardList, lastOnlineExecutorList)) { + if (pick(allJobs, allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList)) { // 放回 - putBackBalancing(allEnableJobs, shardList, lastOnlineExecutorList); + putBackBalancing(allEnableJobs, shardList, lastOnlineExecutorList, lastOnlineTrafficExecutorList); // 如果当前变为非leader,则返回 if (!isLeadershipOnly()) { return; @@ -143,10 +144,10 @@ public void run() { increaseShardingCount(); } } catch (InterruptedException e) { - log.info("{}-{} {} is interrupted", namespace, hostValue, this.getClass().getSimpleName()); + LOGGER.info("{}-{} {} is interrupted", namespace, hostValue, this.getClass().getSimpleName()); Thread.currentThread().interrupt(); } catch (Throwable t) { - log.error(t.getMessage(), t); + LOGGER.error(t.getMessage(), t); if (!isAllShardingTask) { // 如果当前不是全量分片,则需要全量分片来拯救异常 needAllSharding.set(true); shardingCount.incrementAndGet(); @@ -157,22 +158,22 @@ public void run() { reportAlarmService.allShardingError(namespace, hostValue); } catch (Throwable t2) { if (t2 instanceof InterruptedException) { - log.info("{}-{} {}-allShardingError is interrupted", namespace, hostValue, + LOGGER.info("{}-{} {}-allShardingError is interrupted", namespace, hostValue, this.getClass().getSimpleName()); Thread.currentThread().interrupt(); } else { - log.error(t2.getMessage(), t2); + LOGGER.error(t2.getMessage(), t2); } } } try { shutdown(); } catch (InterruptedException e) { - log.info("{}-{} {}-shutdown is interrupted", namespace, hostValue, + LOGGER.info("{}-{} {}-shutdown is interrupted", namespace, hostValue, this.getClass().getSimpleName()); Thread.currentThread().interrupt(); } catch (Throwable t3) { - log.error(t3.getMessage(), t3); + LOGGER.error(t3.getMessage(), t3); } } } finally { @@ -195,6 +196,7 @@ private List copyOnlineExecutorList(List oldOnlineExecutorLi Executor newExecutor = new Executor(); newExecutor.setTotalLoadLevel(oldExecutor.getTotalLoadLevel()); newExecutor.setIp(oldExecutor.getIp()); + newExecutor.setNoTraffic(oldExecutor.isNoTraffic()); newExecutor.setExecutorName(oldExecutor.getExecutorName()); if (oldExecutor.getJobNameList() != null) { newExecutor.setJobNameList(new ArrayList()); @@ -253,7 +255,7 @@ private void increaseShardingCount() throws Exception { try { _shardingCount = Integer.parseInt(new String(shardingCountData, "UTF-8")) + 1; } catch (NumberFormatException e) { - log.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } curatorFramework.setData().forPath(SaturnExecutorsNode.SHARDING_COUNT_PATH, @@ -370,7 +372,7 @@ protected int getShardingTotalCount(String jobName) throws Exception { try { shardingTotalCount = Integer.parseInt(new String(shardingTotalCountData, "UTF-8")); } catch (NumberFormatException e) { - log.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } } } @@ -388,7 +390,7 @@ protected int getLoadLevel(String jobName) { } } } catch (Exception e) { - log.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } return loadLevel; } @@ -407,16 +409,27 @@ protected List customLastOnlineExecutorList() throws Exception { return null; } + private List getTrafficExecutorList(List executorList) { + List trafficExecutorList = new ArrayList<>(); + for (Executor executor : executorList) { + if (!executor.isNoTraffic()) { + trafficExecutorList.add(executor); + } + } + return trafficExecutorList; + } + /** * 摘取 * @param allJobs 该域下所有作业 * @param allEnableJobs 该域下所有启用的作业 * @param shardList 默认为空集合 * @param lastOnlineExecutorList 默认为当前存储的数据,如果不想使用存储数据,请重写{@link #customLastOnlineExecutorList()}}方法 + * @param lastOnlineTrafficExecutorList lastOnlineExecutorList中所有noTraffic为false的Executor,注意Executor是同一个对象 * @return true摘取成功;false摘取失败,不需要继续下面的逻辑 */ protected abstract boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception; + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception; /** * 按照loadLevel降序排序,如果loadLevel相同,按照作业名降序排序 @@ -431,7 +444,7 @@ public int compare(Shard o1, Shard o2) { }); } - private List getExecutors(List lastOnlineExecutorList) throws Exception { + private List getNotDockerExecutors(List lastOnlineExecutorList) throws Exception { // if isContainerAlignWithPhysical = false, return all executors; otherwise, return all non-container executors. if (isContainerAlignWithPhysical) { return lastOnlineExecutorList; @@ -450,20 +463,20 @@ private List getExecutors(List lastOnlineExecutorList) throw } protected void putBackBalancing(List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { if (lastOnlineExecutorList.isEmpty()) { - log.warn("Unnecessary to put shards back to executors balanced because of no executor"); + LOGGER.warn("Unnecessary to put shards back to executors balanced because of no executor"); return; } sortShardList(shardList); - // 获取所有executor - List allExecutors = getExecutors(lastOnlineExecutorList); + // 获取所有非容器的executors + List notDockerExecutors = getNotDockerExecutors(lastOnlineTrafficExecutorList); // 获取shardList中的作业能够被接管的executors - Map> executorsMapByJob = new HashMap<>(); - Map> lastOnlineExecutorListMapByJob = new HashMap<>(); + Map> noDockerTrafficExecutorsMapByJob = new HashMap<>(); + Map> lastOnlineTrafficExecutorListMapByJob = new HashMap<>(); // 是否为本地模式作业的映射 Map localModeMap = new HashMap<>(); // 是否配置优先节点的作业的映射 @@ -472,26 +485,29 @@ protected void putBackBalancing(List allEnableJobs, List shardLis Map> preferListConfiguredMap = new HashMap<>(); // 是否使用非优先节点的作业的映射 Map useDispreferListMap = new HashMap<>(); - Iterator iterator0 = shardList.iterator(); - while (iterator0.hasNext()) { - String jobName = iterator0.next().getJobName(); - if (!executorsMapByJob.containsKey(jobName)) { - executorsMapByJob.put(jobName, filterExecutorsByJob(allExecutors, jobName)); - } - if (!lastOnlineExecutorListMapByJob.containsKey(jobName)) { - lastOnlineExecutorListMapByJob.put(jobName, filterExecutorsByJob(lastOnlineExecutorList, jobName)); - } - if (!localModeMap.containsKey(jobName)) { - localModeMap.put(jobName, isLocalMode(jobName)); - } - if (!preferListIsConfiguredMap.containsKey(jobName)) { - preferListIsConfiguredMap.put(jobName, preferListIsConfigured(jobName)); - } - if (!preferListConfiguredMap.containsKey(jobName)) { - preferListConfiguredMap.put(jobName, getPreferListConfigured(jobName)); - } - if (!useDispreferListMap.containsKey(jobName)) { - useDispreferListMap.put(jobName, useDispreferList(jobName)); + + { // the block, avoid using the iterator after this code + Iterator iterator = shardList.iterator(); + while (iterator.hasNext()) { + String jobName = iterator.next().getJobName(); + if (!noDockerTrafficExecutorsMapByJob.containsKey(jobName)) { + noDockerTrafficExecutorsMapByJob.put(jobName, filterExecutorsByJob(notDockerExecutors, jobName)); + } + if (!lastOnlineTrafficExecutorListMapByJob.containsKey(jobName)) { + lastOnlineTrafficExecutorListMapByJob.put(jobName, filterExecutorsByJob(lastOnlineTrafficExecutorList, jobName)); + } + if (!localModeMap.containsKey(jobName)) { + localModeMap.put(jobName, isLocalMode(jobName)); + } + if (!preferListIsConfiguredMap.containsKey(jobName)) { + preferListIsConfiguredMap.put(jobName, preferListIsConfigured(jobName)); + } + if (!preferListConfiguredMap.containsKey(jobName)) { + preferListConfiguredMap.put(jobName, getPreferListConfigured(jobName)); + } + if (!useDispreferListMap.containsKey(jobName)) { + useDispreferListMap.put(jobName, useDispreferList(jobName)); + } } } @@ -501,75 +517,81 @@ protected void putBackBalancing(List allEnableJobs, List shardLis // 如果配置了preferList,则选取preferList中的executor。 // 如果preferList中的executor都挂了,则不转移;否则,选取没有接管该作业的executor列表的loadLevel最小的一个。 // 如果没有配置preferList,则选取没有接管该作业的executor列表的loadLevel最小的一个。 - Iterator shardIterator = shardList.iterator(); - while (shardIterator.hasNext()) { - Shard shard = shardIterator.next(); - String jobName = shard.getJobName(); - if (localModeMap.get(jobName)) { - if (preferListIsConfiguredMap.get(jobName)) { - List preferListConfigured = preferListConfiguredMap.get(jobName); - if (!preferListConfigured.isEmpty()) { - List preferExecutorList = new ArrayList<>(); - List lastOnlineExecutorListByJob = lastOnlineExecutorListMapByJob.get(jobName); - for (int i = 0; i < lastOnlineExecutorListByJob.size(); i++) { - Executor executor = lastOnlineExecutorListByJob.get(i); - if (preferListConfigured.contains(executor.getExecutorName())) { - preferExecutorList.add(executor); + { + Iterator iterator = shardList.iterator(); + while (iterator.hasNext()) { + Shard shard = iterator.next(); + String jobName = shard.getJobName(); + if (localModeMap.get(jobName)) { + if (preferListIsConfiguredMap.get(jobName)) { + List preferListConfigured = preferListConfiguredMap.get(jobName); + if (!preferListConfigured.isEmpty()) { + List preferExecutorList = new ArrayList<>(); + List lastOnlineExecutorListByJob = lastOnlineTrafficExecutorListMapByJob.get(jobName); + for (int i = 0; i < lastOnlineExecutorListByJob.size(); i++) { + Executor executor = lastOnlineExecutorListByJob.get(i); + if (preferListConfigured.contains(executor.getExecutorName())) { + preferExecutorList.add(executor); + } + } + if (!preferExecutorList.isEmpty()) { + Executor executor = getExecutorWithMinLoadLevelAndNoThisJob(preferExecutorList, + jobName); + putShardIntoExecutor(shard, executor); } } - if (!preferExecutorList.isEmpty()) { - Executor executor = getExecutorWithMinLoadLevelAndNoThisJob(preferExecutorList, - jobName); - putShardIntoExecutor(shard, executor); - } + } else { + Executor executor = getExecutorWithMinLoadLevelAndNoThisJob(noDockerTrafficExecutorsMapByJob.get(jobName), + jobName); + putShardIntoExecutor(shard, executor); } - } else { - Executor executor = getExecutorWithMinLoadLevelAndNoThisJob(executorsMapByJob.get(jobName), - jobName); - putShardIntoExecutor(shard, executor); + iterator.remove(); } - shardIterator.remove(); } } // 2、放回配置了preferList的Shard - Iterator shardIterator2 = shardList.iterator(); - while (shardIterator2.hasNext()) { - Shard shard = shardIterator2.next(); - String jobName = shard.getJobName(); - if (preferListIsConfiguredMap.get(jobName)) { // fix, - // preferList为空不能作为判断是否配置preferList的依据,比如说配置了容器资源,但是全部下线了。 - List preferList = preferListConfiguredMap.get(jobName); - List preferExecutorList = new ArrayList<>(); - List lastOnlineExecutorListByJob = lastOnlineExecutorListMapByJob.get(jobName); - for (int i = 0; i < lastOnlineExecutorListByJob.size(); i++) { - Executor executor = lastOnlineExecutorListByJob.get(i); - if (preferList.contains(executor.getExecutorName())) { - preferExecutorList.add(executor); + { + Iterator iterator = shardList.iterator(); + while (iterator.hasNext()) { + Shard shard = iterator.next(); + String jobName = shard.getJobName(); + if (preferListIsConfiguredMap.get(jobName)) { // fix, + // preferList为空不能作为判断是否配置preferList的依据,比如说配置了容器资源,但是全部下线了。 + List preferList = preferListConfiguredMap.get(jobName); + List preferExecutorList = new ArrayList<>(); + List lastOnlineExecutorListByJob = lastOnlineTrafficExecutorListMapByJob.get(jobName); + for (int i = 0; i < lastOnlineExecutorListByJob.size(); i++) { + Executor executor = lastOnlineExecutorListByJob.get(i); + if (preferList.contains(executor.getExecutorName())) { + preferExecutorList.add(executor); + } } - } - // 如果preferList的Executor都offline,则放回到全部online的Executor中某一个。如果是这种情况,则后续再操作,避免不均衡的情况 - // 如果存在preferExecutor,择优放回 - if (!preferExecutorList.isEmpty()) { - Executor executor = getExecutorWithMinLoadLevel(preferExecutorList); - putShardIntoExecutor(shard, executor); - shardIterator2.remove(); - } else { // 如果不存在preferExecutor - // 如果“只使用preferExecutor”,则丢弃;否则,等到后续(在第3步)进行放回操作,避免不均衡的情况 - if (!useDispreferListMap.get(jobName)) { - shardIterator2.remove(); + // 如果preferList的Executor都offline,则放回到全部online的Executor中某一个。如果是这种情况,则后续再操作,避免不均衡的情况 + // 如果存在preferExecutor,择优放回 + if (!preferExecutorList.isEmpty()) { + Executor executor = getExecutorWithMinLoadLevel(preferExecutorList); + putShardIntoExecutor(shard, executor); + iterator.remove(); + } else { // 如果不存在preferExecutor + // 如果“只使用preferExecutor”,则丢弃;否则,等到后续(在第3步)进行放回操作,避免不均衡的情况 + if (!useDispreferListMap.get(jobName)) { + iterator.remove(); + } } } } } // 3、放回没有配置preferList的Shard - Iterator shardIterator3 = shardList.iterator(); - while (shardIterator3.hasNext()) { - Shard shard = shardIterator3.next(); - Executor executor = getExecutorWithMinLoadLevel(executorsMapByJob.get(shard.getJobName())); - putShardIntoExecutor(shard, executor); - shardIterator3.remove(); + { + Iterator iterator = shardList.iterator(); + while (iterator.hasNext()) { + Shard shard = iterator.next(); + Executor executor = getExecutorWithMinLoadLevel(noDockerTrafficExecutorsMapByJob.get(shard.getJobName())); + putShardIntoExecutor(shard, executor); + iterator.remove(); + } } } @@ -590,7 +612,7 @@ protected boolean useDispreferList(String jobName) { } return true; } catch (Exception e) { - log.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); return true; } } @@ -631,14 +653,14 @@ private Executor getExecutorWithMinLoadLevelAndNoThisJob(List executor private void putShardIntoExecutor(Shard shard, Executor executor) { if (executor != null) { if (isIn(shard, executor.getShardList())) { - log.error("The shard({}-{}) is running in the executor of {}, cannot be put again", + LOGGER.error("The shard({}-{}) is running in the executor of {}, cannot be put again", shard.getJobName(), shard.getItem(), executor.getExecutorName()); } else { executor.getShardList().add(shard); executor.setTotalLoadLevel(executor.getTotalLoadLevel() + shard.getLoadLevel()); } } else { - log.info("No executor to take over the shard: {}-{}", shard.getJobName(), shard.getItem()); + LOGGER.info("No executor to take over the shard: {}-{}", shard.getJobName(), shard.getItem()); } } @@ -825,6 +847,10 @@ protected List createShards(String jobName, List lastOnlineExec return shardList; } + protected boolean getExecutorNoTraffic(String executorName) throws Exception { + return curatorFramework.checkExists().forPath(SaturnExecutorsNode.getExecutorNoTrafficNodePath(executorName)) != null; + } + } /** @@ -834,12 +860,12 @@ private class ExecuteAllShardingTask extends AbstractAsyncShardingTask { @Override protected void logStartInfo() { - log.info("Execute the {} ", this.getClass().getSimpleName()); + LOGGER.info("Execute the {} ", this.getClass().getSimpleName()); } @Override protected boolean pick(List allJobs, List allEnableJob, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { // 修正所有executor对所有作业的jobNameList for (int j = 0; j < allJobs.size(); j++) { fixJobNameList(lastOnlineExecutorList, allJobs.get(j)); @@ -848,7 +874,7 @@ protected boolean pick(List allJobs, List allEnableJob, List customLastOnlineExecutorList() throws Exception { Executor executor = new Executor(); executor.setExecutorName(zkExecutor); executor.setIp(new String(ipData, "UTF-8")); + executor.setNoTraffic(getExecutorNoTraffic(zkExecutor)); executor.setShardList(new ArrayList()); executor.setJobNameList(new ArrayList()); lastOnlineExecutorList.add(executor); @@ -901,15 +928,15 @@ public ExecuteOnlineShardingTask(String executorName, String ip) { @Override protected void logStartInfo() { - log.info("Execute the {} with {} online", this.getClass().getSimpleName(), executorName); + LOGGER.info("Execute the {} with {} online", this.getClass().getSimpleName(), executorName); } @Override protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception {// NOSONAR + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { // 如果没有Executor在运行,则需要进行全量分片 if (lastOnlineExecutorList.isEmpty()) { - log.warn("There are no running executors, need all sharding"); + LOGGER.warn("There are no running executors, need all sharding"); needAllSharding.set(true); shardingCount.incrementAndGet(); executorService.submit(new ExecuteAllShardingTask()); @@ -928,9 +955,13 @@ protected boolean pick(List allJobs, List allEnableJobs, List()); theExecutor.setJobNameList(new ArrayList()); lastOnlineExecutorList.add(theExecutor); + if(!theExecutor.isNoTraffic()) { + lastOnlineTrafficExecutorList.add(theExecutor); + } } else { // 重新设置下ip theExecutor.setIp(ip); } @@ -953,44 +984,158 @@ public ExecuteOfflineShardingTask(String executorName) { @Override protected void logStartInfo() { - log.info("Execute the {} with {} offline", this.getClass().getSimpleName(), executorName); + LOGGER.info("Execute the {} with {} offline", this.getClass().getSimpleName(), executorName); } @Override protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { /** * 摘取下线的executor全部Shard */ - boolean wasOffline = true; - Iterator iterator = lastOnlineExecutorList.iterator(); + Executor theExecutor = null; + { // the block, avoid using the iterator after this code + Iterator iterator = lastOnlineExecutorList.iterator(); + while (iterator.hasNext()) { + Executor executor = iterator.next(); + if (executor.getExecutorName().equals(executorName)) { + theExecutor = executor; + iterator.remove(); + shardList.addAll(executor.getShardList()); + break; + } + } + } + if (theExecutor != null) { + Iterator iterator = lastOnlineTrafficExecutorList.iterator(); + while (iterator.hasNext()) { + Executor executor = iterator.next(); + if (theExecutor.equals(executor)) { + iterator.remove(); + break; + } + } + } + + // 如果该executor实际上已经在此之前下线,则摘取失败 + if (theExecutor == null) { + return false; + } + + // 移除本地模式的作业分片 + Iterator iterator = shardList.iterator(); while (iterator.hasNext()) { - Executor executor = iterator.next(); - if (executor.getExecutorName().equals(executorName)) { - wasOffline = false; + Shard shard = iterator.next(); + if (isLocalMode(shard.getJobName())) { iterator.remove(); - shardList.addAll(executor.getShardList()); - break; } } - // 如果该executor实际上已经在此之前下线,则摘取失败 - if (wasOffline) { + return true; + } + + } + + /** + * 摘取executor流量,标记该executor的noTraffic为true,并移除其所有作业分片,只摘取所有非本地作业分片,设置totalLoadLevel为0 + */ + private class ExecuteExtractTrafficShardingTask extends AbstractAsyncShardingTask { + + private String executorName; + + public ExecuteExtractTrafficShardingTask(String executorName) { + this.executorName = executorName; + } + + @Override + protected void logStartInfo() { + LOGGER.info("Execute the {} with {} extract traffic", this.getClass().getSimpleName(), executorName); + } + + @Override + protected boolean pick(List allJobs, List allEnableJobs, List shardList, + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { + // 摘取该executor的所有作业分片 + Executor theExecutor = null; + { + Iterator iterator = lastOnlineTrafficExecutorList.iterator(); + while (iterator.hasNext()) { + Executor executor = iterator.next(); + if (executor.getExecutorName().equals(executorName)) { + shardList.addAll(executor.getShardList()); + executor.getShardList().clear(); + executor.setNoTraffic(true); + executor.setTotalLoadLevel(0); + theExecutor = executor; + iterator.remove(); + break; + } + } + } + + if (theExecutor == null) { + LOGGER.info("The executor {} is offline or already noTraffic, unnecessary to extract traffic", executorName); return false; } // 移除本地模式的作业分片 - Iterator shardIterator = shardList.iterator(); - while (shardIterator.hasNext()) { - Shard shard = shardIterator.next(); + Iterator iterator = shardList.iterator(); + while (iterator.hasNext()) { + Shard shard = iterator.next(); if (isLocalMode(shard.getJobName())) { - shardIterator.remove(); + iterator.remove(); } } return true; } + } + /** + * 恢复executor流量,标记该executor的noTraffic为false,平衡摘取分片 + */ + private class ExecuteRecoverTrafficShardingTask extends AbstractAsyncShardingTask { + + private String executorName; + + public ExecuteRecoverTrafficShardingTask(String executorName) { + this.executorName = executorName; + } + + @Override + protected void logStartInfo() { + LOGGER.info("Execute the {} with {} recover traffic", this.getClass().getSimpleName(), executorName); + } + + @Override + protected boolean pick(List allJobs, List allEnableJobs, List shardList, + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { + // 设置该executor的noTraffic为false + Executor theExecutor = null; + Iterator iterator = lastOnlineExecutorList.iterator(); + while (iterator.hasNext()) { + Executor executor = iterator.next(); + if (executor.getExecutorName().equals(executorName)) { + executor.setNoTraffic(false); + lastOnlineTrafficExecutorList.add(executor); + theExecutor = executor; + break; + } + } + if (theExecutor == null) { + LOGGER.info("The executor {} is offline, unnecessary to recover traffic", executorName); + return false; + } + + // 平衡摘取每个作业能够运行的分片,可以视为jobNameList中每个作业的jobServerOnline + final List jobNameList = theExecutor.getJobNameList(); + for(String jobName : jobNameList) { + new ExecuteJobServerOnlineShardingTask(jobName, executorName) + .pickIntelligent(allEnableJobs, shardList, lastOnlineTrafficExecutorList); + } + + return true; + } } /** @@ -1006,7 +1151,7 @@ public ExecuteJobEnableShardingTask(String jobName) { @Override protected void logStartInfo() { - log.info("Execute the {} with {} enable", this.getClass().getSimpleName(), jobName); + LOGGER.info("Execute the {} with {} enable", this.getClass().getSimpleName(), jobName); } @Override @@ -1018,10 +1163,10 @@ protected List notifyEnableJobsPrior() { @Override protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { // 移除已经在Executor运行的该作业的所有Shard - for (int i = 0; i < lastOnlineExecutorList.size(); i++) { - Executor executor = lastOnlineExecutorList.get(i); + for (int i = 0; i < lastOnlineTrafficExecutorList.size(); i++) { + Executor executor = lastOnlineTrafficExecutorList.get(i); Iterator iterator = executor.getShardList().iterator(); while (iterator.hasNext()) { Shard shard = iterator.next(); @@ -1036,7 +1181,7 @@ protected boolean pick(List allJobs, List allEnableJobs, List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) { // 摘取所有该作业的Shard - for (int i = 0; i < lastOnlineExecutorList.size(); i++) { - Executor executor = lastOnlineExecutorList.get(i); + for (int i = 0; i < lastOnlineTrafficExecutorList.size(); i++) { + Executor executor = lastOnlineTrafficExecutorList.get(i); Iterator iterator = executor.getShardList().iterator(); while (iterator.hasNext()) { Shard shard = iterator.next(); @@ -1086,7 +1231,7 @@ protected boolean pick(List allJobs, List allEnableJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) { // 不做操作 } @@ -1105,7 +1250,7 @@ public ExecuteJobForceShardShardingTask(String jobName) { @Override protected void logStartInfo() { - log.info("Execute the {} with {} forceShard", this.getClass().getSimpleName(), jobName); + LOGGER.info("Execute the {} with {} forceShard", this.getClass().getSimpleName(), jobName); } @Override @@ -1124,16 +1269,16 @@ private void deleteForceShardNode() { curatorFramework.delete().forPath(jobConfigForceShardNodePath); } } catch (Throwable t) { - log.error("delete forceShard node error", t); + LOGGER.error("delete forceShard node error", t); } } @Override protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { // 移除已经在Executor运行的该作业的所有Shard - for (int i = 0; i < lastOnlineExecutorList.size(); i++) { - Executor executor = lastOnlineExecutorList.get(i); + for (int i = 0; i < lastOnlineTrafficExecutorList.size(); i++) { + Executor executor = lastOnlineTrafficExecutorList.get(i); Iterator iterator = executor.getShardList().iterator(); while (iterator.hasNext()) { Shard shard = iterator.next(); @@ -1147,7 +1292,7 @@ protected boolean pick(List allJobs, List allEnableJobs, List shardList, List executorList return total; } - // 计算平均load,然后摘取最接近平均负载的shard。 - private void pickBalance(List shardList, List allExecutors) { - int totalLoalLevel = getTotalLoadLevel(shardList, allExecutors); - int averageTotalLoal = totalLoalLevel / (allExecutors.size()); - for (int i = 0; i < allExecutors.size(); i++) { - Executor executor = allExecutors.get(i); + // 计算平均load,然后摘取最接近平均负载的shard + private void pickBalance(List shardList, List executorList) { + int totalLoadLevel = getTotalLoadLevel(shardList, executorList); + int averageTotalLoad = totalLoadLevel / (executorList.size()); + for (int i = 0; i < executorList.size(); i++) { + Executor executor = executorList.get(i); while (true) { - int pickLoadLevel = executor.getTotalLoadLevel() - averageTotalLoal; + int pickLoadLevel = executor.getTotalLoadLevel() - averageTotalLoad; // 摘取现在totalLoad > 平均值的executor里面的shard if (pickLoadLevel > 0 && !executor.getShardList().isEmpty()) { Shard pickShard = null; for (int j = 0; j < executor.getShardList().size(); j++) { Shard shard = executor.getShardList().get(j); - if (!shard.getJobName().equals(jobName)) { // 如果当前Shard不属于该作业,则不摘取,继续下一个 + // 如果当前Shard不属于该作业,则不摘取,继续下一个 + if (!shard.getJobName().equals(jobName)) { continue; } if (pickShard == null) { @@ -1347,9 +1493,8 @@ private boolean shardsAllRunningInDispreferList(List preferListConfigure return true; } - @Override - protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + public void pickIntelligent(List allEnableJobs, List shardList, + List lastOnlineTrafficExecutorList) throws Exception { boolean preferListIsConfigured = preferListIsConfigured(jobName); // 是否配置了preferList boolean useDispreferList = useDispreferList(jobName); // 是否useDispreferList List preferListConfigured = getPreferListConfigured(jobName); // 配置态的preferList @@ -1357,45 +1502,21 @@ protected boolean pick(List allJobs, List allEnableJobs, List()); - theExecutor.setJobNameList(new ArrayList()); - theExecutor.setTotalLoadLevel(0); - lastOnlineExecutorList.add(theExecutor); - } - if (!theExecutor.getJobNameList().contains(jobName)) { - theExecutor.getJobNameList().add(jobName); - } - if (localMode) { if (!preferListIsConfigured || preferListConfigured.contains(executorName)) { if (allEnableJobs.contains(jobName)) { - shardList.add(createLocalShard(lastOnlineExecutorList, loadLevel)); + shardList.add(createLocalShard(lastOnlineTrafficExecutorList, loadLevel)); } } } else { - boolean hasShardRunning = hasShardRunning(lastOnlineExecutorList); + boolean hasShardRunning = hasShardRunning(lastOnlineTrafficExecutorList); if (preferListIsConfigured) { if (preferListConfigured.contains(executorName)) { // 如果有分片正在运行,摘取全部运行在非优先节点上的分片,还可以平衡摘取 if (hasShardRunning) { shardList.addAll( - pickShardsRunningInDispreferList(preferListConfigured, lastOnlineExecutorList)); - pickBalance(shardList, lastOnlineExecutorList); + pickShardsRunningInDispreferList(preferListConfigured, lastOnlineTrafficExecutorList)); + pickBalance(shardList, lastOnlineTrafficExecutorList); } else { // 如果没有分片正在运行,则需要新建,无需平衡摘取 if (allEnableJobs.contains(jobName)) { @@ -1408,12 +1529,12 @@ protected boolean pick(List allJobs, List allEnableJobs, List allJobs, List allEnableJobs, List allJobs, List allEnableJobs, List allJobs, List allEnableJobs, List shardList, + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { + // 很小的可能性:status的新增事件先于ip的新增事件 + // 那么,如果lastOnlineExecutorList不包含executorName,则添加一个新的Executor + // 添加当前作业至jobNameList + Executor theExecutor = null; + for (int i = 0; i < lastOnlineExecutorList.size(); i++) { + Executor executor = lastOnlineExecutorList.get(i); + if (executor.getExecutorName().equals(executorName)) { + theExecutor = executor; + break; + } + } + if (theExecutor == null) { + theExecutor = new Executor(); + theExecutor.setExecutorName(executorName); + theExecutor.setIp(getExecutorIp()); + theExecutor.setNoTraffic(getExecutorNoTraffic(executorName)); + theExecutor.setShardList(new ArrayList()); + theExecutor.setJobNameList(new ArrayList()); + theExecutor.setTotalLoadLevel(0); + lastOnlineExecutorList.add(theExecutor); + if(!theExecutor.isNoTraffic()) { + lastOnlineTrafficExecutorList.add(theExecutor); + } + } + if (!theExecutor.getJobNameList().contains(jobName)) { + theExecutor.getJobNameList().add(jobName); + } + + // 如果该Executor流量被摘取,则无需摘取,返回true + if(theExecutor.isNoTraffic()) { + return true; + } + + pickIntelligent(allEnableJobs, shardList, lastOnlineTrafficExecutorList); return true; } @@ -1455,7 +1615,7 @@ private class ExecuteJobServerOfflineShardingTask extends AbstractAsyncShardingT @Override protected void logStartInfo() { - log.info("Execute the {}, jobName is {}, executorName is {}", this.getClass().getSimpleName(), jobName, + LOGGER.info("Execute the {}, jobName is {}, executorName is {}", this.getClass().getSimpleName(), jobName, executorName); } @@ -1466,9 +1626,11 @@ public ExecuteJobServerOfflineShardingTask(String jobName, String executorName) @Override protected boolean pick(List allJobs, List allEnableJobs, List shardList, - List lastOnlineExecutorList) throws Exception { + List lastOnlineExecutorList, List lastOnlineTrafficExecutorList) throws Exception { boolean localMode = isLocalMode(jobName); + // Should use lastOnlineExecutorList, because jobName should be removed from jobNameList. + // But use lastOnlineTrafficExecutorList, the executor maybe cannot be found. for (int i = 0; i < lastOnlineExecutorList.size(); i++) { Executor executor = lastOnlineExecutorList.get(i); if (executor.getExecutorName().equals(executorName)) { @@ -1494,7 +1656,6 @@ protected boolean pick(List allJobs, List allEnableJobs, List