Skip to content

Commit

Permalink
Example++
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Nov 7, 2024
1 parent e36be94 commit 2b6d033
Show file tree
Hide file tree
Showing 15 changed files with 2,693 additions and 30 deletions.
3 changes: 2 additions & 1 deletion conf/cmake/Examples.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/basic-examples/cl
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/basic-examples/virtualized-cluster-bag-of-tasks EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/basic-examples/batch-bag-of-tasks EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/basic-examples/batch-pilot-job EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/real-workflow-example EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/real-workflow-examples/vms-and-pilot-jobs EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/real-workflow-examples/single-cluster-programmatic-platform EXCLUDE_FROM_ALL)
add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/condor-grid-example EXCLUDE_FROM_ALL)
#add_subdirectory(${CMAKE_HOME_DIRECTORY}/examples/workflow_api/basic-examples/io-pagecache EXCLUDE_FROM_ALL)

Expand Down
9 changes: 4 additions & 5 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,9 @@ to analyze action failures.

#### Workflow-API Examples with real workflows and more sophisticated WMS implementations

- `workflow_api/real-workflow-example`: Two simulators, one in which the workflow is executed
using a batch compute service, and another in which the workflow is executed
using a cloud compute service. These simulators take as input workflow description
files from real-world workflow applications. They use the scheduler abstraction
provided by WRENCH to implement complex Workflow Management System.
- `workflow_api/real-workflow-examples`: Two simulators, one in which the workflow is executed
using a batch compute service (with pilot jobs) and a cloud compute services (with VMs),
and another in which the workflow is executed on a single cluster. These simulators take as input workflow description
files from real-world workflow applications, as provided by the WfCommons project.

---
15 changes: 11 additions & 4 deletions examples/run_all_examples.sh.in
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,18 @@ echo "${bold}RUNNING: ${INSTALL_DIR}/workflow_api/basic-examples/batch-pilot-job
${INSTALL_DIR}/workflow_api/basic-examples/batch-pilot-job/wrench-example-batch-pilot-job ${INSTALL_DIR}/workflow_api/basic-examples/batch-pilot-job/four_hosts_scratch.xml
fi

if [[ "${TO_EXCLUDE}" == *"wrench-example-real-workflow"* ]]; then
echo "${bold}SKIPPING: wrench-example-real-workflow${normal}"
if [[ "${TO_EXCLUDE}" == *"wrench-example-real-workflow-single-cluster"* ]]; then
echo "${bold}SKIPPING: wrench-example-real-workflow-single-cluster${normal}"
else
echo "${bold}RUNNING: ${INSTALL_DIR}/workflow_api/real-workflow-example/wrench-example-real-workflow${normal}"
${INSTALL_DIR}/workflow_api/real-workflow-example/wrench-example-real-workflow ${INSTALL_DIR}/workflow_api/real-workflow-example/cloud_batch_platform.xml ${INSTALL_DIR}/workflow_api/real-workflow-example/1000genome-chameleon-2ch-100k-001.json
echo "${bold}RUNNING: ${INSTALL_DIR}/workflow_api/real-workflow-examples/wrench-example-real-workflow-single-cluster${normal}"
${INSTALL_DIR}/workflow_api/real-workflow-examples/single-cluster-programmatic-platform/wrench-example-real-workflow-single-cluster 10 ${INSTALL_DIR}/workflow_api/real-workflow-examples/single-cluster-programmatic-platform/1000genome-chameleon-2ch-100k-001.json
fi

if [[ "${TO_EXCLUDE}" == *"wrench-example-real-workflow-vms-and-pilots"* ]]; then
echo "${bold}SKIPPING: wrench-example-real-workflow-vms-and-pilots${normal}"
else
echo "${bold}RUNNING: ${INSTALL_DIR}/workflow_api/real-workflow-examples/wrench-example-real-workflow-vms-and-pilots${normal}"
${INSTALL_DIR}/workflow_api/real-workflow-examples/vms-and-pilot-jobs/wrench-example-real-workflow-vms-and-pilots ${INSTALL_DIR}/workflow_api/real-workflow-examples/vms-and-pilot-jobs/cloud_batch_platform.xml ${INSTALL_DIR}/workflow_api/real-workflow-examples/vms-and-pilot-jobs/1000genome-chameleon-2ch-100k-001.json
fi

