Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pledgeinsize #64

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/Downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
Pkg.Registry.update()
Pkg.activate(;temp=true)
# force it to use this PR's version of the package
ENV["JULIA_PKG_DEVDIR"]= mktempdir()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ensures fresh versions of the developed packages, in case this is run locally.

Pkg.develop([
PackageSpec(path="."),
PackageSpec(name="${{ matrix.package }}"),
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/Upstream.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Upstream
on:
push:
branches: [master]
tags: [v*]
pull_request:

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: julia-actions/setup-julia@v2
with:
version: 1
arch: x64
- uses: julia-actions/julia-buildpkg@latest
- name: Load the upstream packages and run the tests
shell: julia --color=yes {0}
run: |
using Pkg
Pkg.Registry.update()
Pkg.activate(;temp=true)
# force it to use this PR's version of the package
ENV["JULIA_PKG_DEVDIR"]= mktempdir()
Pkg.develop([
PackageSpec(path="."),
PackageSpec(name="TranscodingStreams"),
])
Pkg.update()
Pkg.test("CodecZstd"; coverage=true)
- uses: julia-actions/julia-processcoverage@v1
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: lcov.info
4 changes: 2 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa"
Zstd_jll = "3161d3a3-bdf6-5164-811a-617609db77b4"

[compat]
TranscodingStreams = "0.9, 0.10, 0.11"
TranscodingStreams = "0.11.3"
Zstd_jll = "1.5.5"
julia = "1.3"
julia = "1.6"
24 changes: 19 additions & 5 deletions src/compression.jl
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,31 @@ function TranscodingStreams.startproc(codec::ZstdCompressor, mode::Symbol, error
return :error
end
end
code = reset!(codec.cstream, 0 #=unknown source size=#)
reset!(codec.cstream)
return :ok
end

function TranscodingStreams.pledgeinsize(codec::ZstdCompressor, insize::Int64, error::Error)::Symbol
if codec.cstream.ptr == C_NULL
Base.error("`startproc` must be called before `pledgeinsize`")
end
srcsize = if signbit(insize)
ZSTD_CONTENTSIZE_UNKNOWN
else
Culonglong(insize)
end
code = LibZstd.ZSTD_CCtx_setPledgedSrcSize(codec.cstream, srcsize)
if iserror(code)
error[] = ErrorException("zstd error")
return :error
error[] = ErrorException("zstd error setting pledged source size")
:error
else
:ok
end
return :ok
end

function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, error::Error)
if codec.cstream.ptr == C_NULL
error("startproc must be called before process")
Base.error("`startproc` must be called before `process`")
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error is the name of a local variable, so I need to specify Base.error here.

end
cstream = codec.cstream
ibuffer_starting_pos = UInt(0)
Expand Down
20 changes: 6 additions & 14 deletions src/libzstd.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,23 +56,15 @@
return LibZstd.ZSTD_initCStream(cstream, level)
end

function reset!(cstream::CStream, srcsize::Integer)
# ZSTD_resetCStream is deprecated
# https://github.com/facebook/zstd/blob/9d2a45a705e22ad4817b41442949cd0f78597154/lib/zstd.h#L2253-L2272
function reset!(cstream::CStream)
res = LibZstd.ZSTD_CCtx_reset(cstream, LibZstd.ZSTD_reset_session_only)
if iserror(res)
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
return res
end
if srcsize == 0
# From zstd.h:
# Note: ZSTD_resetCStream() interprets pledgedSrcSize == 0 as ZSTD_CONTENTSIZE_UNKNOWN, but
# ZSTD_CCtx_setPledgedSrcSize() does not do the same, so ZSTD_CONTENTSIZE_UNKNOWN must be
# explicitly specified.
srcsize = ZSTD_CONTENTSIZE_UNKNOWN
end
reset!(cstream.ibuffer)
reset!(cstream.obuffer)
return LibZstd.ZSTD_CCtx_setPledgedSrcSize(cstream, srcsize)
if iserror(res)
# According to zstd.h "Resetting session never fails" so this branch should be unreachable.
error("unreachable")

Check warning on line 65 in src/libzstd.jl

View check run for this annotation

Codecov / codecov/patch

src/libzstd.jl#L65

Added line #L65 was not covered by tests
end
return
end

"""
Expand Down
76 changes: 74 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,17 @@ include("utils.jl")
@test CodecZstd.find_decompressed_size(v) == 22

codec = ZstdCompressor
buffer3 = transcode(codec, b"Hello")
buffer4 = transcode(codec, b"World!")
sink = IOBuffer()
s = TranscodingStream(codec(), sink; stop_on_end=true)
write(s, b"Hello")
close(s)
buffer3 = take!(sink)
@test CodecZstd.find_decompressed_size(buffer3) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

transcode with ZstdCompressor now records the decompressed size.

sink = IOBuffer()
s = TranscodingStream(codec(), sink; stop_on_end=true)
write(s, b"Hello")
close(s)
buffer4 = take!(sink)
@test CodecZstd.find_decompressed_size(buffer4) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN

write(iob, buffer1)
Expand All @@ -156,6 +164,66 @@ include("utils.jl")
@test CodecZstd.find_decompressed_size(v) == CodecZstd.ZSTD_CONTENTSIZE_ERROR
end

@testset "pledgeinsize" begin
# when pledgeinsize is available transcode should save the
# decompressed size in a header
for n in [0:30; 1000; 1000000;]
v = transcode(ZstdCompressor, rand(UInt8, n))
@test CodecZstd.find_decompressed_size(v) == n
end

# Test what happens if pledgeinsize promise is broken
d1 = zeros(UInt8, 10000)
d2 = zeros(UInt8, 10000)
GC.@preserve d1 d2 begin
@testset "too many bytes" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, Int64(10), e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e) === (0, 0, :error)
@test e[] == ErrorException("zstd error: Src size is incorrect")
TranscodingStreams.finalize(codec)
end
@testset "too few bytes" begin
m1 = TranscodingStreams.Memory(pointer(d1), 10)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
m1 = TranscodingStreams.Memory(pointer(d1), 0)
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :error
@test e[] == ErrorException("zstd error: Src size is incorrect")
TranscodingStreams.finalize(codec)
end
@testset "set pledgeinsize after process" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
@test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :error
@test e[] == ErrorException("zstd error setting pledged source size")
TranscodingStreams.finalize(codec)
end
@testset "set unknown pledgeinsize" begin
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
codec = ZstdCompressor()
e = TranscodingStreams.Error()
@test TranscodingStreams.startproc(codec, :read, e) === :ok
@test TranscodingStreams.pledgeinsize(codec, Int64(-1), e) === :ok
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
TranscodingStreams.finalize(codec)
end
end
end

include("compress_endOp.jl")
include("static_only_tests.jl")

Expand Down Expand Up @@ -195,6 +263,10 @@ include("utils.jl")
TranscodingStreams.finalize(codec)
data = [0x00,0x01]
GC.@preserve data let m = TranscodingStreams.Memory(pointer(data), length(data))
try
TranscodingStreams.pledgeinsize(codec, Int64(10), TranscodingStreams.Error())
catch
end
try
TranscodingStreams.expectedsize(codec, m)
catch
Expand Down
Loading