From 19ee9a96471920425054c2821787eb0b7bf1d389 Mon Sep 17 00:00:00 2001 From: hebelala Date: Mon, 7 Jan 2019 18:38:20 +0800 Subject: [PATCH] =?UTF-8?q?#567=20=E4=BF=AE=E5=A4=8D=EF=BC=8C=E4=BD=9C?= =?UTF-8?q?=E4=B8=9A=E9=85=8D=E7=BD=AE=E5=9C=A8DB=E5=92=8CZK=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E7=9A=84=E7=9B=B8=E5=85=B3=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../console/service/impl/JobServiceImpl.java | 52 ++++++++++++++++--- 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java b/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java index a132b0806..2681d8667 100644 --- a/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java +++ b/saturn-console-api/src/main/java/com/vip/saturn/job/console/service/impl/JobServiceImpl.java @@ -694,11 +694,18 @@ private void validateAndUpdateStream(JobConfig jobConfig, Set stream, Li continue; } if (isDownStream) { - otherJob.setUpStream(removeFromStream(jobName, otherJob.getUpStream())); + String upStream = removeFromStreamIfNecessary(jobName, otherJob.getUpStream()); + if (upStream != null) { + otherJob.setUpStream(upStream); + streamChangedJobs.add(otherJob); + } } else { - otherJob.setDownStream(removeFromStream(jobName, otherJob.getDownStream())); + String downStream = removeFromStreamIfNecessary(jobName, otherJob.getDownStream()); + if (downStream != null) { + otherJob.setDownStream(downStream); + streamChangedJobs.add(otherJob); + } } - streamChangedJobs.add(otherJob); } } @@ -710,12 +717,14 @@ private String appendToStream(String jobName, String stream) { return formatStream(streamSet); } - private String removeFromStream(String jobName, String stream) { + private String removeFromStreamIfNecessary(String jobName, String stream) { Set streamSet = parseStreamToList(stream); if (StringUtils.isNotBlank(jobName)) { - streamSet.remove(jobName); + if (streamSet.remove(jobName)) { + return formatStream(streamSet); + } } - return formatStream(streamSet); + return null; } private String formatStream(Set streamSet) { @@ -1121,9 +1130,16 @@ private void createJobConfigToZk(JobConfig jobConfig, Set streamChang // 添加作业根节点和config结点 curatorFrameworkOp.create(JobNodePath.getConfigNodePath(jobName), ""); CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction(); + // 数据库有可能有重复作业的数据,去重,zk无需更新两次 + Collection streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs); // 更新关联作业的上下游 - for (JobConfig streamChangedJob : streamChangedJobs) { + for (JobConfig streamChangedJob : streamChangedJobsNew) { String changedJobName = streamChangedJob.getJobName(); + if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) { + // 数据库存在该作业,但是zk不存在该作业,为垃圾数据 + log.warn("the job({}) config node is not existing in zk", changedJobName); + continue; + } curatorTransactionOp .replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM), streamChangedJob.getUpStream()) @@ -2216,9 +2232,16 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set streamChang } } CuratorFrameworkOp.CuratorTransactionOp curatorTransactionOp = curatorFrameworkOp.inTransaction(); + // 数据库有可能有重复作业的数据,去重,zk无需更新两次 + Collection streamChangedJobsNew = removeDuplicateByJobName(streamChangedJobs); // 更新关联作业的上下游 - for (JobConfig streamChangedJob : streamChangedJobs) { + for (JobConfig streamChangedJob : streamChangedJobsNew) { String changedJobName = streamChangedJob.getJobName(); + if (!curatorFrameworkOp.checkExists(JobNodePath.getConfigNodePath(changedJobName))) { + // 数据库存在该作业,但是zk不存在该作业,为垃圾数据 + log.warn("the job({}) config node is not existing in ZK", changedJobName); + continue; + } curatorTransactionOp .replaceIfChanged(JobNodePath.getConfigNodePath(changedJobName, CONFIG_ITEM_UPSTREAM), streamChangedJob.getUpStream()) @@ -2297,6 +2320,19 @@ private void updateJobConfigToZk(JobConfig jobConfig, Set streamChang } } + private Collection removeDuplicateByJobName(Set streamChangedJobs) { + Map streamChangedJobsMap = new HashMap<>(); + for (JobConfig streamChangedJob : streamChangedJobs) { + String jobName = streamChangedJob.getJobName(); + if (streamChangedJobsMap.containsKey(jobName)) { + log.warn("the DB have duplicated jobName({})", jobName); + } else { + streamChangedJobsMap.put(jobName, streamChangedJob); + } + } + return streamChangedJobsMap.values(); + } + @Override public List getAllJobNamesFromZK(String namespace) throws SaturnJobConsoleException { CuratorRepository.CuratorFrameworkOp curatorFrameworkOp = registryCenterService