if [[ "${TO_EXCLUDE}" == *"wrench-example-condor-grid-universe"* ]]; then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,16 @@ namespace sg4 = simgrid::s4u;
class PlatformCreator {

public:
PlatformCreator(double link_bw) : link_bw(link_bw) {}
explicit PlatformCreator(double link_bw) : link_bw(link_bw) {}

void operator()() const {
create_platform(this->link_bw);
create_platform();
}

private:
double link_bw;

void create_platform(double link_bw) const {
void create_platform() const {
// Create the top-level zone
auto zone = sg4::create_full_zone("AS0");
// Create the WMSHost host with its disk
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

set(CMAKE_CXX_STANDARD 17)

# Add source to this project's executable.
add_executable(wrench-example-real-workflow-single-cluster
EXCLUDE_FROM_ALL
SimpleWMS.h
SimpleWMS.cpp
SimpleWorkflowSimulator.cpp)

add_custom_target(
wrench-example-real-workflow-single-cluster-files
COMMAND /bin/sh -c "if [ '${CMAKE_CURRENT_SOURCE_DIR}' != '${CMAKE_CURRENT_BINARY_DIR}' ]; then /bin/cp -f ${CMAKE_CURRENT_SOURCE_DIR}/1000genome-chameleon-2ch-100k-001.json ${CMAKE_CURRENT_BINARY_DIR}/1000genome-chameleon-2ch-100k-001.json ; fi ;"
VERBATIM
)

add_dependencies(examples wrench-example-real-workflow-single-cluster)
add_dependencies(wrench-example-real-workflow-single-cluster wrench-example-real-workflow-single-cluster-files)
add_dependencies(wrench-example-real-workflow-single-cluster wrenchwfcommonsworkflowparser)

if (ENABLE_BATSCHED)
target_link_libraries(wrench-example-real-workflow-single-cluster
wrench
wrenchwfcommonsworkflowparser
${SimGrid_LIBRARY}
${Boost_LIBRARIES}
${ZMQ_LIBRARY}
)
else()
target_link_libraries(wrench-example-real-workflow-single-cluster
wrench
wrenchwfcommonsworkflowparser
${SimGrid_LIBRARY}
${Boost_LIBRARIES}
)
endif()
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Copyright (c) 2017-2021. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

#include <iostream>

#include "SimpleWMS.h"

WRENCH_LOG_CATEGORY(simple_wms, "Log category for Simple WMS");

namespace wrench {

/**
* @brief Constructor that creates a Simple WMS with
* a scheduler implementation, and a list of compute services
*
* @param workflow: a workflow to execute
* @param bare_metal_compute_services: bare-metal compute services available to run jobs
* @param storage_service: a storage service available to store files
* @param hostname: the name of the host on which to start the WMS
*/
SimpleWMS::SimpleWMS(const std::shared_ptr<Workflow> &workflow,
const std::set<std::shared_ptr<wrench::BareMetalComputeService>> &bare_metal_compute_services,
const std::shared_ptr<StorageService> &storage_service,
const std::string &hostname) : ExecutionController(hostname, "simple"),
workflow(workflow),
bare_metal_compute_services(bare_metal_compute_services),
storage_service(storage_service) {}

/**
* @brief main method of the SimpleWMS daemon
*
* @return 0 on completion
*
*/
int SimpleWMS::main() {
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);

WRENCH_INFO("Starting on host %s", S4U_Simulation::getHostName().c_str());
WRENCH_INFO("About to execute a workflow with %lu tasks", this->workflow->getNumberOfTasks());

// Create a job manager
this->job_manager = this->createJobManager();

// Populate data structure to keep track of idle cores at each compute service
for (auto const &cs : this->bare_metal_compute_services) {
this->core_utilization_map[cs] = cs->getTotalNumCores(false);
}


while (true) {
scheduleReadyTasks(workflow->getReadyTasks());

// Wait for a workflow execution event, and process it
try {
this->waitForAndProcessNextEvent();
} catch (ExecutionException &e) {
WRENCH_INFO("Error while getting next execution event (%s)... ignoring and trying again",
(e.getCause()->toString().c_str()));
continue;
}
if (this->workflow->isDone()) {
break;
}
}

S4U_Simulation::sleep(10);

WRENCH_INFO("--------------------------------------------------------");
if (this->workflow->isDone()) {
WRENCH_INFO("Workflow execution is complete!");
} else {
WRENCH_INFO("Workflow execution is incomplete!");
}

WRENCH_INFO("WMS terminating");

return 0;
}

/**
* @brief Process a StandardJobFailedEvent
*
* @param event: a workflow execution event
*/
void SimpleWMS::processEventStandardJobFailure(const std::shared_ptr<StandardJobFailedEvent> &event) {
auto job = event->standard_job;
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_RED);
WRENCH_INFO("Task %s has failed", (*job->getTasks().begin())->getID().c_str());
WRENCH_INFO("failure cause: %s", event->failure_cause->toString().c_str());
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);
}

/**
* @brief Process a StandardJobCompletedEvent
*
* @param event: a workflow execution event
*/
void SimpleWMS::processEventStandardJobCompletion(const std::shared_ptr<StandardJobCompletedEvent> &event) {
auto job = event->standard_job;
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_BLUE);
WRENCH_INFO("Task %s has COMPLETED (on service %s)",
(*job->getTasks().begin())->getID().c_str(),
job->getParentComputeService()->getName().c_str());
TerminalOutput::setThisProcessLoggingColor(TerminalOutput::COLOR_GREEN);
this->core_utilization_map[job->getParentComputeService()]++;
}


