Skip to content

Commit

Permalink
Apply more reproducibility test infra fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskawczynski committed Dec 29, 2024
1 parent 8cdd3f8 commit 53c3156
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 39 deletions.
5 changes: 4 additions & 1 deletion reproducibility_tests/move_output.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ move_data_to_save_dir(;

if buildkite_ci && in_merge_queue
folders = get_reference_dirs_to_delete(; root_dir = cluster_data_prefix)
debug_reproducibility() && @warn "Repro: deleting folders $folders"
bins = compute_bins(folders)
msg = prod(x -> " $x\n", folders)
@warn "Repro: deleting folders:\n$msg"
@warn "Deleted folder bins:\n $(string_bins(bins))"
for f in folders
rm(f; recursive = true, force = true)
end
Expand Down
5 changes: 4 additions & 1 deletion reproducibility_tests/ref_counter.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
192
193

# **README**
#
Expand All @@ -21,6 +21,9 @@

#=
193
- More reproducibility infrastructure fixes.
192
- Reproducibility infrastructure fixes.
Expand Down
15 changes: 4 additions & 11 deletions reproducibility_tests/reproducibility_tools.jl
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,6 @@ function error_if_dissimilar_dicts(dicts, dict)
end
end

function all_files_in_dir(dir)
all_files = String[]
for (root, dirs, files) in walkdir(dir)
for file in files
push!(all_files, joinpath(root, file))
end
end
return all_files
end

function no_comparison_error(dirs, non_existent_files)
msg = "\n\n"
msg *= "Pull request author:\n"
Expand All @@ -59,7 +49,10 @@ function no_comparison_error(dirs, non_existent_files)
msg *= "for how to merge this PR."
msg *= "\n\n"
for dir in dirs
msg *= "Files in dirs: $(all_files_in_dir(dir))\n"
msg *= "Files in dir $dir\n"
for file in all_files_in_dir(dir)
msg *= " $file\n"
end
end
error(msg)
end
Expand Down
173 changes: 153 additions & 20 deletions reproducibility_tests/reproducibility_utils.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,82 @@ comparable states
| 04_d6e48 06_d6d73 08_1cc58 |
v 04_4c042 v newest
```
# File states
Reproducibility tests inherently rely on comparing multiple states, which means
that our reproducibility testing infrastructure is _stateful_. During our
continuous integration testing (CI), files are generated, moved, and zipped. To help
assist our understanding and reasoning, we let's assume that there are two states:
## state 1: end of simulation, folder structure
- `job_id/output_dir/`
- `job_id/output_dir/reproducibility_bundle/`
- `job_id/output_dir/reproducibility_bundle/ref_counter.jl`
- `job_id/output_dir/reproducibility_bundle/prog_state.hdf5`
## state 2: data is saved for future reference
- `commit_hash/job_id/output_dir/`
- `commit_hash/job_id/output_dir/reproducibility_bundle/`
- `commit_hash/job_id/output_dir/reproducibility_bundle/ref_counter.jl`
- `commit_hash/job_id/output_dir/reproducibility_bundle/prog_state.hdf5`
- `commit_hash/reproducibility_bundle/ref_counter.jl`
- `commit_hash/reproducibility_bundle/job_id/`
- `commit_hash/reproducibility_bundle/job_id/prog_state.hdf5`
In other words, we strip out `output_dir/`, and swap `job_id` and
`reproducibility_bundle`. This is done for two reasons:
- The ref_counter is job-independent, hence the swap
- The `output_dir/` is redundant to the purpose of the commit hash folder
################################################################################
=#

# debug_reproducibility() = false
# debug_reproducibility() = true
debug_reproducibility() =
get(ENV, "BUILDKITE_PIPELINE_SLUG", nothing) == "climaatmos-ci"

import Dates
import OrderedCollections

function string_all_files_in_dir(dir)
msg = "Files in dir $dir\n"
for file in all_files_in_dir(dir)
msg *= " $file\n"
end
return msg
end

function all_files_in_dir(dir)
all_files = String[]
for (root, dirs, files) in walkdir(dir)
for file in files
push!(all_files, joinpath(root, file))
end
end
return all_files
end

"""
skip_list()
Occasionally, folders saved folders are modified, resulting in
`sorted_dirs_with_matched_files` returning a list of directories that, while
correctly chronologically ordered, are not the desired folders to be returned.
Generally speaking, fixing this may requiring touching all existing saved files
in a particular order to fix the returned list. This is a bit laborious.
Another way we can fix this is by filtering out a selected group of folders to
avoid comparisons against those results altogether.
"""
function skip_list()
return ["/central/scratch/esm/slurm-buildkite/climaatmos-main/1624f03"]
end

read_ref_counter(file) = parse(Int, first(readlines(file)))

"""
Expand Down Expand Up @@ -76,6 +142,7 @@ function sorted_dirs_with_matched_files(;
# sort by timestamp
sorted_dirs =
sort(matched_dirs; by = f -> Dates.unix2datetime(stat(f).mtime))
filter!(x -> !any(y -> occursin(y, x), skip_list()), sorted_dirs)
return sorted_dirs
end

Expand Down Expand Up @@ -200,18 +267,26 @@ comparable states
v 04_4c042 v newest
```
"""
compute_bins(
function compute_bins(
root_dir::String = "/central/scratch/esm/slurm-buildkite/climaatmos-main";
filename = "ref_counter.jl",
) = compute_bins(
reverse(
sorted_dirs_with_matched_files(;
dir = root_dir,
filename = "ref_counter.jl",
),
),
)
debug_reproducibility() && @info "----Computing bins start"
dirs = sorted_dirs_with_matched_files(;
dir = root_dir,
filename = "ref_counter.jl",
)
bins = compute_bins(reverse(dirs))
debug_reproducibility() && @info "----Computing bins end"
end

function compute_bins(sorted_dirs::Vector{String})
if debug_reproducibility()
@info "sorted_dirs:"
for dir in sorted_dirs
@show dir, Dates.unix2datetime(stat(dir).mtime)
end
end
@assert isempty(invalid_reference_folders(sorted_dirs))
bins = Vector{String}[]
dir_index = 1
Expand Down Expand Up @@ -241,11 +316,31 @@ function compute_bins(sorted_dirs::Vector{String})
return bins
end

print_bins(bins) = print_bins(stdout, bins)
print_bins(io::IO, bins) = println(io, string_bins(bins))

"""
string_bins(bins)
Return a string summarizing the given bins.
"""
function string_bins(bins)
msg = "Bins:\n"
for (i, bin) in enumerate(bins)
msg *= " Bin $i:\n"
for (j, state) in enumerate(bin)
ref_counter = read_ref_counter(joinpath(state, "ref_counter.jl"))
msg *= " (State $j, ref_counter): ($state, $ref_counter)\n"
end
end
return msg
end

"""
get_reference_dirs_to_delete(;
root_dir,
keep_n_comparable_states = 5,
keep_n_bins_back = 7,
keep_n_comparable_states = 100,
keep_n_bins_back = 100,
)
Return a list of folders to delete.
Expand Down Expand Up @@ -301,8 +396,8 @@ keep_n_comparable_states
"""
function get_reference_dirs_to_delete(;
root_dir,
keep_n_comparable_states = 5,
keep_n_bins_back = 7,
keep_n_comparable_states = 100,
keep_n_bins_back = 100,
filename = "ref_counter.jl",
)
dirs = sorted_dirs_with_matched_files(; dir = root_dir, filename)
Expand Down Expand Up @@ -368,26 +463,48 @@ function source_has_changed(;
end
end

rm_folder(path; strip_folder) =
joinpath(filter(x -> !occursin(strip_folder, x), splitpath(path))...)

"""
move_data_to_save_dir(;
dest_root = "/central/scratch/esm/slurm-buildkite/climaatmos-main",
buildkite_ci = get(ENV, "BUILDKITE_PIPELINE_SLUG", nothing) == "climaatmos-ci",
buildkite_ci = get(ENV, "BUILDKITE_PIPELINE_SLUG", nothing) ==
"climaatmos-ci",
commit = get(ENV, "BUILDKITE_COMMIT", nothing),
branch = get(ENV, "BUILDKITE_BRANCH", nothing),
in_merge_queue = startswith(branch, "gh-readonly-queue/main/"),
dirs_src,
strip_folder = Pair("output_active", ""),
ref_counter_file_PR = joinpath(@__DIR__, "ref_counter.jl"),
ref_counter_PR = read_ref_counter(ref_counter_file_PR),
skip = get(ENV, "BUILDKITE_PIPELINE_SLUG", nothing) != "climaatmos-ci",
n_hash_characters = 7,
repro_folder = "reproducibility_bundle",
)
Moves data from directories `dirs_src[i]` to `dest_root/commit_sha/basename
(dirs_src[i])`, given some conditions are met. In particular, data movement
will occur when this function is called:
Moves data in the following way:
for job_id in dest_src
`job_id/out/repro/ref_counter.jl` -> `commit_hash/repro/ref_counter.jl`
`job_id/out/repro/` -> `commit_hash/repro/job_id/`
`job_id/out/repro/prog_state.hdf5` -> `commit_hash/repro/job_id/prog_state.hdf5`
end
Note that files not in the `repro` folder are not moved.
In other words, we strip out `out/`, and swap `job_id` and `repro`. This is done
for two reasons:
- The ref_counter is job-independent, hence the swap
- The `out/` is redundant to the purpose of the commit hash folder
Data movement will occur when this function is called:
- on a job run in buildkite
- when in the merge queue
- when on the main branch if the `source_checksum` is different from the source
code in the latest comparable reference
"""
function move_data_to_save_dir(;
dest_root = "/central/scratch/esm/slurm-buildkite/climaatmos-main",
Expand All @@ -397,6 +514,7 @@ function move_data_to_save_dir(;
branch = get(ENV, "BUILDKITE_BRANCH", nothing),
in_merge_queue = startswith(branch, "gh-readonly-queue/main/"),
dirs_src,
strip_folder = "output_active",
ref_counter_file_PR = joinpath(@__DIR__, "ref_counter.jl"),
ref_counter_PR = read_ref_counter(ref_counter_file_PR),
skip = get(ENV, "BUILDKITE_PIPELINE_SLUG", nothing) != "climaatmos-ci",
Expand Down Expand Up @@ -427,11 +545,26 @@ function move_data_to_save_dir(;
for src in dirs_src
dst = joinpath(dest_repro, basename(src))
mv(src, dst; force = true)
debug_reproducibility() &&
@info "Reproducibility: File $src moved to $dst"
end
for dst in all_files_in_dir(dest_repro)
dst_new = rm_folder(dst; strip_folder)
debug_reproducibility() && @show dst_new
if dst dst_new
mkpath(dirname(dst_new))
mv(dst, dst_new; force = true)
end
end
ref_counter_file_main = joinpath(dest_repro, "ref_counter.jl")
mv(ref_counter_file_PR, ref_counter_file_main; force = true)
if debug_reproducibility()
println("####################### SRC")
for src in dirs_src
@info(string_all_files_in_dir(src))
end
println("####################### DST")
@info(string_all_files_in_dir(dest_repro))
println("#######################")
end
else
if debug_reproducibility()
@warn "Repro: skipping data movement"
Expand Down
11 changes: 10 additions & 1 deletion reproducibility_tests/test_mse.jl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,16 @@ if isempty(computed_mse_filenames)
read_ref_counter(joinpath(newest_saved_dir, "ref_counter.jl"))
ref_counter_PR =
read_ref_counter(joinpath(@__DIR__, "ref_counter.jl"))
@assert ref_counter_PR == newest_saved_ref_counter + 1 "Reference counter must be incremented by 1. ref_counter_PR=$ref_counter_PR, newest_saved_ref_counter=$newest_saved_ref_counter"
if ref_counter_PR newest_saved_ref_counter + 1
if debug_reproducibility()
@info " ref_counter_PR=$ref_counter_PR, newest_saved_ref_counter=$newest_saved_ref_counter\n"
@info "newest_saved_dir: $newest_saved_dir\n"
@info "newest_saved_dir_legacy: $newest_saved_dir_legacy\n"
@info "newest_saved_dir_new: $newest_saved_dir_new\n"
print_bins(bins)
end
error("Reference counter must be incremented by 1.")
end
end
else
msg = "There were comparable references, but no computed mse files exist."
Expand Down
15 changes: 10 additions & 5 deletions test/unit_reproducibility_infra.jl
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,10 @@ end
d6 = make_ref_file_counter(5, dir, "d6")
d7 = make_ref_file_counter(6, dir, "d7")
@test compute_bins(dir) == [[d7], [d6, d5], [d4, d3], [d2], [d1]]
@test occursin(
"(State 1, ref_counter):",
string_bins(compute_bins(dir)),
)
end

# simulating reverted PR
Expand Down Expand Up @@ -1255,16 +1259,16 @@ if pkgversion(ClimaCore) ≥ v"0.14.18"
make_file_with_contents(computed_dir, "file_z.jl", "abc")
ref_counter_file_dir =
make_ref_file_counter(3, computed_dir, "repro_bundle")
job_id_1 = joinpath(computed_dir, "job_id_1")
job_id_2 = joinpath(computed_dir, "job_id_2")
job_id_1 = joinpath(computed_dir, "repro_bundle", "job_id_1")
job_id_2 = joinpath(computed_dir, "repro_bundle", "job_id_2")
put_data_file(
job_id_1,
joinpath(job_id_1, "output_active"),
fv,
comms_ctx;
filename = "ref_prog_state.hdf5",
)
put_data_file(
job_id_2,
joinpath(job_id_2, "output_active"),
fv,
comms_ctx;
filename = "ref_prog_state.hdf5",
Expand All @@ -1273,8 +1277,8 @@ if pkgversion(ClimaCore) ≥ v"0.14.18"
@test source_checksum(hash2) == source_checksum(computed_dir)

repro_folder = "repro_bundle"
repro_dir = joinpath(save_dir, "hash_new", repro_folder)
move_data_to_save_dir(;
strip_folder = "output_active",
dest_root = save_dir,
buildkite_ci = true,
commit = "hash_new",
Expand All @@ -1290,6 +1294,7 @@ if pkgversion(ClimaCore) ≥ v"0.14.18"
ref_counter_PR = 3,
skip = false,
)
repro_dir = joinpath(save_dir, "hash_new", "repro_bundle")
@test isfile(joinpath(repro_dir, "job_id_1", "ref_prog_state.hdf5"))
@test isfile(joinpath(repro_dir, "job_id_2", "ref_prog_state.hdf5"))
@test isfile(joinpath(repro_dir, "ref_counter.jl"))
Expand Down

0 comments on commit 53c3156

Please sign in to comment.