Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ZstdFrameCompressor and ZstdFrameDecompressor for non-streaming API #46

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/CodecZstd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ export
ZstdCompressor,
ZstdCompressorStream,
ZstdDecompressor,
ZstdDecompressorStream
ZstdDecompressorStream,
ZstdFrameCompressor,
ZstdFrameDecompressor,
ZstdError

import TranscodingStreams:
TranscodingStreams,
Expand All @@ -23,5 +26,7 @@ include("LibZstd_clang.jl")
include("libzstd.jl")
include("compression.jl")
include("decompression.jl")
include("frameCompression.jl")
include("frameDecompression.jl")

end # module
11 changes: 6 additions & 5 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@
end

# Same as the zstd command line tool (v1.2.0).
const DEFAULT_COMPRESSION_LEVEL = 3
const DEFAULT_COMPRESSION_LEVEL = DEFAULT_CLEVEL

"""
ZstdCompressor(;level=$(DEFAULT_COMPRESSION_LEVEL))

Create a new zstd compression codec.
Create a new zstd compression codec using the streaming API.
This compressor uses `ZSTD_compressStream`.

Arguments
---------
- `level`: compression level (1..$(MAX_CLEVEL))
- `level`: compression level ($(MIN_CLEVEL)..$(MAX_CLEVEL))
"""
function ZstdCompressor(;level::Integer=DEFAULT_COMPRESSION_LEVEL)
if !(1 ≤ level ≤ MAX_CLEVEL)
throw(ArgumentError("level must be within 1..$(MAX_CLEVEL)"))
if !(MIN_CLEVEL ≤ level ≤ MAX_CLEVEL)
throw(ArgumentError("level must be within $(MIN_CLEVEL)..$(MAX_CLEVEL)"))
end

Check warning on line 29 in src/compression.jl

View check run for this annotation

Codecov / codecov/patch

src/compression.jl#L28-L29

Added lines #L28 - L29 were not covered by tests
return ZstdCompressor(CStream(), level)
end

Expand Down
3 changes: 2 additions & 1 deletion src/decompression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ end
"""
ZstdDecompressor()

Create a new zstd decompression codec.
Create a new zstd decompression codec using the streaming API.
This decompressor uses `ZSTD_decompressStream`.
"""
function ZstdDecompressor()
return ZstdDecompressor(DStream())
Expand Down
101 changes: 101 additions & 0 deletions src/frameCompression.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# Frame Compressor Codec
# ======================

# Codec that compresses a whole buffer in one call (see `frameCompress!`).
# `cstream` is the zstd compression context used by the codec methods;
# `level` is the compression level handed to `initialize!`.
struct ZstdFrameCompressor <: TranscodingStreams.Codec
cstream::CStream
level::Int
end

# Print the codec as `<summary>(level=<n>)`.
function Base.show(io::IO, codec::ZstdFrameCompressor)
    label = summary(codec)
    print(io, label, "(level=$(codec.level))")
    return nothing
end

# See compression.jl for DEFAULT_COMPRESSION_LEVEL

"""
    ZstdFrameCompressor(;level=$(DEFAULT_COMPRESSION_LEVEL))

Create a new zstd compression codec using the non-streaming API.
This compressor uses `ZSTD_compress2`. It expects the entire input buffer
to be available at once and stores the decompressed length in the frame
header.

Throws an `ArgumentError` if `level` is outside the supported range.

Arguments
---------
- `level`: compression level ($(MIN_CLEVEL)..$(MAX_CLEVEL))
"""
function ZstdFrameCompressor(;level::Integer=DEFAULT_COMPRESSION_LEVEL)
    # Guard clause: reject levels outside the range reported by libzstd.
    MIN_CLEVEL ≤ level ≤ MAX_CLEVEL ||
        throw(ArgumentError("level must be within $(MIN_CLEVEL)..$(MAX_CLEVEL)"))
    return ZstdFrameCompressor(CStream(), level)
end

# Alias for a `TranscodingStream` whose codec is `ZstdFrameCompressor` and whose
# wrapped stream has type `S <: IO`.
const ZstdFrameCompressorStream{S} = TranscodingStream{ZstdFrameCompressor,S} where S<:IO

"""
ZstdFrameCompressorStream(stream::IO; kwargs...)

Create a new zstd compression stream (see `ZstdFrameCompressor` for `kwargs`).
"""
function ZstdFrameCompressorStream(stream::IO; kwargs...)
x, y = splitkwargs(kwargs, (:level,))
return TranscodingStream(ZstdFrameCompressor(;x...), stream; y...)

Check warning on line 43 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L41-L43

Added lines #L41 - L43 were not covered by tests
end


# Methods
# -------