/**
* @brief Helper method to schedule a task one available compute services. The naive scheduling
* strategy is to pick the task with the most computational work, and run it on
* the compute services with the fastest cores. In this example, all compute services
* are homogeneous, so we just pick the first available.
*
* @param ready_task: the ready tasks to schedule
* @return
*/
void SimpleWMS::scheduleReadyTasks(std::vector<std::shared_ptr<WorkflowTask>> ready_tasks) {

if (ready_tasks.empty()) {
return;
}

WRENCH_INFO("Trying to schedule %zu ready tasks", ready_tasks.size());
// Sort the tasks
std::sort(ready_tasks.begin(), ready_tasks.end(),
[](const std::shared_ptr<wrench::WorkflowTask> &x,
const std::shared_ptr<wrench::WorkflowTask> &y) {
if (x->getFlops() < y->getFlops()) {
return true;
} else if (x->getFlops() > y->getFlops()) {
return false;
} else {
return (x.get() > y.get());
}
}
);

unsigned long num_tasks_scheduled = 0;
for (auto const &task: ready_tasks) {
bool scheduled = false;
for (auto const &cs: this->bare_metal_compute_services) {
if (this->core_utilization_map[cs] > 0) {
// Specify that ALL files are read/written from the one storage service
std::map<std::shared_ptr<DataFile>, std::shared_ptr<FileLocation>> file_locations;
for (auto const &f: task->getInputFiles()) {
file_locations[f] = wrench::FileLocation::LOCATION(this->storage_service, f);
}
for (auto const &f: task->getOutputFiles()) {
file_locations[f] = wrench::FileLocation::LOCATION(this->storage_service, f);
}
try {
auto job = job_manager->createStandardJob(task, file_locations);
WRENCH_INFO(
"Submitting task %s to compute service %s", task->getID().c_str(),
cs->getName().c_str());
job_manager->submitJob(job, cs);
this->core_utilization_map[cs]--;
num_tasks_scheduled++;
scheduled = true;
} catch (ExecutionException &e) {
WRENCH_INFO("WARNING: Was not able to submit task %s, likely due to the pilot job having expired "
"(I should get a notification of its expiration soon)",
task->getID().c_str());
}
break;
}
}
if (not scheduled) break;
}
WRENCH_INFO("Was able to schedule %lu out of %zu ready tasks", num_tasks_scheduled, ready_tasks.size());
}

}// namespace wrench
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* Copyright (c) 2017-2018. The WRENCH Team.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*/

#ifndef WRENCH_EXAMPLE_SIMPLEWMS_H
#define WRENCH_EXAMPLE_SIMPLEWMS_H

#include "wrench-dev.h"

namespace wrench {

/**
* @brief A simple WMS implementation
*/
class SimpleWMS : public ExecutionController {

public:
SimpleWMS(const std::shared_ptr<Workflow> &workflow,
const std::set<std::shared_ptr<wrench::BareMetalComputeService>> &bare_metal_compute_services,
const std::shared_ptr<StorageService> &storage_service,
const std::string &hostname);

protected:
void processEventStandardJobCompletion(const std::shared_ptr<StandardJobCompletedEvent> &event) override;
void processEventStandardJobFailure(const std::shared_ptr<StandardJobFailedEvent> &event) override;

private:
int main() override;

void scheduleReadyTasks(std::vector<std::shared_ptr<WorkflowTask>> ready_tasks);

std::shared_ptr<Workflow> workflow;
std::set<std::shared_ptr<wrench::BareMetalComputeService>> bare_metal_compute_services;
std::shared_ptr<StorageService> storage_service;
std::shared_ptr<JobManager> job_manager;

std::map<std::shared_ptr<ComputeService>, unsigned long> core_utilization_map;
};
}// namespace wrench

#endif//WRENCH_EXAMPLE_SIMPLEWMS_H
Loading

0 comments on commit 2b6d033

Please sign in to comment.