Skip to content

Commit

Permalink
initial MPI implementation (issue #122)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander-Barth committed Apr 23, 2024
1 parent 6735b42 commit 70ef683
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 38 deletions.
7 changes: 7 additions & 0 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ NetCDF_jll = "7243133f-43d8-5620-bbf4-c2c921802cf3"
NetworkOptions = "ca575930-c2e3-43a9-ace4-1e988b2c1908"
Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7"

[weakdeps]
MPI = "da04e1cc-30fd-572f-bb4f-1f8673147195"

[extensions]
NCDatasetsMPIExt = "MPI"

[compat]
CFTime = "0.1.1"
CommonDataModel = "0.3.4"
DataStructures = "0.17, 0.18"
DiskArrays = "0.3.22"
MPI = "0.20"
NetCDF_jll = "=400.701.400, =400.702.400, =400.902.5, =400.902.208, =400.902.209, =400.902.211"
NetworkOptions = "1.2"
julia = "1.6"
Expand Down
87 changes: 49 additions & 38 deletions src/dataset.jl
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,51 @@ function NCDataset(ncid::Integer,
return ds
end

function _dataset_ncmode(filename,mode,format;
diskless::Bool = false,
persist::Bool = false,
share::Bool = false)

ncmode =
if mode == "r"
NC_NOWRITE
elseif mode == "a"
NC_WRITE
elseif mode == "c"
NC_CLOBBER
else
throw(NetCDFError(-1, "Unsupported mode '$(mode)' for filename '$(filename)'"))
end

if diskless
ncmode = ncmode | NC_DISKLESS

if persist
ncmode = ncmode | NC_PERSIST
end
end

if share
@debug "share mode"
ncmode = ncmode | NC_SHARE
end

if format == :netcdf5_64bit_data
ncmode = ncmode | NC_64BIT_DATA
elseif format == :netcdf3_64bit_offset
ncmode = ncmode | NC_64BIT_OFFSET
elseif format == :netcdf4_classic
ncmode = ncmode | NC_NETCDF4 | NC_CLASSIC_MODEL
elseif format == :netcdf4
ncmode = ncmode | NC_NETCDF4
elseif format == :netcdf3_classic
# do nothing
else
throw(NetCDFError(-1, "Unkown format '$(format)' for filename '$(filename)'"))
end

return ncmode
end

############################################################
# High-level
Expand Down Expand Up @@ -179,30 +224,10 @@ function NCDataset(filename::AbstractString,

ncid = -1
isdefmode = Ref(false)

ncmode =
if mode == "r"
NC_NOWRITE
elseif mode == "a"
NC_WRITE
elseif mode == "c"
NC_CLOBBER
else
throw(NetCDFError(-1, "Unsupported mode '$(mode)' for filename '$(filename)'"))
end

if diskless
ncmode = ncmode | NC_DISKLESS

if persist
ncmode = ncmode | NC_PERSIST
end
end

if share
@debug "share mode"
ncmode = ncmode | NC_SHARE
end
ncmode = _dataset_ncmode(filename,mode,format;
diskless = diskless,
persist = persist,
share = share)

@debug "ncmode: $ncmode"

Expand All @@ -213,20 +238,6 @@ function NCDataset(filename::AbstractString,
ncid = nc_open_mem(filename,ncmode,memory)
end
elseif mode == "c"
if format == :netcdf5_64bit_data
ncmode = ncmode | NC_64BIT_DATA
elseif format == :netcdf3_64bit_offset
ncmode = ncmode | NC_64BIT_OFFSET
elseif format == :netcdf4_classic
ncmode = ncmode | NC_NETCDF4 | NC_CLASSIC_MODEL
elseif format == :netcdf4
ncmode = ncmode | NC_NETCDF4
elseif format == :netcdf3_classic
# do nothing
else
throw(NetCDFError(-1, "Unkown format '$(format)' for filename '$(filename)'"))
end

ncid = nc_create(filename,ncmode)
isdefmode[] = true
end
Expand Down
2 changes: 2 additions & 0 deletions src/variable.jl
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ end
export fillvalue


access() = error("This function is available after importing MPI")

"""
a = nomissing(da)
Expand Down
61 changes: 61 additions & 0 deletions test/test_mpi_netcdf.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# for example run as:
# $HOME/.julia/packages/MPI/z2owj/bin/mpiexecjl -n 4 julia test_mpi_netcdf.jl

using MPIPreferences
using MPI
using NCDatasets
using Test

# only tested so far with OpenMPI
# and NetCDF_jll v400.902.211+0
@assert MPIPreferences.binary == "OpenMPI_jll"

mpiexec = realpath(joinpath(dirname(pathof(MPI)),"..","bin","mpiexecjl"))

#println("mpiexec ",mpiexec)

print("$mpiexec -n 4 julia test_mpi.jl")

MPI.Init()

mpi_comm = MPI.COMM_WORLD
mpi_comm_size = MPI.Comm_size(mpi_comm)
mpi_rank = MPI.Comm_rank(mpi_comm)

# need to be the same file for all processes
path = "/tmp/test-mpi.nc"
i = mpi_rank + 1


ds = NCDataset(mpi_comm,path,"c")

defDim(ds,"lon",10)
defDim(ds,"lat",mpi_comm_size)
ncv = defVar(ds,"temp",Float32,("lon","lat"))

# see
# https://web.archive.org/web/20240414204638/https://docs.unidata.ucar.edu/netcdf-c/current/parallel_io.html
NCDatasets.access(ncv.var,:collective)


print("rank $(mpi_rank) writing to netCDF variable\n")
ncv[:,i] .= mpi_rank

ncv.attrib["units"] = "degree Celsius"
ds.attrib["comment"] = "MPI test"
close(ds)


ds = NCDataset(mpi_comm,path,"r")
ncv = ds["temp"]

@test size(ncv) == (10,mpi_comm_size)
print("rank $(mpi_rank) reading from netCDF variable\n")

@test all(==(mpi_rank),ncv[:,i])
@test ncv.attrib["units"] == "degree Celsius"
@test ds.attrib["comment"] == "MPI test"

close(ds)

MPI.Finalize()

0 comments on commit 70ef683

Please sign in to comment.