Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Movielens Liliana Vera #89

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open

Conversation

Irais11
Copy link

@Irais11 Irais11 commented Nov 8, 2024

Ejercicio Movieles
Equipo Mudos

Se agrega los archivos de go y julia

Itroducción

@Irais11
Copy link
Author

Irais11 commented Nov 8, 2024

Archivo de introducción
20241031 DOCUMENTO DE REQUERIMIENTO (propuesta LIVR).docx

Código en Go

package main

import (
"encoding/csv"
"fmt"
"log"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/kfultz07/go-dataframe"

)

// Leer y particionar archivo
// Function to split a csv file into small files.
// You provided a name for the file, the number of chuncks wich it will be divided and the directory
// where the file is located and the new ones will be saved.
func SplitBigFile(file_name string, number_of_chunks int, directory string) []string {
t1 := time.Now()
data := ReadCsv(file_name, directory)

//Extrae el encabezado para cada CSV
header := data[0]
//Quita el encabezado antes de dividir
data = data[1:]

fmt.Printf("%v rows in file %s\n", len(data), file_name)
rowsPerFile := len(data) / number_of_chunks
var filesCreated []string

for i := 0; i < number_of_chunks; i++ {
	tempName := file_name + "_" + fmt.Sprintf("%02d", i+1)
	fmt.Printf("%s\n", tempName)
	path := directory
	fmt.Printf("%s\n", path)
	tempData := append([][]string{header}, data[i*rowsPerFile:(i+1)*rowsPerFile]...)
	WriteCsv(tempData, tempName, path)
	filesCreated = append(filesCreated, tempName)
}
tf := time.Since(t1).Seconds()
println("Executed in:", tf, "seconds")
return filesCreated

}

// Open and read a csv file and returns the content.
func ReadCsv(fileName string, directory string) [][]string {
file, err := os.Open(directory + fileName + ".csv")

if err != nil {
	log.Fatalf("Error opening file: %s", err)
}
defer file.Close()

csvReader := csv.NewReader(file)

data, err := csvReader.ReadAll()

if err != nil {
	log.Fatalf("Error extracting data from file %v: %s", fileName, err)
}
return data

}

// Create a csv file with the name and data provided in the path defined.
func WriteCsv(data [][]string, name string, path string) {
csvFile, err := os.Create(path + name + ".csv")
if err != nil {
log.Fatalf("Error creating new csv file %v: %s", name, err)
}
defer csvFile.Close()

writer := csv.NewWriter(csvFile)
defer writer.Flush()

err = writer.WriteAll(data)
if err != nil {
	log.Fatalf("Error writing new csv file %v: %s", name, err)
}

fmt.Printf("File %s has been created with %v rows\n", name, len(data))

}

