diff --git a/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutor.java b/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutor.java index 92f96c38e..0b5b1bf4a 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutor.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/executor/SaturnExecutor.java @@ -282,6 +282,12 @@ public void run() { if (stoped.compareAndSet(true, false)) { log.info(" {} is going to restart for zk reconnected", executorName); + // clear the Executor ip, make sure it can restart properly. + String ipNode = saturnExecutorService.getIpNode(); + if (regCenter != null && ipNode != null && regCenter.isConnected()) { + log.info(" {} is going to delete its ip node {}", executorName, ipNode); + regCenter.remove(ipNode); + } restart(); } @@ -443,6 +449,10 @@ public void shutdown() { resetCountService.shutdownRestCountTimer(); // shutdown timeout-watchdog-threadpool TimeoutSchedulerExecutor.shutdownScheduler(executorName); + // close zk-dump socket. + if(regCenter != null) { + regCenter.closeMonitorService(); + } } } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java b/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java index fa2dba6dc..91f106da2 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/internal/sharding/ShardingService.java @@ -159,7 +159,7 @@ private boolean blockUntilShardingComplatedIfNotLeader() throws JobShuttingDownE private void waitingOtherJobCompleted() { while (!isShutdown && executionService.hasRunningItems()) { - log.debug("Elastic job: sleep short time until other job completed."); + log.info("Elastic job: sleep short time until other job completed."); BlockUtils.waitingShortTime(); } } diff --git a/saturn-core/src/main/java/com/vip/saturn/job/reg/zookeeper/ZookeeperRegistryCenter.java b/saturn-core/src/main/java/com/vip/saturn/job/reg/zookeeper/ZookeeperRegistryCenter.java index a6c025ce8..a85ecfd55 100644 --- a/saturn-core/src/main/java/com/vip/saturn/job/reg/zookeeper/ZookeeperRegistryCenter.java +++ b/saturn-core/src/main/java/com/vip/saturn/job/reg/zookeeper/ZookeeperRegistryCenter.java @@ -78,11 +78,18 @@ public class ZookeeperRegistryCenter implements CoordinatorRegistryCenter { private int sessionTimeout = SESSION_TIMEOUT; private String executorName; + + private MonitorService monitorService; public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) { this.zkConfig = zkConfig; } + public void closeMonitorService() { + if (monitorService != null) { + monitorService.close(); + } + } public ZookeeperConfiguration getZkConfig() { return zkConfig; @@ -147,7 +154,7 @@ public List getAclForPath(final String path) { // start monitor. if (zkConfig.getMonitorPort() > 0) { - MonitorService monitorService = new MonitorService(this, zkConfig.getMonitorPort()); + monitorService = new MonitorService(this, zkConfig.getMonitorPort()); monitorService.listen(); log.info("msg=zk monitor port starts at {}. usage: telnet {jobServerIP} {} and execute dump {jobName}", zkConfig.getMonitorPort(), zkConfig.getMonitorPort()); } diff --git a/saturn-integrate/src/main/java/com/vip/saturn/job/integrate/exception/ReportAlarmException.java b/saturn-integrate/src/main/java/com/vip/saturn/job/integrate/exception/ReportAlarmException.java index 888c89ab8..d6caa7708 100644 --- a/saturn-integrate/src/main/java/com/vip/saturn/job/integrate/exception/ReportAlarmException.java +++ b/saturn-integrate/src/main/java/com/vip/saturn/job/integrate/exception/ReportAlarmException.java @@ -5,7 +5,10 @@ */ public class ReportAlarmException extends Exception { - public ReportAlarmException() { + private static final long serialVersionUID = -6687479332667465829L; + + public ReportAlarmException() { + super(); } public ReportAlarmException(String message) { diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/JobServersTriggerShardingListener.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/JobServersTriggerShardingListener.java index 215ccf64d..f130302be 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/JobServersTriggerShardingListener.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/listener/JobServersTriggerShardingListener.java @@ -3,16 +3,12 @@ import com.vip.saturn.job.sharding.node.SaturnExecutorsNode; import com.vip.saturn.job.sharding.service.NamespaceShardingService; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * @author hebelala */ public class JobServersTriggerShardingListener extends AbstractTreeCacheListener { - private static final Logger logger = LoggerFactory.getLogger(JobServersTriggerShardingListener.class); - private String jobName; private NamespaceShardingService namespaceShardingService; @@ -32,6 +28,7 @@ public void childEvent(TreeCacheEvent.Type type, String path, String nodeData) t case NODE_REMOVED: namespaceShardingService.asyncShardingWhenJobServerOffline(jobName, executorName); break; + default: } } } diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/ExecutorCleanService.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/ExecutorCleanService.java index be452af79..493954e14 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/ExecutorCleanService.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/ExecutorCleanService.java @@ -52,7 +52,7 @@ public void clean(String executorName) { } } } - } catch (NoNodeException ex) { + } catch (NoNodeException ex) { // NOSONAR ex.printStackTrace(); } catch (Exception e) { log.error("Clean the executor " + executorName + " error", e); @@ -69,7 +69,7 @@ private List getJobList() { jobList.addAll(tmp); } } - } catch (NoNodeException ex) { + } catch (NoNodeException ex) { // NOSONAR ex.printStackTrace(); } catch (Exception e) { log.error("Clean the executor, getJobList error", e); @@ -99,7 +99,7 @@ private void deleteExecutor(String executorName) { } curatorFramework.delete().deletingChildrenIfNeeded().forPath(executorNodePath); } - } catch (NoNodeException ex) { + } catch (NoNodeException ex) { // NOSONAR ex.printStackTrace(); } catch (Exception e) { log.error("Clean the executor, deleteExecutor(" + executorName + ") error", e); @@ -128,7 +128,7 @@ private void deleteJobServerExecutor(String jobName, String executorName) { } curatorFramework.delete().deletingChildrenIfNeeded().forPath(jobServersExecutorNodePath); } - } catch (NoNodeException ex) { + } catch (NoNodeException ex) { // NOSONAR ex.printStackTrace(); } catch (Exception e) { log.error("Clean the executor, deleteJobServerExecutor(" + jobName + ", " + executorName + ") error", e); @@ -158,7 +158,7 @@ private void deleteJobConfigPreferListContentAboutXxx(String jobName, String exe curatorFramework.setData().forPath(jobConfigPreferListNodePath, sb.toString().getBytes("UTF-8")); } } - } catch (NoNodeException ex) { + } catch (NoNodeException ex) { // NOSONAR ex.printStackTrace(); } catch (Exception e) { log.error("Clean the executor, deleteJobConfigPreferListContentAboutXxx(" + jobName + ", " + executorName + ") error", e); diff --git a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java index fb4a53e4d..ef8bfc256 100644 --- a/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java +++ b/saturn-job-sharding/src/main/java/com/vip/saturn/job/sharding/service/NamespaceShardingService.java @@ -264,7 +264,7 @@ private Map>> getEnabledAndShardsChangedJobSha List shards = next.getValue(); List newShard = lastShardingItems.get(executorName); if ((shards == null || shards.isEmpty()) && (newShard != null && !newShard.isEmpty()) - || (shards != null && !shards.isEmpty()) && (newShard == null || newShard.isEmpty())) { + || (shards != null && !shards.isEmpty()) && (newShard == null || newShard.isEmpty())) { // NOSONAR isChanged = true; break; } @@ -286,7 +286,7 @@ private Map>> getEnabledAndShardsChangedJobSha List shards = next.getValue(); List oldShard = oldShardingItems.get(executorName); if ((shards == null || shards.isEmpty()) && (oldShard != null && !oldShard.isEmpty()) - || (shards != null && !shards.isEmpty()) && (oldShard == null || oldShard.isEmpty())) { + || (shards != null && !shards.isEmpty()) && (oldShard == null || oldShard.isEmpty())) { // NOSONAR isChanged = true; break; } @@ -1107,7 +1107,6 @@ private String getExecutorIp() { } private Shard createLocalShard(List lastOnlineExecutorList, int loadLevel) { - Shard shard = null; List itemList = new ArrayList<>(); for (int i = 0; i < lastOnlineExecutorList.size(); i++) { List shardList = lastOnlineExecutorList.get(i).getShardList(); @@ -1137,7 +1136,7 @@ public int compare(Integer o1, Integer o2) { } } } - shard = new Shard(); + Shard shard = new Shard(); shard.setJobName(jobName); shard.setItem(item); shard.setLoadLevel(loadLevel); @@ -1516,7 +1515,7 @@ public void leaderElection() throws Exception { // 清理、重置变量 executorService.shutdownNow(); while (!executorService.isTerminated()) { // 等待全部任务已经退出 - Thread.sleep(200); + Thread.sleep(200); //NOSONARA } needAllSharding.set(false); shardingCount.set(0);