function TranscodingStreams.initialize(codec::ZstdFrameCompressor)
code = initialize!(codec.cstream, codec.level)
if iserror(code)
throw(ZstdError(code))

Check warning on line 53 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L53

Added line #L53 was not covered by tests
end
return
end

function TranscodingStreams.finalize(codec::ZstdFrameCompressor)
if codec.cstream.ptr != C_NULL
code = free!(codec.cstream)
if iserror(code)
throw(ZstdError(code))

Check warning on line 62 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L62

Added line #L62 was not covered by tests
end
codec.cstream.ptr = C_NULL
end
return
end

function TranscodingStreams.expectedsize(codec::ZstdFrameCompressor, input::Memory)
code = compressed_size_bound(input.size)
if iserror(code)
throw(ZstdError(code))

Check warning on line 72 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L72

Added line #L72 was not covered by tests
end
return Int(code)
end

function TranscodingStreams.startproc(codec::ZstdFrameCompressor, mode::Symbol, error::Error)
code = reset!(codec.cstream, 0 #=unknown source size=#)
if iserror(code)
error[] = ZstdError(code)
return :error

Check warning on line 81 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L80-L81

Added lines #L80 - L81 were not covered by tests
end
return :ok
end

function TranscodingStreams.process(codec::ZstdFrameCompressor, input::Memory, output::Memory, error::Error)
cstream = codec.cstream
cstream.ibuffer.src = input.ptr
cstream.ibuffer.size = input.size
cstream.ibuffer.pos = 0
cstream.obuffer.dst = output.ptr
cstream.obuffer.size = output.size
cstream.obuffer.pos = 0
code = frameCompress!(cstream)
if iserror(code)
error[] = ZstdError(code)
return 0, 0, :error

Check warning on line 97 in src/frameCompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameCompression.jl#L96-L97

Added lines #L96 - L97 were not covered by tests
else
return Int(input.size), Int(code), :end
Copy link
Member

@nhz2 nhz2 May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think process can return :end here. Generally when compressing, :end is only returned if input is empty and no extra output needs to be written for the current frame.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are either going to return :end or :ok here as in compression.jl:

return Δin, Δout, input.size == 0 && code == 0 ? :end : :ok

Here, unlike the streaming API, we consumed the entire input buffer and have written the entire output by invoking ZSTD_compress2. There is no continuation. There is no more input to process.

By the completion of ZSTD_compress2 there are two possible outcomes. We have either successfully compressed the data into the output buffer or encountered an error. How else would you describe the state after ZSTD_compress2 runs?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller of process is allowed to break up the input data because process is a streaming API. Those small inputs could be appended to an internal buffer in the codec, and then only after input.size is set to zero, signaling there are no more bytes in the frame, ZSTD_compress2 is called.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be the best way to detect additional bytes added and throw an error?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you have to assume there will be additional bytes in the frame until process is called with input size zero.
There should be a new process2 similar to ZSTD_compressStream2 with an additional endOp argument, to allow one-shot compression of a frame.

end
end
92 changes: 92 additions & 0 deletions src/frameDecompression.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Frame Decompressor Codec
# ========================

# Codec that decompresses a whole frame in one call (see `frameDecompress!`).
# `dstream` is the zstd decompression context used by the codec methods.
struct ZstdFrameDecompressor <: TranscodingStreams.Codec
dstream::DStream
end

# Print the codec as `<summary>()`; it has no parameters to display.
function Base.show(io::IO, codec::ZstdFrameDecompressor)
    label = summary(codec)
    print(io, label, "()")
    return nothing
end

"""
    ZstdFrameDecompressor()

Create a new zstd decompression codec using the non-streaming API.
This decompressor uses `ZSTD_decompressDCtx` and expects a known length.
"""
ZstdFrameDecompressor() = ZstdFrameDecompressor(DStream())

# Alias for a `TranscodingStream` whose codec is `ZstdFrameDecompressor` and whose
# wrapped stream has type `S <: IO`.
const ZstdFrameDecompressorStream{S} = TranscodingStream{ZstdFrameDecompressor,S} where S<:IO

"""
ZstdFrameDecompressorStream(stream::IO; kwargs...)

Create a new zstd decompression stream (`kwargs` are passed to `TranscodingStream`).
"""
function ZstdFrameDecompressorStream(stream::IO; kwargs...)
return TranscodingStream(ZstdFrameDecompressor(), stream; kwargs...)

Check warning on line 30 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L29-L30

Added lines #L29 - L30 were not covered by tests
end


# Methods
# -------

function TranscodingStreams.initialize(codec::ZstdFrameDecompressor)
code = initialize!(codec.dstream)
if iserror(code)
throw(ZstdError(code))

Check warning on line 40 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L40

Added line #L40 was not covered by tests
end
return
end

function TranscodingStreams.finalize(codec::ZstdFrameDecompressor)
if codec.dstream.ptr != C_NULL
code = free!(codec.dstream)
if iserror(code)
throw(ZstdError(code))

Check warning on line 49 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L49

Added line #L49 was not covered by tests
end
codec.dstream.ptr = C_NULL
end
return
end

function TranscodingStreams.startproc(codec::ZstdFrameDecompressor, mode::Symbol, error::Error)
code = reset!(codec.dstream)
if iserror(code)
error[] = ZstdError(code)
return :error

Check warning on line 60 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L59-L60

Added lines #L59 - L60 were not covered by tests
end
return :ok
end

function TranscodingStreams.process(codec::ZstdFrameDecompressor, input::Memory, output::Memory, error::Error)
dstream = codec.dstream
dstream.ibuffer.src = input.ptr
dstream.ibuffer.size = input.size
dstream.ibuffer.pos = 0
dstream.obuffer.dst = output.ptr
dstream.obuffer.size = output.size
dstream.obuffer.pos = 0
code = frameDecompress!(dstream)
if iserror(code)
error[] = ZstdError(code)
return 0, 0, :error

Check warning on line 76 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L75-L76

Added lines #L75 - L76 were not covered by tests
else
return Int(input.size), Int(code), :end
end
end

function TranscodingStreams.expectedsize(codec::ZstdFrameDecompressor, input::Memory)
ret = find_decompressed_size(input.ptr, input.size)
if ret == ZSTD_CONTENTSIZE_ERROR
throw(ZstdError())

Check warning on line 85 in src/frameDecompression.jl

View check run for this annotation

Codecov / codecov/patch

src/frameDecompression.jl#L85

Added line #L85 was not covered by tests
elseif ret == ZSTD_CONTENTSIZE_UNKNOWN
return Int(decompressed_size_bound(input.ptr, input.size))
else
# exact size
return Int(ret)
end
end
47 changes: 47 additions & 0 deletions src/libzstd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,34 @@
end

function zstderror(stream, code::Csize_t)
zstderror(code)

Check warning on line 9 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L9

Added line #L9 was not covered by tests
end
function zstderror(code::Csize_t)

Check warning on line 11 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L11

Added line #L11 was not covered by tests
ptr = LibZstd.ZSTD_getErrorName(code)
error("zstd error: ", unsafe_string(ptr))
end

# Exception carrying a raw zstd return code; `Base.show` turns the code into a
# readable message via `ZSTD_getErrorName`.
struct ZstdError <: Exception
code::Csize_t
end
# Fallback for call sites that have no specific code: uses the all-ones `Csize_t` value.
# NOTE(review): presumably this corresponds to zstd's generic error — confirm.
ZstdError() = ZstdError(typemax(Csize_t))

Check warning on line 19 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L19

Added line #L19 was not covered by tests
# Render the error as "ZstdError: <name>", where the name is looked up in
# libzstd from the stored return code.
function Base.show(io::IO, e::ZstdError)
    name_ptr = LibZstd.ZSTD_getErrorName(e.code)
    message = unsafe_string(name_ptr)
    print(io, "ZstdError: ", message)
    return nothing
end

# Highest compression level reported by libzstd.
max_clevel() = LibZstd.ZSTD_maxCLevel()
# Lowest compression level reported by libzstd.
min_clevel() = LibZstd.ZSTD_minCLevel()
# Default compression level reported by libzstd.
default_clevel() = LibZstd.ZSTD_defaultCLevel()

# Compression level limits and default, obtained from libzstd through the
# helper functions above and bound once as top-level constants.
const MAX_CLEVEL = max_clevel()
const MIN_CLEVEL = min_clevel()
const DEFAULT_CLEVEL = default_clevel()

# Short alias for libzstd's input buffer struct.
const InBuffer = LibZstd.ZSTD_inBuffer
# Empty input buffer built from a null pointer and two zeros.
# NOTE(review): field order is presumably (`src`, `size`, `pos`) — confirm against LibZstd_clang.jl.
InBuffer() = InBuffer(C_NULL, 0, 0)
Expand Down Expand Up @@ -98,6 +117,14 @@
return endOp
end

# One-shot compression: pass the stream's whole input buffer and whole output
# buffer to `ZSTD_compress2` and return its raw result. Callers check the
# result with `iserror` and otherwise use it as the number of bytes written.
function frameCompress!(cstream::CStream)
    out = cstream.obuffer
    inp = cstream.ibuffer
    return LibZstd.ZSTD_compress2(cstream, out.dst, out.size, inp.src, inp.size)
end

# Call `ZSTD_endStream` on the stream with its own output buffer and return
# the library's result.
finish!(cstream::CStream) = LibZstd.ZSTD_endStream(cstream, cstream.obuffer)
Expand Down Expand Up @@ -138,6 +165,14 @@
return LibZstd.ZSTD_decompressStream(dstream, dstream.obuffer, dstream.ibuffer)
end

# One-shot decompression: pass the stream's whole input buffer and whole
# output buffer to `ZSTD_decompressDCtx` and return its raw result. Callers
# check the result with `iserror` and otherwise use it as the number of bytes
# written.
function frameDecompress!(dstream::DStream)
    out = dstream.obuffer
    inp = dstream.ibuffer
    return LibZstd.ZSTD_decompressDCtx(dstream, out.dst, out.size, inp.src, inp.size)
end

# Release the decompression stream through `ZSTD_freeDStream` and return the
# library's result code.
free!(dstream::DStream) = LibZstd.ZSTD_freeDStream(dstream)
Expand All @@ -152,3 +187,15 @@
# Ask libzstd (`ZSTD_findDecompressedSize`) for the decompressed size of the
# `size` bytes starting at `src`. Callers compare the result against
# `ZSTD_CONTENTSIZE_ERROR` and `ZSTD_CONTENTSIZE_UNKNOWN`.
find_decompressed_size(src::Ptr, size::Integer) =
    LibZstd.ZSTD_findDecompressedSize(src, size)
function find_decompressed_size(src::Vector{UInt8})
return LibZstd.ZSTD_findDecompressedSize(src, sizeof(src))

Check warning on line 191 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L190-L191

Added lines #L190 - L191 were not covered by tests
end
# Ask libzstd (`ZSTD_compressBound`) for an upper bound on the compressed size
# of an input of `sz` bytes. Callers check the result with `iserror`.
compressed_size_bound(sz) = LibZstd.ZSTD_compressBound(sz)
# Ask libzstd (`ZSTD_decompressBound`) for an upper bound on the decompressed
# size of the `size` bytes starting at `src`.
decompressed_size_bound(src::Ptr, size::Integer) =
    LibZstd.ZSTD_decompressBound(src, size)
function decompressed_size_bound(src::Vector{UInt8})
return LibZstd.ZSTD_decompressBound(src, sizeof(src))

Check warning on line 200 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L199-L200

Added lines #L199 - L200 were not covered by tests
end
21 changes: 21 additions & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@ Random.seed!(1234)
@test CodecZstd.initialize(codec) === nothing
@test CodecZstd.finalize(codec) === nothing

codec = ZstdFrameCompressor()
@test codec isa ZstdFrameCompressor
@test occursin(r"^ZstdFrameCompressor\(level=\d+\)$", sprint(show, codec))
@test CodecZstd.initialize(codec) === nothing
@test CodecZstd.finalize(codec) === nothing

codec = ZstdFrameDecompressor()
@test codec isa ZstdFrameDecompressor
@test occursin(r"^ZstdFrameDecompressor\(\)$", sprint(show, codec))
@test CodecZstd.initialize(codec) === nothing
@test CodecZstd.finalize(codec) === nothing

data = [0x28, 0xb5, 0x2f, 0xfd, 0x04, 0x50, 0x19, 0x00, 0x00, 0x66, 0x6f, 0x6f, 0x3f, 0xba, 0xc4, 0x59]
@test read(ZstdDecompressorStream(IOBuffer(data))) == b"foo"
@test read(ZstdDecompressorStream(IOBuffer(vcat(data, data)))) == b"foofoo"
Expand All @@ -43,6 +55,15 @@ Random.seed!(1234)
TranscodingStreams.test_roundtrip_write(ZstdCompressorStream, ZstdDecompressorStream)
TranscodingStreams.test_roundtrip_lines(ZstdCompressorStream, ZstdDecompressorStream)
TranscodingStreams.test_roundtrip_transcode(ZstdCompressor, ZstdDecompressor)
TranscodingStreams.test_roundtrip_transcode(ZstdFrameCompressor, ZstdDecompressor)
TranscodingStreams.test_roundtrip_transcode(ZstdFrameCompressor, ZstdFrameDecompressor)
TranscodingStreams.test_roundtrip_transcode(ZstdCompressor, ZstdFrameDecompressor)

@static if VERSION ≥ v"1.8"
@test_throws "ZstdError: Destination buffer is too small" throw(ZstdError(0xffffffffffffffba))
else
@test_throws ZstdError throw(ZstdError(0xffffffffffffffba))
end

include("compress_endOp.jl")
end
Loading