Skip to content

Commit

Permalink
Merge pull request #586 from JuliaIO/fixmultithreadingissue
Browse files Browse the repository at this point in the history
Fixmultithreadingissue
  • Loading branch information
JonasIsensee authored Aug 25, 2024
2 parents 140dc81 + 9beaede commit 131c688
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 255 deletions.
2 changes: 1 addition & 1 deletion src/JLD2.jl
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ printtoc(io::IO, f::JLDFile; numlines = typemax(Int64)) =



include("headermessages.jl")
include("object_headers.jl")
include("headermessages.jl")
include("groups.jl")
include("dataspaces.jl")
include("attributes.jl")
Expand Down
1 change: 0 additions & 1 deletion src/committed_datatype_introspection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ function stringify_object(f, offset)
dataspace = ReadDataspace()
attrs = EMPTY_READ_ATTRIBUTES
datatype::H5Datatype = PlaceholderH5Datatype()
chunked_storage::Bool = false
layout::DataLayout = DataLayout(0,LcCompact,0,-1)
filter_pipeline::FilterPipeline = FilterPipeline(Filter[])
for msg in HeaderMessageIterator(f, offset)
Expand Down
47 changes: 18 additions & 29 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ function get_compressor(::Bool)
false, COMPRESSOR_TO_ID[:ZlibCompressor], m.ZlibCompressor()
end

function get_compressor(filter_id::UInt16)
modname, compressorname, decompressorname, = ID_TO_DECOMPRESSOR[filter_id]
invoke_again, m = checked_import(modname)
if invoke_again || !applicable(getproperty(m,compressorname))
_, compressor = Base.invokelatest(get_compressor, filter_id)
return true, compressor

Check warning on line 114 in src/compression.jl

View check run for this annotation

Codecov / codecov/patch

src/compression.jl#L113-L114

Added lines #L113 - L114 were not covered by tests
end
return invoke_again, getproperty(m,compressorname)()
end
function get_decompressor(filter_id::UInt16)
modname, compressorname, decompressorname, = ID_TO_DECOMPRESSOR[filter_id]
invoke_again, m = checked_import(modname)
Expand Down Expand Up @@ -180,35 +189,15 @@ function write_chunked_storage_message( io::IO,
elsize::Int,
dims::NTuple{N,Int},
filtered_size::Int,
offset::RelOffset) where N
jlwrite(io, HeaderMessage(HmDataLayout, chunked_storage_message_size(N) - jlsizeof(HeaderMessage), 0))
jlwrite(io, UInt8(4)) # Version
jlwrite(io, UInt8(LcChunked)) # Layout Class
jlwrite(io, UInt8(2)) # Flags (= SINGLE_INDEX_WITH_FILTER)
jlwrite(io, UInt8(N+1)) # Dimensionality
jlwrite(io, UInt8(jlsizeof(Length))) # Dimensionality Size
for i = N:-1:1
jlwrite(io, Length(dims[i])) # Dimensions 1...N
end
jlwrite(io, Length(elsize)) # Element size (last dimension)
jlwrite(io, UInt8(1)) # Chunk Indexing Type (= Single Chunk)
jlwrite(io, Length(filtered_size)) # Size of filtered chunk
jlwrite(io, UInt32(0)) # Filters for chunk
jlwrite(io, offset) # Address
end


function write_compressed_data(cio, f, data, odr, wsession, filter_id, compressor)
write_filter_pipeline_message(cio, filter_id)

# deflate first
deflated = deflate_data(f, data, odr, wsession, compressor)

write_chunked_storage_message(cio, odr_sizeof(odr), size(data), length(deflated), h5offset(f, f.end_of_data))
jlwrite(f.io, end_checksum(cio))

f.end_of_data += length(deflated)
jlwrite(f.io, deflated)
data_address::RelOffset) where N
write_header_message(io, Val(HmDataLayout);
layout_class = LcChunked,
flags = 2, # (= SINGLE_INDEX_WITH_FILTER)
dimensions = UInt64.((reverse(dims)..., elsize)), # Reversed dimensions with element size as last dim
chunk_indexing_type = 1, # (= Single Chunk)
data_size = filtered_size,
filters = 0, # Filters for chunk
data_address)
end

function decompress!(inptr::Ptr, data_length, element_size, n, decompressor::TranscodingStreams.Codec)
Expand Down
84 changes: 15 additions & 69 deletions src/datalayouts.jl
Original file line number Diff line number Diff line change
@@ -1,31 +1,3 @@
struct CompactStorageMessage
hm::HeaderMessage
version::UInt8
layout_class::LayoutClass
data_size::UInt16
end
define_packed(CompactStorageMessage)
CompactStorageMessage(datasz::Int) =
CompactStorageMessage(
HeaderMessage(HmDataLayout, jlsizeof(CompactStorageMessage) - jlsizeof(HeaderMessage) + datasz, 0),
4, LcCompact, datasz
)