func main() {
files := SplitBigFile("ratings", number_of_workers(), "C:\Users\configurar\Documents\2024\CursoGo\peliculasEQUIPO\")
fmt.Println(files)
Mt_FindRatingsMaster()
}

// Definir el numero de hilos

func number_of_workers() int {
// agregar metodo para leer el numero de workers
//return 10
return runtime.GOMAXPROCS(0)
}

// Definir la funcion que ejecutara el worker

/*func ReadCsvToDataframe(aFileName string) dataframe.DataFrame {
file, _ := os.Open(aFileName)
defer file.Close()

// Leer el archivo CSV y crear el DataFrame
df := dataframe.ReadCSV(file)
return df

}*/

func ReadCsvToDataframe(filePath string) dataframe.DataFrame {
path := "C:\Users\configurar\Documents\2024\CursoGo\peliculasEQUIPO\"
df := dataframe.CreateDataFrame(path, filePath)
return df
}

// w: el numero del worker
// ci: el canal
func Mt_FindRatingsWorker(w int, ci chan int, knowGenders []string, ca *[][]int, va *[][]float64, movies dataframe.DataFrame) {
aFileName := "ratings_" + fmt.Sprintf("%02d", w) + ".csv"
println("Worker ", fmt.Sprintf("%02d", w), " is processing file ", aFileName, "\n")

ratings := ReadCsvToDataframe(aFileName) // modificar para devolver un dataframe
ng := len(knowGenders)
start := time.Now()

// import all records from the movies DF into the ratings DF, keeping genres column from movies
//df.Merge is the equivalent of an inner-join in the DF lib I am using here
ratings.Merge(&movies, "movieId", "genres")

// We only need "genres" and "ratings" to find Count(Ratings | Genres), so keep only those columns
grcs := [2]string{"genres", "rating"} // grcs => Genres Ratings Columns
grDF := ratings.KeepColumns(grcs[:])  // grDF => Genres Ratings DF
for ig := 0; ig < ng; ig++ {
	for _, row := range grDF.FrameRecords {
		if strings.Contains(row.Data[0], knowGenders[ig]) {
			(*ca)[ig][w-1] += 1
			v, _ := strconv.ParseFloat((row.Data[1]), 32) // do not check for error
			(*va)[ig][w-1] += v
		}
	}
}
duration := time.Since(start)
fmt.Println("Duration = ", duration)
fmt.Println("Worker ", w, " completed")

// notify master that this worker has completed its job
ci <- 1

}

// 1.0 Definir la funcion master que ejecutará el programa (1.1 Unir el resultado de todos los workers)

func Mt_FindRatingsMaster() {
fmt.Println("In MtFindRatingsMaster")
start := time.Now()
nf := number_of_workers()

//SplitBigFile("ratings", nf, "C:\\Users\\configurar\\Documents\\2024\\CursoGo\\peliculasEQUIPO\\")

// kg is a 1D array that contains the Known Genres
kg := []string{"Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary",
	"Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical", "Mystery", "Romance",
	"Sci-Fi", "Thriller", "War", "Western", "(no genres listed)"}

ng := len(kg) // number of known genres
// ra is a 2D array where the ratings values for each genre are maintained.
// The columns signal/maintain the core number where a worker is running.
// The rows in that column maintain the rating values for that core and that genre
ra := make([][]float64, ng)
// ca is a 2D array where the count of Ratings for each genre is maintained
// The columns signal the core number where the worker is running
// The rows in that column maintain the counts for that that genre
ca := make([][]int, ng)
// populate the ng rows of ra and ca with nf columns
for i := 0; i < ng; i++ {
	ra[i] = make([]float64, nf)
	ca[i] = make([]int, nf)
}
var ci = make(chan int) // create the channel to sync all workers
movies := ReadCsvToDataframe("movies.csv")
println("Lectura completa del movies.csv\n")
// run FindRatings in 10 workers
for i := 0; i < nf; i++ {
	go Mt_FindRatingsWorker(i+1, ci, kg, &ca, &ra, movies)
}
// wait for the workers
iMsg := 0
go func() {
	for {
		i := <-ci
		iMsg += i
	}
}()
for {
	if iMsg == nf {
		break
	}
}
// all workers completed their work. Collect results and produce report
locCount := make([]int, ng)
locVals := make([]float64, ng)
locPromedio := make([]float64, ng)
for i := 0; i < ng; i++ {
	for j := 0; j < nf; j++ {
		locCount[i] += ca[i][j]
		locVals[i] += ra[i][j]
	}
	locPromedio[i] = locVals[i] / float64(locCount[i])
}
for i := 0; i < ng; i++ {
	fmt.Println(fmt.Sprintf("%2d", i), "  ", fmt.Sprintf("%20s", kg[i]), "  ", fmt.Sprintf("%8d", locCount[i]), " ", fmt.Sprintf("%.2f", locPromedio[i]))
}
duration := time.Since(start)
fmt.Println("Duration = ", duration)
println("Mt_FindRatingsMaster is Done")

}

Código en Julia

`#import Pkg; Pkg.add("CSV")
#import Pkg; Pkg.add("DataFrames")

using DataFrames
using CSV
using Parquet
using Printf

Función para dividir el archivo ratings en 10 partes y guardarlas en formato Parquet

function generate_small_files(ratings_file::String, output_prefix::String, output_dir::String)
println("Dividiendo archivo de ratings en partes...")

# Leer el archivo completo de ratings
data = CSV.read(ratings_file, DataFrame)
total_rows = nrow(data)
num_chunks = 10  # Dividir en 10 partes
chunk_size = ceil(Int, total_rows / num_chunks)

# Crear y guardar cada chunk en archivos separados
for i in 1:num_chunks
    start_row = (i - 1) * chunk_size + 1
    end_row = min(i * chunk_size, total_rows)
    chunk = data[start_row:end_row, :]

    output_path = joinpath(output_dir, "$(output_prefix)_ratings$(lpad(i, 2, '0')).parquet")
    Parquet.write_parquet(output_path, chunk)
    println("Archivo guardado: $output_path con $(nrow(chunk)) filas")
end

end

Función principal para procesar archivos de ratings y cruzarlos con movies

function find_ratings_master(input_dir::String, output_dir::String)
nF = 10 # Número de archivos ratings
prqDir = output_dir

# Lista de géneros de películas
genres = ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary",
          "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical", "Mystery", "Romance",
          "Sci-Fi", "Thriller", "War", "Western", "(no genres listed)"]
ng = length(genres)

# Arrays para acumular los resultados de calificaciones por género
rating_sum = zeros(ng, nF)
count_sum = zeros(Int, ng, nF)

# Leer el archivo movies.csv y mantener solo las columnas necesarias
movies_path = joinpath(input_dir, "movies.csv")
df_movies = CSV.read(movies_path, DataFrame)
df_movies = df_movies[:, [:movieId, :genres]]

# Procesar cada archivo de ratings
for i in 1:nF
    rating_file = joinpath(prqDir, "ratings_ratings$(lpad(i, 2, '0')).parquet")
    println("Procesando archivo: $rating_file")

    if isfile(rating_file)
        df_ratings = DataFrame(read_parquet(rating_file))
        rating_sum[:, i], count_sum[:, i] = process_ratings(ng, genres, df_movies, df_ratings)
    else
        println("Archivo no encontrado: $rating_file")
    end
end

# Sumar resultados finales por género y mostrarlos
for i in 1:ng
    total_rating = sum(rating_sum[i, :])
    total_count = sum(count_sum[i, :])
    promedio = total_rating/total_count 
    @printf("Género: %s   Total calificaciones: %.2f   Total conteo: %d   Promedio: %.2f\n", genres[i], total_rating, total_count,promedio) 
end

end

Función para procesar cada archivo ratings y acumular resultados por género

function process_ratings(ng::Int, genres::Vector{String}, df_movies::DataFrame, df_ratings::DataFrame)
rating_accum = zeros(ng)
count_accum = zeros(Int, ng)

# Hacer un inner join entre movies y ratings
joined_df = innerjoin(df_movies, df_ratings, on=:movieId)

# Calcular sumas y conteos por cada género
for i in 1:ng
    genre_rows = joined_df[occursin.(genres[i], joined_df.genres), :]
    count_accum[i] = nrow(genre_rows)
    rating_accum[i] = sum(genre_rows.rating)
end

return rating_accum, count_accum

end

Rutas de entrada y salida

input_dir = "C:\Users\Alexis\Documents\Practica_Julia"
output_dir = "C:\Users\Alexis\Documents\Practica_Julia"

Dividir el archivo ratings y guardar en formato Parquet

generate_small_files(joinpath(input_dir, "ratings.csv"), "ratings", output_dir)

Procesar los archivos de ratings y cruzarlos con movies

find_ratings_master(input_dir, output_dir)`

@Irais11 Irais11 marked this pull request as draft November 8, 2024 17:58
@Irais11 Irais11 marked this pull request as ready for review November 8, 2024 18:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant