Skip to content

Commit

Permalink
Merge branch 'develop' of https://github.com/vipshop/Saturn into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
yangjuanying committed Feb 24, 2017
2 parents 849891a + 0f3efe9 commit 2236e93
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ public void run() {

if (stoped.compareAndSet(true, false)) {
log.info(" {} is going to restart for zk reconnected", executorName);
// clear the Executor ip, make sure it can restart properly.
String ipNode = saturnExecutorService.getIpNode();
if (regCenter != null && ipNode != null && regCenter.isConnected()) {
log.info(" {} is going to delete its ip node {}", executorName, ipNode);
regCenter.remove(ipNode);
}
restart();
}

Expand Down Expand Up @@ -443,6 +449,10 @@ public void shutdown() {
resetCountService.shutdownRestCountTimer();
// shutdown timeout-watchdog-threadpool
TimeoutSchedulerExecutor.shutdownScheduler(executorName);
// close zk-dump socket.
if(regCenter != null) {
regCenter.closeMonitorService();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ private boolean blockUntilShardingComplatedIfNotLeader() throws JobShuttingDownE

private void waitingOtherJobCompleted() {
while (!isShutdown && executionService.hasRunningItems()) {
log.debug("Elastic job: sleep short time until other job completed.");
log.info("Elastic job: sleep short time until other job completed.");
BlockUtils.waitingShortTime();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,18 @@ public class ZookeeperRegistryCenter implements CoordinatorRegistryCenter {
private int sessionTimeout = SESSION_TIMEOUT;

private String executorName;

private MonitorService monitorService;

public ZookeeperRegistryCenter(final ZookeeperConfiguration zkConfig) {
this.zkConfig = zkConfig;
}

public void closeMonitorService() {
if (monitorService != null) {
monitorService.close();
}
}

public ZookeeperConfiguration getZkConfig() {
return zkConfig;
Expand Down Expand Up @@ -147,7 +154,7 @@ public List<ACL> getAclForPath(final String path) {

// start monitor.
if (zkConfig.getMonitorPort() > 0) {
MonitorService monitorService = new MonitorService(this, zkConfig.getMonitorPort());
monitorService = new MonitorService(this, zkConfig.getMonitorPort());
monitorService.listen();
log.info("msg=zk monitor port starts at {}. usage: telnet {jobServerIP} {} and execute dump {jobName}", zkConfig.getMonitorPort(), zkConfig.getMonitorPort());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@
*/
public class ReportAlarmException extends Exception {

public ReportAlarmException() {
private static final long serialVersionUID = -6687479332667465829L;

public ReportAlarmException() {
super();
}

public ReportAlarmException(String message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
import com.vip.saturn.job.sharding.node.SaturnExecutorsNode;
import com.vip.saturn.job.sharding.service.NamespaceShardingService;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author hebelala
*/
public class JobServersTriggerShardingListener extends AbstractTreeCacheListener {

private static final Logger logger = LoggerFactory.getLogger(JobServersTriggerShardingListener.class);

private String jobName;
private NamespaceShardingService namespaceShardingService;

Expand All @@ -32,6 +28,7 @@ public void childEvent(TreeCacheEvent.Type type, String path, String nodeData) t
case NODE_REMOVED:
namespaceShardingService.asyncShardingWhenJobServerOffline(jobName, executorName);
break;
default:
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void clean(String executorName) {
}
}
}
} catch (NoNodeException ex) {
} catch (NoNodeException ex) { // NOSONAR
ex.printStackTrace();
} catch (Exception e) {
log.error("Clean the executor " + executorName + " error", e);
Expand All @@ -69,7 +69,7 @@ private List<String> getJobList() {
jobList.addAll(tmp);
}
}
} catch (NoNodeException ex) {
} catch (NoNodeException ex) { // NOSONAR
ex.printStackTrace();
} catch (Exception e) {
log.error("Clean the executor, getJobList error", e);
Expand Down Expand Up @@ -99,7 +99,7 @@ private void deleteExecutor(String executorName) {
}
curatorFramework.delete().deletingChildrenIfNeeded().forPath(executorNodePath);
}
} catch (NoNodeException ex) {
} catch (NoNodeException ex) { // NOSONAR
ex.printStackTrace();
} catch (Exception e) {
log.error("Clean the executor, deleteExecutor(" + executorName + ") error", e);
Expand Down Expand Up @@ -128,7 +128,7 @@ private void deleteJobServerExecutor(String jobName, String executorName) {
}
curatorFramework.delete().deletingChildrenIfNeeded().forPath(jobServersExecutorNodePath);
}
} catch (NoNodeException ex) {
} catch (NoNodeException ex) { // NOSONAR
ex.printStackTrace();
} catch (Exception e) {
log.error("Clean the executor, deleteJobServerExecutor(" + jobName + ", " + executorName + ") error", e);
Expand Down Expand Up @@ -158,7 +158,7 @@ private void deleteJobConfigPreferListContentAboutXxx(String jobName, String exe
curatorFramework.setData().forPath(jobConfigPreferListNodePath, sb.toString().getBytes("UTF-8"));
}
}
} catch (NoNodeException ex) {
} catch (NoNodeException ex) { // NOSONAR
ex.printStackTrace();
} catch (Exception e) {
log.error("Clean the executor, deleteJobConfigPreferListContentAboutXxx(" + jobName + ", " + executorName + ") error", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ private Map<String, Map<String, List<Integer>>> getEnabledAndShardsChangedJobSha
List<Integer> shards = next.getValue();
List<Integer> newShard = lastShardingItems.get(executorName);
if ((shards == null || shards.isEmpty()) && (newShard != null && !newShard.isEmpty())
|| (shards != null && !shards.isEmpty()) && (newShard == null || newShard.isEmpty())) {
|| (shards != null && !shards.isEmpty()) && (newShard == null || newShard.isEmpty())) { // NOSONAR
isChanged = true;
break;
}
Expand All @@ -286,7 +286,7 @@ private Map<String, Map<String, List<Integer>>> getEnabledAndShardsChangedJobSha
List<Integer> shards = next.getValue();
List<Integer> oldShard = oldShardingItems.get(executorName);
if ((shards == null || shards.isEmpty()) && (oldShard != null && !oldShard.isEmpty())
|| (shards != null && !shards.isEmpty()) && (oldShard == null || oldShard.isEmpty())) {
|| (shards != null && !shards.isEmpty()) && (oldShard == null || oldShard.isEmpty())) { // NOSONAR
isChanged = true;
break;
}
Expand Down Expand Up @@ -1107,7 +1107,6 @@ private String getExecutorIp() {
}

private Shard createLocalShard(List<Executor> lastOnlineExecutorList, int loadLevel) {
Shard shard = null;
List<Integer> itemList = new ArrayList<>();
for (int i = 0; i < lastOnlineExecutorList.size(); i++) {
List<Shard> shardList = lastOnlineExecutorList.get(i).getShardList();
Expand Down Expand Up @@ -1137,7 +1136,7 @@ public int compare(Integer o1, Integer o2) {
}
}
}
shard = new Shard();
Shard shard = new Shard();
shard.setJobName(jobName);
shard.setItem(item);
shard.setLoadLevel(loadLevel);
Expand Down Expand Up @@ -1516,7 +1515,7 @@ public void leaderElection() throws Exception {
// 清理、重置变量
executorService.shutdownNow();
while (!executorService.isTerminated()) { // 等待全部任务已经退出
Thread.sleep(200);
Thread.sleep(200); //NOSONARA
}
needAllSharding.set(false);
shardingCount.set(0);
Expand Down

0 comments on commit 2236e93

Please sign in to comment.