diff --git a/go/src/student_submissions/movielens/Liliana Vera b/go/src/student_submissions/movielens/Liliana Vera
new file mode 100644
index 00000000..8b137891
--- /dev/null
+++ b/go/src/student_submissions/movielens/Liliana Vera
@@ -0,0 +1 @@
+
diff --git a/go/src/student_submissions/movielens/Liliana_vera/movielesns go b/go/src/student_submissions/movielens/Liliana_vera/movielesns go
new file mode 100644
index 00000000..9bb3c5f6
--- /dev/null
+++ b/go/src/student_submissions/movielens/Liliana_vera/movielesns go
@@ -0,0 +1,207 @@
+package main
+
+import (
+    "encoding/csv"
+    "fmt"
+    "log"
+    "os"
+    "runtime"
+    "strconv"
+    "strings"
+    "time"
+
+    "github.com/kfultz07/go-dataframe"
+)
+
+// Read and partition the input file.
+// SplitBigFile splits a csv file into smaller files. It takes the name of the file,
+// the number of chunks it will be divided into, and the directory where the file
+// is located and where the new files will be saved.
+func SplitBigFile(file_name string, number_of_chunks int, directory string) []string {
+    t1 := time.Now()
+    data := ReadCsv(file_name, directory)
+
+    // Extract the header so it can be repeated in every chunk
+    header := data[0]
+    // Remove the header before splitting
+    data = data[1:]
+
+    fmt.Printf("%v rows in file %s\n", len(data), file_name)
+    rowsPerFile := len(data) / number_of_chunks
+    var filesCreated []string
+
+    for i := 0; i < number_of_chunks; i++ {
+        tempName := file_name + "_" + fmt.Sprintf("%02d", i+1)
+        fmt.Printf("%s\n", tempName)
+        path := directory
+        fmt.Printf("%s\n", path)
+        // The last chunk also takes the rows left over by the integer division
+        end := (i + 1) * rowsPerFile
+        if i == number_of_chunks-1 {
+            end = len(data)
+        }
+        tempData := append([][]string{header}, data[i*rowsPerFile:end]...)
+        WriteCsv(tempData, tempName, path)
+        filesCreated = append(filesCreated, tempName)
+    }
+    tf := time.Since(t1).Seconds()
+    fmt.Println("Executed in:", tf, "seconds")
+    return filesCreated
+}
+
+// ReadCsv opens a csv file and returns its content.
+func ReadCsv(fileName string, directory string) [][]string {
+    file, err := os.Open(directory + fileName + ".csv")
+    if err != nil {
+        log.Fatalf("Error opening file: %s", err)
+    }
+    defer file.Close()
+
+    csvReader := csv.NewReader(file)
+    data, err := csvReader.ReadAll()
+    if err != nil {
+        log.Fatalf("Error extracting data from file %v: %s", fileName, err)
+    }
+    return data
+}
+
+// WriteCsv creates a csv file with the given name and data in the given path.
+func WriteCsv(data [][]string, name string, path string) {
+    csvFile, err := os.Create(path + name + ".csv")
+    if err != nil {
+        log.Fatalf("Error creating new csv file %v: %s", name, err)
+    }
+    defer csvFile.Close()
+
+    writer := csv.NewWriter(csvFile)
+    defer writer.Flush()
+
+    err = writer.WriteAll(data)
+    if err != nil {
+        log.Fatalf("Error writing new csv file %v: %s", name, err)
+    }
+
+    fmt.Printf("File %s has been created with %v rows\n", name, len(data))
+}
+
+func main() {
+    files := SplitBigFile("ratings", number_of_workers(), "C:\\Users\\configurar\\Documents\\2024\\CursoGo\\peliculasEQUIPO\\")
+    fmt.Println(files)
+    Mt_FindRatingsMaster()
+}
+
+// Define the number of worker threads.
+func number_of_workers() int {
+    // TODO: add a way to configure the number of workers
+    //return 10
+    return runtime.GOMAXPROCS(0)
+}
+
+// Define the function that each worker will execute.
+
+/*func ReadCsvToDataframe(aFileName string) dataframe.DataFrame {
+    file, _ := os.Open(aFileName)
+    defer file.Close()
+
+    // Read the CSV file and create the DataFrame
+    df := dataframe.ReadCSV(file)
+    return df
+}*/
+
+// ReadCsvToDataframe loads a csv file from the working directory into a DataFrame.
+func ReadCsvToDataframe(filePath string) dataframe.DataFrame {
+    path := "C:\\Users\\configurar\\Documents\\2024\\CursoGo\\peliculasEQUIPO\\"
+    df := dataframe.CreateDataFrame(path, filePath)
+    return df
+}
+
+// w: the worker number
+// ci: the channel used to notify the master
+func Mt_FindRatingsWorker(w int, ci chan int, knowGenders []string, ca *[][]int, va *[][]float64, movies dataframe.DataFrame) {
+    aFileName := "ratings_" + fmt.Sprintf("%02d", w) + ".csv"
+    fmt.Println("Worker", fmt.Sprintf("%02d", w), "is processing file", aFileName)
+
+    ratings := ReadCsvToDataframe(aFileName)
+    ng := len(knowGenders)
+    start := time.Now()
+
+    // Import all records from the movies DF into the ratings DF, keeping the genres column from movies.
+    // df.Merge is the equivalent of an inner join in the DF library used here.
+    ratings.Merge(&movies, "movieId", "genres")
+
+    // We only need "genres" and "rating" to find Count(Ratings | Genres), so keep only those columns
+    grcs := [2]string{"genres", "rating"} // grcs => Genres Ratings Columns
+    grDF := ratings.KeepColumns(grcs[:])  // grDF => Genres Ratings DF
+    for ig := 0; ig < ng; ig++ {
+        for _, row := range grDF.FrameRecords {
+            if strings.Contains(row.Data[0], knowGenders[ig]) {
+                (*ca)[ig][w-1] += 1
+                v, _ := strconv.ParseFloat(row.Data[1], 32) // do not check for error
+                (*va)[ig][w-1] += v
+            }
+        }
+    }
+    duration := time.Since(start)
+    fmt.Println("Duration = ", duration)
+    fmt.Println("Worker ", w, " completed")
+
+    // notify the master that this worker has completed its job
+    ci <- 1
+}
+
+// 1.0 Define the master function that drives the program (1.1 merge the results of all workers).
+func Mt_FindRatingsMaster() {
+    fmt.Println("In MtFindRatingsMaster")
+    start := time.Now()
+    nf := number_of_workers()
+
+    //SplitBigFile("ratings", nf, "C:\\Users\\configurar\\Documents\\2024\\CursoGo\\peliculasEQUIPO\\")
+
+    // kg is a 1D array that contains the Known Genres
+    kg := []string{"Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary",
+        "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical", "Mystery", "Romance",
+        "Sci-Fi", "Thriller", "War", "Western", "(no genres listed)"}
+
+    ng := len(kg) // number of known genres
+    // ra is a 2D array where the rating values for each genre are maintained.
+    // The columns signal the core number where a worker is running.
+    // The rows in that column maintain the rating values for that core and that genre.
+    ra := make([][]float64, ng)
+    // ca is a 2D array where the count of ratings for each genre is maintained.
+    // The columns signal the core number where the worker is running.
+    // The rows in that column maintain the counts for that genre.
+    ca := make([][]int, ng)
+    // populate the ng rows of ra and ca with nf columns
+    for i := 0; i < ng; i++ {
+        ra[i] = make([]float64, nf)
+        ca[i] = make([]int, nf)
+    }
+    var ci = make(chan int) // create the channel to sync all workers
+    movies := ReadCsvToDataframe("movies.csv")
+    fmt.Println("Finished reading movies.csv")
+    // run FindRatings in nf workers
+    for i := 0; i < nf; i++ {
+        go Mt_FindRatingsWorker(i+1, ci, kg, &ca, &ra, movies)
+    }
+    // wait for the workers: each one sends exactly one message on ci when it finishes
+    for i := 0; i < nf; i++ {
+        <-ci
+    }
+    // all workers completed their work; collect results and produce the report
+    locCount := make([]int, ng)
+    locVals := make([]float64, ng)
+    locPromedio := make([]float64, ng)
+    for i := 0; i < ng; i++ {
+        for j := 0; j < nf; j++ {
+            locCount[i] += ca[i][j]
+            locVals[i] += ra[i][j]
+        }
+        locPromedio[i] = locVals[i] / float64(locCount[i])
+    }
+    for i := 0; i < ng; i++ {
+        fmt.Println(fmt.Sprintf("%2d", i), " ", fmt.Sprintf("%20s", kg[i]), " ", fmt.Sprintf("%8d", locCount[i]), " ", fmt.Sprintf("%.2f", locPromedio[i]))
+    }
+    duration := time.Since(start)
+    fmt.Println("Duration = ", duration)
+    fmt.Println("Mt_FindRatingsMaster is Done")
+}
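Reviewer note: the master/worker synchronization above boils down to a fan-out/fan-in over an unbuffered channel — every worker sends exactly one value when it finishes, and the master receives exactly nf values, so no shared counter or busy-waiting is needed. A minimal, self-contained sketch of that pattern (the worker function, the done channel, and the fixed worker count here are illustrative only, not part of the submission):

package main

import "fmt"

// worker stands in for Mt_FindRatingsWorker: it does its share of the work
// and then sends exactly one message on the channel.
func worker(id int, done chan int) {
    // ... process chunk number id here ...
    done <- 1
}

func main() {
    nf := 4 // the submission uses runtime.GOMAXPROCS(0) workers
    done := make(chan int)
    for i := 0; i < nf; i++ {
        go worker(i+1, done)
    }
    // Receive exactly nf messages: this blocks until every worker is done,
    // with no shared counter and no busy-waiting.
    for i := 0; i < nf; i++ {
        <-done
    }
    fmt.Println("all", nf, "workers completed")
}

An equivalent, equally idiomatic option is sync.WaitGroup; the channel version is shown because it matches the ci channel the submission already uses.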
println("Procesando archivo: $rating_file") + + if isfile(rating_file) + df_ratings = DataFrame(read_parquet(rating_file)) + rating_sum[:, i], count_sum[:, i] = process_ratings(ng, genres, df_movies, df_ratings) + else + println("Archivo no encontrado: $rating_file") + end + end + + # Sumar resultados finales por género y mostrarlos + for i in 1:ng + total_rating = sum(rating_sum[i, :]) + total_count = sum(count_sum[i, :]) + promedio = total_rating/total_count + @printf("Género: %s Total calificaciones: %.2f Total conteo: %d Promedio: %.2f\n", genres[i], total_rating, total_count,promedio) + end +end + +# Función para procesar cada archivo ratings y acumular resultados por género +function process_ratings(ng::Int, genres::Vector{String}, df_movies::DataFrame, df_ratings::DataFrame) + rating_accum = zeros(ng) + count_accum = zeros(Int, ng) + + # Hacer un inner join entre movies y ratings + joined_df = innerjoin(df_movies, df_ratings, on=:movieId) + + # Calcular sumas y conteos por cada género + for i in 1:ng + genre_rows = joined_df[occursin.(genres[i], joined_df.genres), :] + count_accum[i] = nrow(genre_rows) + rating_accum[i] = sum(genre_rows.rating) + end + + return rating_accum, count_accum +end + +# Rutas de entrada y salida +input_dir = "C:\\Users\\Alexis\\Documents\\Practica_Julia" +output_dir = "C:\\Users\\Alexis\\Documents\\Practica_Julia" + +# Dividir el archivo ratings y guardar en formato Parquet +generate_small_files(joinpath(input_dir, "ratings.csv"), "ratings", output_dir) + +# Procesar los archivos de ratings y cruzarlos con movies +find_ratings_master(input_dir, output_dir) diff --git a/julia/src/student_submissions/movielens/Liliana_Vera/movielens.jl b/julia/src/student_submissions/movielens/Liliana_Vera/movielens.jl new file mode 100644 index 00000000..3c353709 --- /dev/null +++ b/julia/src/student_submissions/movielens/Liliana_Vera/movielens.jl @@ -0,0 +1,99 @@ +#import Pkg; Pkg.add("CSV") +#import Pkg; Pkg.add("DataFrames") + +using DataFrames +using CSV +using Parquet +using Printf + +# Función para dividir el archivo ratings en 10 partes y guardarlas en formato Parquet +function generate_small_files(ratings_file::String, output_prefix::String, output_dir::String) + println("Dividiendo archivo de ratings en partes...") + + # Leer el archivo completo de ratings + data = CSV.read(ratings_file, DataFrame) + total_rows = nrow(data) + num_chunks = 10 # Dividir en 10 partes + chunk_size = ceil(Int, total_rows / num_chunks) + + # Crear y guardar cada chunk en archivos separados + for i in 1:num_chunks + start_row = (i - 1) * chunk_size + 1 + end_row = min(i * chunk_size, total_rows) + chunk = data[start_row:end_row, :] + + output_path = joinpath(output_dir, "$(output_prefix)_ratings$(lpad(i, 2, '0')).parquet") + Parquet.write_parquet(output_path, chunk) + println("Archivo guardado: $output_path con $(nrow(chunk)) filas") + end +end + +# Función principal para procesar archivos de ratings y cruzarlos con movies +function find_ratings_master(input_dir::String, output_dir::String) + nF = 10 # Número de archivos ratings + prqDir = output_dir + + # Lista de géneros de películas + genres = ["Action", "Adventure", "Animation", "Children", "Comedy", "Crime", "Documentary", + "Drama", "Fantasy", "Film-Noir", "Horror", "IMAX", "Musical", "Mystery", "Romance", + "Sci-Fi", "Thriller", "War", "Western", "(no genres listed)"] + ng = length(genres) + + # Arrays para acumular los resultados de calificaciones por género + rating_sum = zeros(ng, nF) + count_sum = zeros(Int, ng, nF) + + # Leer 
el archivo movies.csv y mantener solo las columnas necesarias + movies_path = joinpath(input_dir, "movies.csv") + df_movies = CSV.read(movies_path, DataFrame) + df_movies = df_movies[:, [:movieId, :genres]] + + # Procesar cada archivo de ratings + for i in 1:nF + rating_file = joinpath(prqDir, "ratings_ratings$(lpad(i, 2, '0')).parquet") + println("Procesando archivo: $rating_file") + + if isfile(rating_file) + df_ratings = DataFrame(read_parquet(rating_file)) + rating_sum[:, i], count_sum[:, i] = process_ratings(ng, genres, df_movies, df_ratings) + else + println("Archivo no encontrado: $rating_file") + end + end + + # Sumar resultados finales por género y mostrarlos + for i in 1:ng + total_rating = sum(rating_sum[i, :]) + total_count = sum(count_sum[i, :]) + promedio = total_rating/total_count + @printf("Género: %s Total calificaciones: %.2f Total conteo: %d Promedio: %.2f\n", genres[i], total_rating, total_count,promedio) + end +end + +# Función para procesar cada archivo ratings y acumular resultados por género +function process_ratings(ng::Int, genres::Vector{String}, df_movies::DataFrame, df_ratings::DataFrame) + rating_accum = zeros(ng) + count_accum = zeros(Int, ng) + + # Hacer un inner join entre movies y ratings + joined_df = innerjoin(df_movies, df_ratings, on=:movieId) + + # Calcular sumas y conteos por cada género + for i in 1:ng + genre_rows = joined_df[occursin.(genres[i], joined_df.genres), :] + count_accum[i] = nrow(genre_rows) + rating_accum[i] = sum(genre_rows.rating) + end + + return rating_accum, count_accum +end + +# Rutas de entrada y salida +input_dir = "C:\\Users\\Alexis\\Documents\\Practica_Julia" +output_dir = "C:\\Users\\Alexis\\Documents\\Practica_Julia" + +# Dividir el archivo ratings y guardar en formato Parquet +generate_small_files(joinpath(input_dir, "ratings.csv"), "ratings", output_dir) + +# Procesar los archivos de ratings y cruzarlos con movies +find_ratings_master(input_dir, output_dir) \ No newline at end of file
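Reviewer note: both the Go and the Julia submissions compute the same report — for every known genre, the number of ratings given to movies tagged with that genre and the average of those ratings, using a substring test on the pipe-separated genres column. A minimal Go sketch of that core aggregation on a tiny hand-made sample (the row type and the sample values are illustrative only, not taken from the dataset):

package main

import (
    "fmt"
    "strings"
)

// row holds the two columns both submissions keep after the join:
// the pipe-separated genres of the movie and the rating value.
type row struct {
    genres string
    rating float64
}

func main() {
    // Tiny hand-made sample standing in for the merged movies/ratings data.
    rows := []row{
        {"Action|Sci-Fi", 4.0},
        {"Comedy", 3.5},
        {"Action|Comedy", 5.0},
    }
    genres := []string{"Action", "Comedy", "Sci-Fi"}

    counts := make(map[string]int)
    sums := make(map[string]float64)
    for _, r := range rows {
        for _, g := range genres {
            // A rating counts toward every genre listed for its movie;
            // this is the same substring test both submissions use.
            if strings.Contains(r.genres, g) {
                counts[g]++
                sums[g] += r.rating
            }
        }
    }
    for _, g := range genres {
        fmt.Printf("%-10s count=%d avg=%.2f\n", g, counts[g], sums[g]/float64(counts[g]))
    }
}

On this sample the output is Action count=2 avg=4.50, Comedy count=2 avg=4.25, Sci-Fi count=1 avg=4.00; the real programs do the same per chunk and then merge the per-chunk counts and sums before dividing.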