
Merge pull request #120 from mesosphere/cleanup-logic-with-tasknames
Clean up logic with task names to use persistent state
elingg committed Apr 11, 2015
2 parents 61c5840 + 750b800 commit 03a9764
Showing 6 changed files with 73 additions and 31 deletions.
28 changes: 17 additions & 11 deletions src/main/java/org/apache/mesos/hdfs/Scheduler.java
@@ -268,8 +268,7 @@ private boolean launchNode(SchedulerDriver driver, Offer offer,
     List<TaskInfo> tasks = new ArrayList<>();
     for (String taskType : taskTypes) {
       List<Resource> taskResources = getTaskResources(taskType);
-      String taskName = getNextTaskType(taskType);
-      if (taskName.isEmpty()) return false;
+      String taskName = getNextTaskName(taskType);
       TaskID taskId = TaskID.newBuilder()
         .setValue(String.format("task.%s.%s", taskType, taskIdName))
         .build();
@@ -284,29 +283,36 @@ private boolean launchNode(SchedulerDriver driver, Offer offer,
         .build();
       tasks.add(task);

-      liveState.addStagingTask(task.getTaskId(), taskName);
-      persistentState.addHdfsNode(taskId, offer.getHostname(), taskType);
+      liveState.addStagingTask(task.getTaskId());
+      persistentState.addHdfsNode(taskId, offer.getHostname(), taskType, taskName);
     }
     driver.launchTasks(Arrays.asList(offer.getId()), tasks);
     return true;
   }

-  private String getNextTaskType(String taskType) {
+  private String getNextTaskName(String taskType) {

     if (taskType.equals(HDFSConstants.NAME_NODE_ID)) {
+      Collection<String> nameNodeTaskNames = persistentState.getNameNodeTaskNames().values();
       for (int i = 1; i <= HDFSConstants.TOTAL_NAME_NODES; i++) {
-        if (!liveState.getNameNodeNames().containsValue(HDFSConstants.NAME_NODE_ID + i)) {
+        if (!nameNodeTaskNames.contains(HDFSConstants.NAME_NODE_ID + i)) {
           return HDFSConstants.NAME_NODE_ID + i;
         }
       }
-      return ""; // we couldn't find a node name, we must have started enough.
+      String errorStr = "Cluster is in inconsistent state. Trying to launch more namenodes, but they are all already running.";
+      log.error(errorStr);
+      throw new RuntimeException(errorStr);
     }
     if (taskType.equals(HDFSConstants.JOURNAL_NODE_ID)) {
+      Collection<String> journalNodeTaskNames = persistentState.getJournalNodeTaskNames().values();
       for (int i = 1; i <= conf.getJournalNodeCount(); i++) {
-        if (!liveState.getJournalNodeNames().containsValue(HDFSConstants.JOURNAL_NODE_ID + i)) {
+        if (!journalNodeTaskNames.contains(HDFSConstants.JOURNAL_NODE_ID + i)) {
           return HDFSConstants.JOURNAL_NODE_ID + i;
         }
       }
return ""; // we couldn't find a node name, we must have started enough.
String errorStr = "Cluster is in inconsistent state. Trying to launch more journalnodes, but they all are already running.";
log.error(errorStr);
throw new RuntimeException(errorStr);
}
return taskType;
}
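
Taken in isolation, the new name-selection contract is easy to see. Below is a standalone, runnable sketch with the constants inlined — the value 2 for HDFSConstants.TOTAL_NAME_NODES and the "namenode" prefix are assumptions here, and the persistent-state wiring is reduced to a plain map:

```java
import java.util.Collection;
import java.util.HashMap;

// Standalone sketch: pick the first free slot among the persisted task names,
// and fail hard instead of returning "" when every slot is taken.
public class NextTaskNameSketch {
  static final String NAME_NODE_ID = "namenode"; // assumed value of HDFSConstants.NAME_NODE_ID
  static final int TOTAL_NAME_NODES = 2;         // assumed value of HDFSConstants.TOTAL_NAME_NODES

  static String nextNameNodeName(HashMap<String, String> persistedTaskNames) {
    Collection<String> taken = persistedTaskNames.values();
    for (int i = 1; i <= TOTAL_NAME_NODES; i++) {
      if (!taken.contains(NAME_NODE_ID + i)) {
        return NAME_NODE_ID + i;
      }
    }
    // The old code returned "" here and made every caller check for it;
    // an exhausted name pool is now treated as an inconsistent cluster.
    throw new RuntimeException("Trying to launch more namenodes, but they are all already running.");
  }

  public static void main(String[] args) {
    HashMap<String, String> persisted = new HashMap<>();
    System.out.println(nextNameNodeName(persisted)); // namenode1
    persisted.put("task.namenode.1428700000", "namenode1");
    System.out.println(nextNameNodeName(persisted)); // namenode2
  }
}
```

Because the names now come from persistentState rather than LiveState, a restarted or failed-over scheduler reconstructs the same numbering instead of starting from scratch.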
@@ -416,7 +422,7 @@ private boolean tryToLaunchJournalNode(SchedulerDriver driver, Offer offer) {
     log.info(deadJournalNodes);

     if (deadJournalNodes.isEmpty()) {
-      if (liveState.getJournalNodeSize() == conf.getJournalNodeCount()) {
+      if (persistentState.getJournalNodes().size() == conf.getJournalNodeCount()) {
         log.info(String.format("Already running %s journalnodes", conf.getJournalNodeCount()));
       } else if (persistentState.journalNodeRunningOnSlave(offer.getHostname())) {
         log.info(String.format("Already running journalnode on %s", offer.getHostname()));
@@ -452,7 +458,7 @@ private boolean tryToLaunchNameNode(SchedulerDriver driver, Offer offer) {
     List<String> deadNameNodes = persistentState.getDeadNameNodes();

     if (deadNameNodes.isEmpty()) {
-      if (liveState.getNameNodeSize() == HDFSConstants.TOTAL_NAME_NODES) {
+      if (persistentState.getNameNodes().size() == HDFSConstants.TOTAL_NAME_NODES) {
         log.info(String.format("Already running %s namenodes", HDFSConstants.TOTAL_NAME_NODES));
       } else if (persistentState.nameNodeRunningOnSlave(offer.getHostname())) {
         log.info(String.format("Already running namenode on %s", offer.getHostname()));
@@ -245,12 +245,16 @@ protected void runCommand(ExecutorDriver driver, Task task, String command) {
         log.info("Finished running command, exited with status " + exitCode);
       } else {
         log.error("Unable to run command: " + command);
-        task.process.destroy();
+        if (task.process != null) {
+          task.process.destroy();
+        }
         sendTaskFailed(driver, task);
       }
     } catch (InterruptedException | IOException e) {
       log.error(e);
-      task.process.destroy();
+      if (task.process != null) {
+        task.process.destroy();
+      }
       sendTaskFailed(driver, task);
     }
   }
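
The same null guard now appears in both the error branch and the catch block — task.process can legitimately be null when the process was never successfully started. A small helper along these lines, hypothetical and not part of this commit, would keep the two cleanup paths from drifting apart:

```java
// Hypothetical helper (not in this commit): one place for the null-safe cleanup
// used by both the error branch and the catch block of runCommand.
final class ProcessUtil {
  private ProcessUtil() {}

  /** Destroys the process if it was ever started; no-op when it is null. */
  static void destroyQuietly(Process process) {
    if (process != null) {
      process.destroy();
    }
  }
}
```

With that, both call sites would reduce to ProcessUtil.destroyQuietly(task.process) followed by sendTaskFailed(driver, task).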
15 changes: 1 addition & 14 deletions src/main/java/org/apache/mesos/hdfs/state/LiveState.java
@@ -23,8 +23,6 @@ public class LiveState {
   private AcquisitionPhase currentAcquisitionPhase = AcquisitionPhase.RECONCILING_TASKS;
   // TODO (nicgrayson) Might need to split this out to jns, nns, and dns if dns too big
   private LinkedHashMap<String, Protos.TaskStatus> runningTasks = new LinkedHashMap<>();
-  private HashMap<String, String> journalNodeNames = new HashMap<>();
-  private HashMap<String, String> nameNodeNames = new HashMap<>();
   private HashMap<Protos.TaskStatus, Boolean> nameNode1TaskMap = new HashMap<>();
   private HashMap<Protos.TaskStatus, Boolean> nameNode2TaskMap = new HashMap<>();

@@ -36,17 +34,8 @@ public boolean isNameNode2Initialized() {
     return !nameNode2TaskMap.isEmpty() && nameNode2TaskMap.values().iterator().next();
   }

-  public HashMap<String, String> getJournalNodeNames() { return journalNodeNames; }
-
-  public HashMap<String, String> getNameNodeNames() { return nameNodeNames; }
-
-  public void addStagingTask(Protos.TaskID taskId, String taskName) {
+  public void addStagingTask(Protos.TaskID taskId) {
     stagingTasks.add(taskId);
-    if (taskId.getValue().contains(HDFSConstants.JOURNAL_NODE_ID)) {
-      journalNodeNames.put(taskId.getValue(), taskName);
-    } else if (taskId.getValue().contains(HDFSConstants.NAME_NODE_TASKID)) {
-      nameNodeNames.put(taskId.getValue(), taskName);
-    }
   }

   public int getStagingTasksSize() {
@@ -70,8 +59,6 @@ public void removeRunningTask(Protos.TaskID taskId) {
       nameNode2TaskMap.clear();
     }
     runningTasks.remove(taskId.getValue());
-    journalNodeNames.remove(taskId.getValue());
-    nameNodeNames.remove(taskId.getValue());
   }

   public void updateTaskForStatus(Protos.TaskStatus status) {
43 changes: 42 additions & 1 deletion src/main/java/org/apache/mesos/hdfs/state/PersistentState.java
@@ -37,6 +37,8 @@ public class PersistentState {
   private static final String NAMENODES_KEY = "nameNodes";
   private static final String JOURNALNODES_KEY = "journalNodes";
   private static final String DATANODES_KEY = "dataNodes";
+  private static final String NAMENODE_TASKNAMES_KEY = "nameNodeTaskNames";
+  private static final String JOURNALNODE_TASKNAMES_KEY = "journalNodeTaskNames";
   private ZooKeeperState zkState;
   private SchedulerConf conf;

@@ -162,6 +164,14 @@ public HashMap<String, String> getNameNodes() {
     return getHashMap(NAMENODES_KEY);
   }

+  public HashMap<String, String> getJournalNodeTaskNames() {
+    return getHashMap(JOURNALNODE_TASKNAMES_KEY);
+  }
+
+  public HashMap<String, String> getNameNodeTaskNames() {
+    return getHashMap(NAMENODE_TASKNAMES_KEY);
+  }
+
   public HashMap<String, String> getDataNodes() {
     return getHashMap(DATANODES_KEY);
   }
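
Both new getters reuse the getHashMap(key) plumbing that already backs getNameNodes() and getJournalNodes(). Roughly, that helper pulls one variable per key out of the ZooKeeper-backed Mesos state and deserializes it — a sketch under the assumption of Java-serialized maps, shown as it would sit inside PersistentState (zkState and log are the existing fields):

```java
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.HashMap;

import org.apache.mesos.state.Variable;

// Sketch only: assumes each *_KEY names one ZooKeeper variable holding a
// Java-serialized HashMap.
@SuppressWarnings("unchecked")
private HashMap<String, String> getHashMap(String key) {
  try {
    Variable variable = zkState.fetch(key).get();
    byte[] bytes = variable.value();
    if (bytes.length == 0) {
      return new HashMap<>(); // key never written yet
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return (HashMap<String, String>) in.readObject();
    }
  } catch (Exception e) {
    log.error("Error while getting " + key + " in persistent state", e);
    return new HashMap<>();
  }
}
```

The set(key, map) used by the new setters below would be the mirror image: serialize the map, variable.mutate(bytes), then zkState.store(variable).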
@@ -173,17 +183,23 @@ public Collection<String> getAllTaskIds() {
     return allTasksIds.values();
   }

-  public void addHdfsNode(Protos.TaskID taskId, String hostname, String taskType) {
+  public void addHdfsNode(Protos.TaskID taskId, String hostname, String taskType, String taskName) {
     switch (taskType) {
       case HDFSConstants.NAME_NODE_ID :
         HashMap<String, String> nameNodes = getNameNodes();
         nameNodes.put(hostname, taskId.getValue());
         setNameNodes(nameNodes);
+        HashMap<String, String> nameNodeTaskNames = getNameNodeTaskNames();
+        nameNodeTaskNames.put(taskId.getValue(), taskName);
+        setNameNodeTaskNames(nameNodeTaskNames);
         break;
       case HDFSConstants.JOURNAL_NODE_ID :
         HashMap<String, String> journalNodes = getJournalNodes();
         journalNodes.put(hostname, taskId.getValue());
         setJournalNodes(journalNodes);
+        HashMap<String, String> journalNodeTaskNames = getJournalNodeTaskNames();
+        journalNodeTaskNames.put(taskId.getValue(), taskName);
+        setJournalNodeTaskNames(journalNodeTaskNames);
         break;
       case HDFSConstants.DATA_NODE_ID :
         HashMap<String, String> dataNodes = getDataNodes();
@@ -200,30 +216,39 @@ public void addHdfsNode(Protos.TaskID taskId, String hostname, String taskType)
   // TODO (elingg) optimize this method. Possibly index by task id instead of hostname.
   // Possibly call removeTask(slaveId, taskId) to avoid iterating through all maps.
   public void removeTaskId(String taskId) {
+
     HashMap<String, String> journalNodes = getJournalNodes();
     if (journalNodes.values().contains(taskId)) {
       for (Map.Entry<String, String> entry : journalNodes.entrySet()) {
         if (entry.getValue() != null && entry.getValue().equals(taskId)) {
           journalNodes.put(entry.getKey(), null);
           setJournalNodes(journalNodes);
+          HashMap<String, String> journalNodeTaskNames = getJournalNodeTaskNames();
+          journalNodeTaskNames.remove(taskId);
+          setJournalNodeTaskNames(journalNodeTaskNames);
           Date date = DateUtils.addSeconds(new Date(), conf.getDeadNodeTimeout());
           deadJournalNodeTimeStamp = new Timestamp(date.getTime());
           return;
         }
       }
     }
+
     HashMap<String, String> nameNodes = getNameNodes();
     if (nameNodes.values().contains(taskId)) {
       for (Map.Entry<String, String> entry : nameNodes.entrySet()) {
         if (entry.getValue() != null && entry.getValue().equals(taskId)) {
           nameNodes.put(entry.getKey(), null);
           setNameNodes(nameNodes);
+          HashMap<String, String> nameNodeTaskNames = getNameNodeTaskNames();
+          nameNodeTaskNames.remove(taskId);
+          setNameNodeTaskNames(nameNodeTaskNames);
           Date date = DateUtils.addSeconds(new Date(), conf.getDeadNodeTimeout());
           deadNameNodeTimeStamp = new Timestamp(date.getTime());
           return;
         }
       }
     }
+
     HashMap<String, String> dataNodes = getDataNodes();
     if (dataNodes.values().contains(taskId)) {
       for (Map.Entry<String, String> entry : dataNodes.entrySet()) {
@@ -266,6 +291,22 @@ private void setJournalNodes(HashMap<String, String> journalNodes) {
     }
   }

+  private void setNameNodeTaskNames(HashMap<String, String> nameNodeTaskNames) {
+    try {
+      set(NAMENODE_TASKNAMES_KEY, nameNodeTaskNames);
+    } catch (Exception e) {
+      log.error("Error while setting namenode task names in persistent state", e);
+    }
+  }
+
+  private void setJournalNodeTaskNames(HashMap<String, String> journalNodeTaskNames) {
+    try {
+      set(JOURNALNODE_TASKNAMES_KEY, journalNodeTaskNames);
+    } catch (Exception e) {
+      log.error("Error while setting journalnode task names in persistent state", e);
+    }
+  }
+
   private void setDataNodes(HashMap<String, String> dataNodes) {
     try {
       set(DATANODES_KEY, dataNodes);
2 changes: 1 addition & 1 deletion src/test/java/org/apache/mesos/hdfs/TestLiveState.java
@@ -85,7 +85,7 @@ public void getsSecondNamenodeSlaveId() {

   @Test
   public void addsAndRemovesStagingTasks() {
-    liveState.addStagingTask(createTaskInfo("journalnode").getTaskId(), "journalnode1");
+    liveState.addStagingTask(createTaskInfo("journalnode").getTaskId());
     assertEquals(1, liveState.getStagingTasksSize());
     liveState.removeStagingTask(createTaskInfo("journalnode").getTaskId());
     assertEquals(0, liveState.getStagingTasksSize());
8 changes: 6 additions & 2 deletions src/test/java/org/apache/mesos/hdfs/TestScheduler.java
@@ -146,7 +146,7 @@ public void startsAJournalNodeWhenGivenAnOffer() {
   @Test
   public void launchesOnlyNeededNumberOfJournalNodes() {
     when(liveState.getCurrentAcquisitionPhase()).thenReturn(AcquisitionPhase.JOURNAL_NODES);
-    when(liveState.getJournalNodeSize()).thenReturn(3);
+    HashMap<String, String> journalNodes = new HashMap<String, String>();
+    journalNodes.put("host1", "journalnode1");
+    journalNodes.put("host2", "journalnode2");
+    journalNodes.put("host3", "journalnode3");
+    when(persistentState.getJournalNodes()).thenReturn(journalNodes);

     scheduler.resourceOffers(driver, Lists.newArrayList(createTestOffer(0)));

@@ -156,7 +160,7 @@ public void launchesOnlyNeededNumberOfJournalNodes() {
   @Test
   public void launchesNamenodeWhenInNamenode1Phase() {
     when(liveState.getCurrentAcquisitionPhase()).thenReturn(AcquisitionPhase.START_NAME_NODES);
-    when(liveState.getNameNodeNames()).thenReturn(new HashMap<String, String>());
+    when(persistentState.getNameNodeTaskNames()).thenReturn(new HashMap<String, String>());
     when(persistentState.journalNodeRunningOnSlave("host0")).thenReturn(true);
     when(dnsResolver.journalNodesResolvable()).thenReturn(true);