struct ContiguousStorageMessage
hm::HeaderMessage
version::UInt8
layout_class::LayoutClass
address::RelOffset
data_size::Length
end
define_packed(ContiguousStorageMessage)
ContiguousStorageMessage(datasz::Int, offset::RelOffset) =
ContiguousStorageMessage(
HeaderMessage(HmDataLayout, jlsizeof(ContiguousStorageMessage) - jlsizeof(HeaderMessage), 0),
4, LcContiguous, offset, datasz
)


## Left over header message parsing that does not have a good place.

struct DataLayout
Expand Down Expand Up @@ -87,47 +59,21 @@ function FilterPipeline(msg_::Hmessage)
nfilters = msg.nfilters
io = msg.m.io
seek(io, msg.m.address+2)
if version == 1
skip(io, 6)
filters = map(1:nfilters) do _
id = jlread(io, UInt16)
name_length = jlread(io, UInt16)
flags = jlread(io, UInt16)
nclient_vals = jlread(io, UInt16)
if iszero(name_length)
name = ""
else
name = read_bytestring(io)
skip(io, 8-mod1(sizeof(name), 8)-1)
end
client_data = jlread(io, UInt32, nclient_vals)
isodd(nclient_vals) && skip(io, 4)
Filter(id, flags, name, client_data)
end
return FilterPipeline(filters)
elseif version == 2
filters = map(1:nfilters) do _
id = jlread(io, UInt16)
if id > 255
name_length = jlread(io, UInt16)
flags = jlread(io, UInt16)
nclient_vals = jlread(io, UInt16)
if iszero(name_length)
name = ""
else
name = read_bytestring(io)
skip(io, 8-mod1(sizeof(name), 8)-1)
end
else
name = ""
flags = jlread(io, UInt16)
nclient_vals = jlread(io, UInt16)
end
client_data = jlread(io, UInt32, nclient_vals)
Filter(id, flags, name, client_data)
version == 1 && skip(io, 6)
filters = map(1:nfilters) do _
id = jlread(io, UInt16)
name_length = (version == 2 && id < 255) ? zero(UInt16) : jlread(io, UInt16)
flags = jlread(io, UInt16)
nclient_vals = jlread(io, UInt16)
if iszero(name_length)
name = ""
else
name = read_bytestring(io)
skip(io, 8-mod1(sizeof(name), 8)-1)
end
return FilterPipeline(filters)
else
throw(UnsupportedVersionException("Filter Pipeline Message version $version is not implemented"))
client_data = jlread(io, UInt32, nclient_vals)
(version == 1 && isodd(nclient_vals)) && skip(io, 4)
Filter(id, flags, name, client_data)
end
return FilterPipeline(filters)
end
45 changes: 15 additions & 30 deletions src/datasets.jl
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ end
# Figure out the layout
if datasz == 0 || (!(data isa Array) && datasz < 8192)
layout_class = LcCompact
psz += jlsizeof(CompactStorageMessage) + datasz
psz += jlsizeof(Val(HmDataLayout); layout_class, data_size=datasz)
elseif data isa Array && compress != false && isconcretetype(eltype(data)) && isbitstype(eltype(data))
# Only now figure out if the compression argument is valid
invoke_again, filter_id, compressor = get_compressor(compress)
Expand All @@ -376,7 +376,7 @@ end
psz += chunked_storage_message_size(ndims(data)) + pipeline_message_size(filter_id::UInt16)
else
layout_class = LcContiguous
psz += jlsizeof(ContiguousStorageMessage)
psz += jlsizeof(Val(HmDataLayout); layout_class)
end

fullsz = jlsizeof(ObjectStart) + size_size(psz) + psz + 4
Expand All @@ -396,32 +396,29 @@ end

# Data storage layout
if layout_class == LcCompact
jlwrite(cio, CompactStorageMessage(datasz))
write_header_message(cio, Val(HmDataLayout); layout_class, data_size=datasz)
if datasz != 0
write_data(cio, f, data, odr, datamode(odr), wsession)
end
jlwrite(cio, CONTINUATION_PLACEHOLDER)
write_continuation_placeholder(cio)
jlwrite(io, end_checksum(cio))
elseif layout_class == LcChunked

write_filter_pipeline_message(cio, filter_id)

# deflate first
deflated = deflate_data(f, data, odr, wsession, compressor)

