Skip to content

Commit

Permalink
bugs--
Browse files Browse the repository at this point in the history
  • Loading branch information
henricasanova committed Oct 17, 2024
1 parent e37c081 commit 8c8ec21
Show file tree
Hide file tree
Showing 17 changed files with 56 additions and 62 deletions.
2 changes: 1 addition & 1 deletion include/wrench/services/Service.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ namespace wrench {
const WRENCH_PROPERTY_COLLECTION_TYPE &overriden_property_values);

// MessagePayload stuff
void setMessagePayload(WRENCH_MESSAGEPAYLOAD_TYPE, double);
void setMessagePayload(WRENCH_MESSAGEPAYLOAD_TYPE, sg_size_t);

void setMessagePayloads(const WRENCH_MESSAGE_PAYLOAD_COLLECTION_TYPE &default_messagepayload_values,
const WRENCH_MESSAGE_PAYLOAD_COLLECTION_TYPE &overriden_messagepayload_values);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ namespace wrench {

virtual bool isVMDown(const std::string &vm_name);


// std::vector<std::string> getExecutionHosts();

/***********************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace wrench {

public:
unsigned long getNumCoresAllocated() const;
double getMemoryAllocated() const;
sg_size_t getMemoryAllocated() const;
double getThreadCreationOverhead() const;
std::shared_ptr<Action> getAction();

Expand Down
6 changes: 6 additions & 0 deletions include/wrench/services/storage/StorageService.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,15 @@ namespace wrench {
}
location->getStorageService()->writeFile(location);
}

/**
* @brief Write a file at the storage service (incurs simulated overheads)
* @param file a file
*/
void writeFile(const std::shared_ptr<DataFile> &file) {
this->writeFile(wrench::FileLocation::LOCATION(this->getSharedPtr<StorageService>(), file));
}

/**
* @brief Write a file at the storage service (incurs simulated overheads)
* @param file a file
Expand All @@ -212,6 +214,7 @@ namespace wrench {
virtual void writeFile(const std::shared_ptr<DataFile> &file, const std::string &path) {
this->writeFile(wrench::FileLocation::LOCATION(this->getSharedPtr<StorageService>(), FileLocation::sanitizePath(path), file));
}

/**
* @brief Write a file at the storage service (incurs simulated overheads)
* @param location a location
Expand All @@ -235,6 +238,7 @@ namespace wrench {
}
return location->getStorageService()->hasFile(location);
}

/**
* @brief Determines whether a file is present at the storage service (in zero simulated time)
* @param file: a file
Expand All @@ -243,6 +247,7 @@ namespace wrench {
bool hasFile(const std::shared_ptr<DataFile> &file) {
return this->hasFile(wrench::FileLocation::LOCATION(this->getSharedPtr<StorageService>(), file));
}

/**
* @brief Determines whether a file is present at the storage service (in zero simulated time)
* @param file: a file
Expand All @@ -252,6 +257,7 @@ namespace wrench {
virtual bool hasFile(const std::shared_ptr<DataFile> &file, const std::string &path) {
return this->hasFile(wrench::FileLocation::LOCATION(this->getSharedPtr<StorageService>(), FileLocation::sanitizePath(path), file));
}

/**
* @brief Determines whether a file is present at the storage service (in zero simulated time)
* @param location: a location
Expand Down
2 changes: 1 addition & 1 deletion include/wrench/services/storage/StorageServiceMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ namespace wrench {
*
* @param location: the file location
**/
explicit StorageServiceAckMessage(std::shared_ptr<FileLocation> location) : StorageServiceMessage(0), location(std::move(location)) {}
explicit StorageServiceAckMessage(std::shared_ptr<FileLocation> location) : StorageServiceMessage(S4U_CommPort::default_control_message_size), location(std::move(location)) {}

/** @brief The location */
std::shared_ptr<FileLocation> location;
Expand Down
2 changes: 1 addition & 1 deletion include/wrench/simulation/Simulation.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ namespace wrench {
void init(int *, char **);

static bool isInitialized();
bool isRunning() const;
[[nodiscard]] bool isRunning() const;


void instantiatePlatform(const std::string &);
Expand Down
11 changes: 2 additions & 9 deletions src/wrench/services/Service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,10 @@ namespace wrench {

/**
* @brief Set a message payload of the Service
* @param messagepayload: the message payload (which must a a string representation of a >=0 double)
* @param messagepayload: the message payload (which must be a string representation of a >=0 sg_size_t)
* @param value: the message payload value
*/
void Service::setMessagePayload(WRENCH_MESSAGEPAYLOAD_TYPE messagepayload, double value) {
// Check that the value is a >=0 double

if (value < 0) {
throw std::invalid_argument(
"Service::setMessagePayload(): Invalid message payload value " + ServiceMessagePayload::translatePayloadType(messagepayload) + ": " +
std::to_string(value));
}
void Service::setMessagePayload(WRENCH_MESSAGEPAYLOAD_TYPE messagepayload, sg_size_t value) {
this->messagepayload_list[messagepayload] = value;
}

Expand Down
26 changes: 13 additions & 13 deletions src/wrench/services/compute/cloud/CloudComputeService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,9 +703,9 @@ namespace wrench {
std::sort(possible_hosts.begin(), possible_hosts.end(),
[](std::string const &a, std::string const &b) {
unsigned long a_num_cores = Simulation::getHostNumCores(a);
double a_ram = Simulation::getHostMemoryCapacity(a);
sg_size_t a_ram = Simulation::getHostMemoryCapacity(a);
unsigned long b_num_cores = Simulation::getHostNumCores(b);
double b_ram = Simulation::getHostMemoryCapacity(b);
sg_size_t b_ram = Simulation::getHostMemoryCapacity(b);

if (a_ram != b_ram) {
return a_ram < b_ram;
Expand All @@ -721,9 +721,9 @@ namespace wrench {
std::sort(possible_hosts.begin(), possible_hosts.end(),
[](std::string const &a, std::string const &b) {
unsigned long a_num_cores = Simulation::getHostNumCores(a);
double a_ram = Simulation::getHostMemoryCapacity(a);
sg_size_t a_ram = Simulation::getHostMemoryCapacity(a);
unsigned long b_num_cores = Simulation::getHostNumCores(b);
double b_ram = Simulation::getHostMemoryCapacity(b);
sg_size_t b_ram = Simulation::getHostMemoryCapacity(b);

if (a_num_cores != b_num_cores) {
return a_num_cores < b_num_cores;
Expand Down Expand Up @@ -991,13 +991,13 @@ namespace wrench {

} else if (key == "ram_capacities") {
for (auto &host: this->execution_hosts) {
double total_ram = Simulation::getHostMemoryCapacity(host);
dict.insert(std::make_pair(host, total_ram));
sg_size_t total_ram = Simulation::getHostMemoryCapacity(host);
dict.insert(std::make_pair(host, (double)total_ram));
}

} else if (key == "ram_availabilities") {
for (auto &host: this->execution_hosts) {
double total_ram = Simulation::getHostMemoryCapacity(host);
sg_size_t total_ram = Simulation::getHostMemoryCapacity(host);
// Available RAM
sg_size_t ram_allocated_to_vms = 0;
for (auto &vm_pair: this->vm_list) {
Expand All @@ -1007,7 +1007,7 @@ namespace wrench {
ram_allocated_to_vms += actual_vm->getMemory();
}
}
dict.insert(std::make_pair(host, total_ram - ram_allocated_to_vms));
dict.insert(std::make_pair(host, (double)total_ram - (double)ram_allocated_to_vms));
}

} else {
Expand Down Expand Up @@ -1112,11 +1112,11 @@ namespace wrench {
return;
}
auto vm = std::get<0>(this->vm_list[vm_name]);
unsigned long used_cores = vm->getNumCores();
double used_ram = vm->getMemory();
std::string pm_name = vm->getPhysicalHostname();
// WRENCH_INFO("GOT A DEATH NOTIFICATION: %s %ld %lf (exit_code = %d)",
// pm_name.c_str(), used_cores, used_ram, exit_code);
// unsigned long used_cores = vm->getNumCores();
// sg_size_t used_ram = vm->getMemory();
// std::string pm_name = vm->getPhysicalHostname();
// WRENCH_INFO("GOT A DEATH NOTIFICATION: %s %ld %lf (exit_code = %d)",
// pm_name.c_str(), used_cores, used_ram, exit_code);

if (vm->getState() != S4U_VirtualMachine::State::DOWN) {
vm->shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ namespace wrench {
std::to_string(requested_cores) + " are requested");
}

double requested_ram = std::get<1>(host.second);
double available_ram = S4U_Simulation::getHostMemoryCapacity(host.first);
sg_size_t requested_ram = std::get<1>(host.second);
sg_size_t available_ram = S4U_Simulation::getHostMemoryCapacity(host.first);
if (requested_ram < 0) {
throw std::invalid_argument(
"ActionExecutionService::ActionExecutionService(): requested RAM should be non-negative");
Expand Down Expand Up @@ -334,7 +334,7 @@ namespace wrench {
// Compute possible hosts
std::set<simgrid::s4u::Host *> possible_hosts;
simgrid::s4u::Host *new_host_to_avoid = nullptr;
double new_host_to_avoid_ram_capacity = 0;
sg_size_t new_host_to_avoid_ram_capacity = 0;
for (auto const &r: this->compute_resources) {

// If there is a required host, then don't even look at others
Expand Down Expand Up @@ -442,7 +442,7 @@ namespace wrench {
std::string picked_host;
simgrid::s4u::Host *target_host = nullptr;
unsigned long target_num_cores;
double required_ram;
sg_size_t required_ram;

std::tuple<simgrid::s4u::Host *, unsigned long> allocation =
pickAllocation(action,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ namespace wrench {
* @brief Return the action executor's allocated RAM
* @return a number of bytes
*/
double ActionExecutor::getMemoryAllocated() const {
sg_size_t ActionExecutor::getMemoryAllocated() const {
return this->ram_footprint;
}

Expand Down
4 changes: 2 additions & 2 deletions src/wrench/services/storage/StorageService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ namespace wrench {
auto file = location->getFile();
for (auto const &dwmb: msg->data_write_commport_and_bytes) {
// Bufferized
double remaining = dwmb.second;
while (remaining - buffer_size > DBL_EPSILON) {
sg_size_t remaining = dwmb.second;
while (remaining > buffer_size) {

dwmb.first->putMessage(
new StorageServiceFileContentChunkMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,9 @@ namespace wrench {
// Stripping case
if (file_size > this->max_chunk_size && this->internal_stripping) {
WRENCH_INFO("CSS::processStorageSelectionMessage(): Stripping file before sending to allocator");
double remaining = file_size;
sg_size_t remaining = file_size;
auto part_id = 0;
while (remaining - this->max_chunk_size > DBL_EPSILON) {
while (remaining > this->max_chunk_size) {
std::string part_file_name = file_name + "_stripe_" + std::to_string(part_id);
std::shared_ptr<DataFile> part_file = wrench::Simulation::getFileByIDOrNull(part_file_name);
if (not part_file) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <wrench/exceptions/ExecutionException.h>
#include <wrench/simulation/SimulationTimestampTypes.h>
#include <wrench/services/storage/storage_helpers/FileLocation.h>
#include <wrench/failure_causes/NetworkError.h>
//#include <wrench/services/memory/MemoryManager.h>

namespace sgfs = simgrid::fsmod;
Expand Down Expand Up @@ -495,10 +496,9 @@ namespace wrench {

// Send back the relevant ack if this was a read
if (ftt->dst_location == nullptr) {
// WRENCH_INFO("Sending back an ack for a successful file read");
ftt->answer_commport_if_read->dputMessage(new StorageServiceAckMessage(ftt->src_location));
ftt->answer_commport_if_read->dputMessage(new StorageServiceAckMessage(ftt->src_location));
} else if (ftt->src_location == nullptr) {
WRENCH_INFO("File %s stored", ftt->dst_location->getFile()->getID().c_str());

ftt->answer_commport_if_write->dputMessage(new StorageServiceAckMessage(ftt->dst_location));
} else {
if (ftt->dst_location->getStorageService() == shared_from_this()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ namespace wrench {
S4U_CommPort *commport,
const std::shared_ptr<FileLocation> &location) {
/** Ideal Fluid model buffer size */
if (this->buffer_size < DBL_EPSILON) {
if (this->buffer_size == 0) {
throw std::runtime_error(
"FileTransferThread::receiveFileFromNetwork(): Zero buffer size not implemented yet");

Expand Down Expand Up @@ -399,7 +399,7 @@ namespace wrench {
sg_size_t num_bytes,
S4U_CommPort *commport) {
/** Ideal Fluid model buffer size */
if (this->buffer_size < DBL_EPSILON) {
if (this->buffer_size == 0) {
throw std::runtime_error(
"FileTransferThread::sendLocalFileToNetwork(): Zero buffer size not implemented yet");

Expand All @@ -408,16 +408,17 @@ namespace wrench {
/** Non-zero buffer size */
std::shared_ptr<S4U_PendingCommunication> req = nullptr;
// Sending a zero-byte f is really sending a 1-byte f
double remaining = std::max<double>(1, num_bytes);
sg_size_t remaining = std::max<sg_size_t>(1, num_bytes);


#ifdef PAGE_CACHE_SIMULATION
if (Simulation::isPageCachingEnabled()) {
simulation->getMemoryManagerByHost(location->getStorageService()->hostname)->log();
}
#endif

while (remaining > DBL_EPSILON) {
double chunk_size = std::min<double>(this->buffer_size, remaining);
while (remaining > 0) {
sg_size_t chunk_size = std::min<sg_size_t>(this->buffer_size, remaining);

#ifdef PAGE_CACHE_SIMULATION
if (Simulation::isPageCachingEnabled()) {
Expand All @@ -437,7 +438,7 @@ namespace wrench {
}
#endif

remaining -= this->buffer_size;
remaining -= chunk_size;
if (req) {
req->wait();
// WRENCH_INFO("Bytes sent over the network were received");
Expand Down Expand Up @@ -489,9 +490,9 @@ namespace wrench {
}

// Read the first chunk
double remaining = f->getSize();
while (remaining > DBL_EPSILON) {
double to_read = std::min<double>(remaining, this->buffer_size);
sg_size_t remaining = f->getSize();
while (remaining > 0) {
sg_size_t to_read = std::min<sg_size_t>(remaining, this->buffer_size);
int unique_disk_sequence_number_read = this->simulation_->getOutput().addTimestampDiskReadStart(Simulation::getCurrentSimulatedDate(), hostname, src_opened_file->get_path(), to_read);
src_opened_file->read(to_read);
this->simulation_->getOutput().addTimestampDiskReadCompletion(Simulation::getCurrentSimulatedDate(), hostname, src_opened_file->get_path(), to_read, unique_disk_sequence_number_read);
Expand Down Expand Up @@ -522,8 +523,8 @@ namespace wrench {
f->getID().c_str(), src_loc->toString().c_str());

// Check that the buffer size is compatible
if (((this->buffer_size < DBL_EPSILON) && (src_loc->getStorageService()->getBufferSize() > DBL_EPSILON)) or
((this->buffer_size > DBL_EPSILON) && (src_loc->getStorageService()->getBufferSize() < DBL_EPSILON))) {
if (((this->buffer_size == 0) && (src_loc->getStorageService()->getBufferSize() > 0)) or
((this->buffer_size > 0) && (src_loc->getStorageService()->getBufferSize() == 0))) {
throw std::invalid_argument("FileTransferThread::downloadFileFromStorageService(): "
"Incompatible buffer size specs (both must be zero, or both must be non-zero");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ class StorageServiceLinkFailuresTest : public ::testing::Test {
}

xml +=
" <link id=\"link1\" bandwidth=\"1Bps\" latency=\"0us\"/>"
" <link id=\"link2\" bandwidth=\"1Bps\" latency=\"0us\"/>"
" <link id=\"link1\" bandwidth=\"100MBps\" latency=\"0us\"/>"
" <link id=\"link2\" bandwidth=\"100MBps\" latency=\"0us\"/>"
" <route src=\"Host1\" dst=\"Host2\"> <link_ctn id=\"link1\""
" /> </route>"
" <route src=\"Host1\" dst=\"Host3\"> <link_ctn id=\"link1\""
Expand All @@ -101,7 +101,7 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {

public:
StorageServiceLinkFailuresTestWMS(StorageServiceLinkFailuresTest *test,
std::string hostname) : wrench::ExecutionController(hostname, "test") {
const std::string& hostname) : wrench::ExecutionController(hostname, "test") {
this->test = test;
this->rng.seed(666);
}
Expand Down Expand Up @@ -259,7 +259,6 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {
for (unsigned long i = 0; i < NUM_TRIALS; i++) {

// Do a random synchronous file copy
std::cerr << "DO A RANDOM SYNCHRONOUS FILE COPY\n";
try {
this->doRandomSynchronousFileCopy();
} catch (wrench::ExecutionException &e) {
Expand All @@ -271,7 +270,6 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {
wrench::Simulation::sleep(5);

// Do a random asynchronous copy
std::cerr << "DO A RANDOM ASYNCHRONOUS FILE COPY\n";
try {
this->doRandomAsynchronousFileCopy();
} catch (wrench::ExecutionException &e) {
Expand All @@ -282,7 +280,6 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {

wrench::Simulation::sleep(5);

std::cerr << "DO A RANDOM FILE DELETE\n";
// Do a random delete
try {
this->doRandomFileDelete();
Expand All @@ -295,7 +292,6 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {
wrench::Simulation::sleep(5);

// Do a random file read
std::cerr << "DO A RANDOM FILE READ\n";
try {
this->doRandomFileRead();
} catch (wrench::ExecutionException &e) {
Expand All @@ -307,7 +303,6 @@ class StorageServiceLinkFailuresTestWMS : public wrench::ExecutionController {
wrench::Simulation::sleep(5);

// Do a random file write
std::cerr << "DO A RANDOM FILE WRITE\n";
try {
this->doRandomFileWrite();
} catch (wrench::ExecutionException &e) {
Expand Down
Loading

0 comments on commit 8c8ec21

Please sign in to comment.