Skip to content

Commit

Permalink
Лаганина Елена. Задача 3. Вариант 22. Поиск кратчайших путей из одной…
Browse files Browse the repository at this point in the history
… вершины (Алгоритм Дейкстры). С CRS графов. (#811)

Seq версия: 

Алгоритм инициализирует массив расстояний, присваивая начальной вершине
расстояние ноль, а всем остальным вершинам — бесконечность. Также
создается массив для отслеживания посещенных вершин, изначально все
вершины помечены как непосещенные. На каждом шаге алгоритм выбирает
непосещенную вершину с наименьшим текущим расстоянием. Выбранная вершина
отмечается как посещенная. Затем для каждой соседней вершины выбранной
вершины пересчитывается расстояние от начальной вершины. Если новое
вычисленное расстояние оказывается меньше, чем текущее расстояние до
соседней вершины, то текущее расстояние обновляется. Данная процедура
повторяется до тех пор, пока все вершины не будут посещены. По
завершении алгоритма, массив расстояний содержит кратчайшие пути от
начальной вершины до всех остальных вершин графа.

MPI версия:

В представленной параллельной реализации алгоритма Дейкстры для
повышения вычислительной эффективности используется распределенная
обработка графа между несколькими процессами. Суть подхода заключается в
том, что на начальном этапе определяются все непосредственные потомки
стартовой вершины, которые затем распределяются между вычислительными
процессами. Каждый процесс, получив потомка, вычисляет кратчайшие
расстояния от этого потомка до всех остальных вершин графа параллельно.
При этом используется локальный массив расстояний, и глубина обхода
графа в каждом процессе будет меньше по сравнению с последовательным
алгоритмом. За счет параллельной обработки, вычислительная нагрузка на
каждом процессе снижается пропорционально числу задействованных
процессов. После завершения вычислений на каждом процессе, результаты
собираются на управляющем процессе. На управляющем процессе также
вычисляются расстояния от стартовой вершины до всех ее потомков и
добавляются к полученным расстояниям. Далее происходит сбор минимальных
расстояний, с использованием операции reduce, для получения
окончательных значений. Полученные минимальные расстояния сравниваются с
локальными данными на управляющем процессе и обновляются, таким образом,
формируется финальный массив кратчайших расстояний от стартовой вершины
до всех вершин графа. Основным преимуществом данного алгоритма является
распараллеливание вычислений, что обеспечивает сокращение общего времени
выполнения.
  • Loading branch information
laganina-cod authored Dec 29, 2024
1 parent 23b8ad6 commit 108a937
Show file tree
Hide file tree
Showing 8 changed files with 1,364 additions and 0 deletions.
472 changes: 472 additions & 0 deletions tasks/mpi/laganina_e_dejkstras_a/func_tests/main.cpp

Large diffs are not rendered by default.

112 changes: 112 additions & 0 deletions tasks/mpi/laganina_e_dejkstras_a/include/ops_mpi.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#pragma once

#include <gtest/gtest.h>

#include <boost/mpi/collectives.hpp>
#include <boost/mpi/communicator.hpp>
#include <boost/mpi/environment.hpp>
#include <queue>
#include <random>
#include <vector>

#include "core/task/include/task.hpp"

namespace laganina_e_dejskras_a_mpi {

std::vector<int> getRandomgraph(int v);
int minDistanceVertex(const std::vector<int>& dist, const std::vector<int>& marker);

class TestMPITaskSequential : public ppc::core::Task {
public:
explicit TestMPITaskSequential(std::shared_ptr<ppc::core::TaskData> taskData_) : Task(std::move(taskData_)) {}
bool pre_processing() override;
bool validation() override;
bool run() override;
bool post_processing() override;
// static void dijkstra(int start_vertex, const std::vector<int>& row_ptr, const std::vector<int>& col_ind,
// const std::vector<int>& data, int v, std::vector<int>& distances);
static void get_children_with_weights(int vertex, const std::vector<int>& row_ptr, const std::vector<int>& col_ind,
const std::vector<int>& data, std::vector<int>& neighbor,
std::vector<int>& weight) {
// Get the beginning and end of edges for a given vertex
int start = row_ptr[vertex];
int end = row_ptr[vertex + 1];

for (int i = start; i < end; ++i) {
neighbor.push_back(col_ind[i]); // Neighboring vertex
weight.push_back(data[i]); // Edge weight
}
}
static void dijkstra(int start_vertex, const std::vector<int>& row_ptr, const std::vector<int>& col_ind,
const std::vector<int>& data, int v, std::vector<int>& distances) {
// Initialize distances
distances.resize(v, std::numeric_limits<int>::max());
distances[start_vertex] = 0;

// Array to track visited vertices
std::vector<bool> visited(v, false);

// Priority queue for storing pairs (distance, vertex)
std::priority_queue<std::pair<int, int>, std::vector<std::pair<int, int>>, std::greater<>> priority_queue;
priority_queue.emplace(0, start_vertex); // Use start_vertex instead of 0

while (!priority_queue.empty()) {
int current_distance = priority_queue.top().first;
int current_vertex = priority_queue.top().second;
priority_queue.pop();

// If the vertex has already been visited, skip it
if (visited[current_vertex]) {
continue;
}

// Mark the vertex as visited
visited[current_vertex] = true;

// Process all neighboring vertices
int start = row_ptr[current_vertex];
int end = row_ptr[current_vertex + 1];
for (int i = start; i < end; ++i) {
int neighbor_vertex = col_ind[i];
int weight = data[i];
int new_distance = current_distance + weight;

// If a shorter distance is found, update it
if (new_distance < distances[neighbor_vertex]) {
distances[neighbor_vertex] = new_distance;
priority_queue.emplace(new_distance, neighbor_vertex);
}
}
}
}

// static void get_children_with_weights(int vertex, const std::vector<int>& row_ptr, const std::vector<int>& col_ind,
// const std::vector<int>& data, std::vector<int>& neighbor,
// std::vector<int>& weight);

private:
std::vector<int> row_ptr;
std::vector<int> col_ind;
std::vector<int> data;
int v{}; // dimension
std::vector<int> distances;
};
class TestMPITaskParallel : public ppc::core::Task {
public:
explicit TestMPITaskParallel(std::shared_ptr<ppc::core::TaskData> taskData_) : Task(std::move(taskData_)) {}
bool pre_processing() override;
bool validation() override;
bool run() override;
bool post_processing() override;

private:
std::vector<int> row_ptr;
std::vector<int> col_ind;
std::vector<int> data;
int v{}; // dimension
std::vector<int> distances;

boost::mpi::communicator world;
};

} // namespace laganina_e_dejskras_a_mpi
117 changes: 117 additions & 0 deletions tasks/mpi/laganina_e_dejkstras_a/perf_tests/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
#include <gtest/gtest.h>

#include "core/perf/include/perf.hpp"
#include "mpi/laganina_e_dejkstras_a/include/ops_mpi.hpp"

TEST(laganina_e_dejkstras_a_mpi, test_pipeline_run) {
boost::mpi::communicator world;
int v_ = 1000;
// Create data
std::vector<int> graph(v_ * v_, 0);
for (int i = 0; i < v_ - 1; i++) {
for (int j = 1; j < v_; j++) {
graph[i * v_ + j] = 1;
}
}
for (int k = 0; k < v_ * v_; k += (v_ + 1)) {
graph[k] = 0;
}
std::vector<int> expectResult(v_, 0);
for (int i = 1; i < v_; i++) {
expectResult[i] = 1;
}
std::vector<int> trueResult(v_, 0);

// Create TaskData
std::shared_ptr<ppc::core::TaskData> taskDataPar = std::make_shared<ppc::core::TaskData>();
if (world.rank() == 0) {
taskDataPar->inputs_count.emplace_back(v_);
taskDataPar->inputs.emplace_back(reinterpret_cast<uint8_t*>(graph.data()));
taskDataPar->outputs.emplace_back(reinterpret_cast<uint8_t*>(trueResult.data()));
taskDataPar->outputs_count.emplace_back(trueResult.size());
}
// Create Task
auto testTaskParallel = std::make_shared<laganina_e_dejskras_a_mpi::TestMPITaskParallel>(taskDataPar);

ASSERT_TRUE(testTaskParallel->validation());
testTaskParallel->pre_processing();
testTaskParallel->run();
testTaskParallel->post_processing();

// Create Perf attributes
auto perfAttr = std::make_shared<ppc::core::PerfAttr>();
perfAttr->num_running = 10;
const auto t0 = std::chrono::high_resolution_clock::now();
perfAttr->current_timer = [&] {
auto current_time_point = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(current_time_point - t0).count();
return static_cast<double>(duration) * 1e-9;
};

// Create and init perf results
auto perfResults = std::make_shared<ppc::core::PerfResults>();

// Create Perf analyzer
auto perfAnalyzer = std::make_shared<ppc::core::Perf>(testTaskParallel);
perfAnalyzer->pipeline_run(perfAttr, perfResults);
ppc::core::Perf::print_perf_statistic(perfResults);
if (world.rank() == 0) {
ASSERT_EQ(expectResult, trueResult);
}
}

TEST(laganina_e_dejkstras_a_mpi, test_task_run) {
boost::mpi::communicator world;
int v_ = 1000;
// Create data
std::vector<int> graph(v_ * v_, 0);
for (int i = 0; i < v_ - 1; i++) {
for (int j = 1; j < v_; j++) {
graph[i * v_ + j] = 1;
}
}
for (int k = 0; k < v_ * v_; k += (v_ + 1)) {
graph[k] = 0;
}
std::vector<int> expectResult(v_, 0);
for (int i = 1; i < v_; i++) {
expectResult[i] = 1;
}
std::vector<int> trueResult(v_, 0);

// Create TaskData
std::shared_ptr<ppc::core::TaskData> taskDataPar = std::make_shared<ppc::core::TaskData>();
if (world.rank() == 0) {
taskDataPar->inputs_count.emplace_back(v_);
taskDataPar->inputs.emplace_back(reinterpret_cast<uint8_t*>(graph.data()));
taskDataPar->outputs.emplace_back(reinterpret_cast<uint8_t*>(trueResult.data()));
taskDataPar->outputs_count.emplace_back(trueResult.size());
}
// Create Task
auto testTaskParallel = std::make_shared<laganina_e_dejskras_a_mpi::TestMPITaskParallel>(taskDataPar);

ASSERT_TRUE(testTaskParallel->validation());
testTaskParallel->pre_processing();
testTaskParallel->run();
testTaskParallel->post_processing();

// Create Perf attributes
auto perfAttr = std::make_shared<ppc::core::PerfAttr>();
perfAttr->num_running = 10;
const auto t0 = std::chrono::high_resolution_clock::now();
perfAttr->current_timer = [&] {
auto current_time_point = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(current_time_point - t0).count();
return static_cast<double>(duration) * 1e-9;
};
// Create and init perf results
auto perfResults = std::make_shared<ppc::core::PerfResults>();

// Create Perf analyzer
auto perfAnalyzer = std::make_shared<ppc::core::Perf>(testTaskParallel);
perfAnalyzer->task_run(perfAttr, perfResults);
ppc::core::Perf::print_perf_statistic(perfResults);
if (world.rank() == 0) {
ASSERT_EQ(expectResult, trueResult);
}
}
Loading

0 comments on commit 108a937

Please sign in to comment.