write_chunked_storage_message(cio, odr_sizeof(odr), size(data), length(deflated), h5offset(f, f.end_of_data))

# Add NIL message replacable by continuation message
jlwrite(cio, CONTINUATION_PLACEHOLDER)
write_chunked_storage_message(cio, odr_sizeof(odr), size(data), length(deflated), h5offset(f, f.end_of_data))
write_continuation_placeholder(cio)
jlwrite(f.io, end_checksum(cio))

seek(f.io, f.end_of_data)
f.end_of_data += length(deflated)
jlwrite(f.io, deflated)
else
data_address = f.end_of_data + 8 - mod1(f.end_of_data, 8)
jlwrite(cio, ContiguousStorageMessage(datasz, h5offset(f, data_address)))
jlwrite(cio, CONTINUATION_PLACEHOLDER)
write_header_message(cio, Val(HmDataLayout);
layout_class, data_address=h5offset(f, data_address), data_size=datasz)
write_continuation_placeholder(cio)
jlwrite(io, end_checksum(cio))

f.end_of_data = data_address + datasz
Expand All @@ -435,27 +432,15 @@ end
function write_object_header_and_dataspace_message(cio::IO, f::JLDFile, psz::Int, dataspace::WriteDataspace)
jlwrite(cio, ObjectStart(size_flag(psz)))
write_size(cio, psz)

# Fill value
jlwrite(cio, HeaderMessage(HmFillValue, 2, 0))
jlwrite(cio, UInt8(3)) # Version
jlwrite(cio, 0x09) # Flags

# Dataspace
jlwrite(cio, HeaderMessage(HmDataspace, jlsizeof(dataspace), 0))
jlwrite(cio, dataspace)

# Attributes
write_header_message(cio, Val(HmFillValue); flags=0x09)
write_header_message(cio, Val(HmDataspace); dataspace.dataspace_type, dimensions=dataspace.size)
for attr in dataspace.attributes
jlwrite(cio, HeaderMessage(HmAttribute, jlsizeof(attr), 0))
write_attribute(cio, f, attr, f.datatype_wsession)
write_header_message(cio, f, attr)
end
end

function write_datatype_message(cio::IO, datatype::H5Datatype)
jlwrite(cio, HeaderMessage(HmDatatype, jlsizeof(datatype), 1 | (2*isa(datatype, CommittedDatatype))))
jlwrite(cio, datatype)
end
write_datatype_message(cio::IO, dt::H5Datatype) =
write_header_message(cio, Val(HmDatatype), 1 | (2*isa(dt, CommittedDatatype)); dt)


@nospecializeinfer function write_dataset(f::JLDFile, @nospecialize(x), wsession::JLDWriteSession)::RelOffset
Expand Down Expand Up @@ -523,7 +508,7 @@ function delete_written_link!(f::JLDFile, roffset::RelOffset, name::AbstractStri
if msg.type == HmLinkMessage && HmWrap(HmLinkMessage, msg).link_name == name
# delete link
seek(f.io, fileoffset(f, msg.offset))
jlwrite(f.io, HeaderMessage(HmNil, msg.size, 0))
write_header_message(f.io, Val(HmNil), 0, msg.size)
update_checksum(f.io, iter.chunk.chunk_start, iter.chunk.chunk_end)
end
end
Expand Down
9 changes: 4 additions & 5 deletions src/datatypes.jl
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,8 @@ end
function commit(f::JLDFile,
@nospecialize(dt::H5Datatype),
attrs::Tuple{Vararg{WrittenAttribute}}=())
psz = jlsizeof(HeaderMessage) * (length(attrs) + 1) + jlsizeof(dt)
psz = jlsizeof(Val(HmDatatype), 64; dt)
psz += jlsizeof(HeaderMessage) * (length(attrs))
for attr in attrs
psz += jlsizeof(attr)
end
Expand All @@ -326,11 +327,9 @@ function commit(f::JLDFile,
cio = begin_checksum_write(io, sz)
jlwrite(cio, ObjectStart(size_flag(psz)))
write_size(cio, psz)
jlwrite(cio, HeaderMessage(HmDatatype, jlsizeof(dt), 64))
jlwrite(cio, dt)
write_header_message(cio, Val(HmDatatype), 64; dt)
for attr in attrs
jlwrite(cio, HeaderMessage(HmAttribute, jlsizeof(attr), 0))
write_attribute(cio, f, attr, f.datatype_wsession)
write_header_message(cio, f, attr)
end
jlwrite(io, end_checksum(cio))
end
Expand Down
Loading

0 comments on commit 131c688

Please sign in to comment.