Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional pledgeinsize function to transcoding protocol #239

Merged
merged 5 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/src/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Base.position(::NoopStream)
```@docs
TranscodingStreams.Codec
TranscodingStreams.expectedsize
TranscodingStreams.pledgeinsize
TranscodingStreams.minoutsize
TranscodingStreams.initialize
TranscodingStreams.finalize
Expand Down
35 changes: 30 additions & 5 deletions src/codec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Transcoding proceeds by calling some functions in a specific way. We call this

There are six functions for a codec to implement:
- `expectedsize`: return the expected size of transcoded data
- `pledgeinsize`: tell the codec the total input size
- `minoutsize`: return the minimum output size of `process`
- `initialize`: initialize the codec
- `finalize`: finalize the codec
Expand All @@ -22,7 +23,7 @@ There are six functions for a codec to implement:

These are defined in the `TranscodingStreams` and a new codec type must extend
these methods if necessary. Implementing a `process` method is mandatory but
others are optional. `expectedsize`, `minoutsize`, `initialize`, `finalize`,
others are optional. `expectedsize`, `minoutsize`, `pledgeinsize`, `initialize`, `finalize`,
and `startproc` have a default implementation.

Your codec type is denoted by `C` and its object by `codec`.
Expand All @@ -39,6 +40,18 @@ used as a hint to determine the size of a data buffer when `transcode` is
called. A good hint will reduce the number of buffer resizing and hence result
in better performance.

### `pledgeinsize`

The `pledgeinsize(codec::C, insize::Int64, error::Error)::Symbol` method is used
when `transcode` is called to tell the `codec` the total input size.
This is called after `startproc` and before `process`. Some
compressors can add this total input size to a header, making `expectedsize`
accurate during later decompression. By default this just returns `:ok`.
If there is an error, the return code must be `:error` and the `error` argument
must be set to an exception object. Setting an inaccurate `insize` may cause the
codec to error later on while processing data. A negative `insize` means unknown
content size.

### `minoutsize`

The `minoutsize(codec::C, input::Memory)::Int` method takes `codec` and `input`,
Expand Down Expand Up @@ -71,10 +84,11 @@ the stream will become the close mode for safety.
### `startproc`

The `startproc(codec::C, mode::Symbol, error::Error)::Symbol` method takes
`codec`, `mode` and `error`, and returns a status code. This is called just
before the stream starts reading or writing data. `mode` is either `:read` or
`:write` and then the stream starts reading or writing, respectively. The
return code must be `:ok` if `codec` is ready to read or write data. Otherwise,
`codec`, `mode`, and `error`, and returns a status code. This resets the state
of the codec and is called before the stream starts processing data.
After a call to `startproc`, `pledgeinsize` can be optionally called.
`mode` is either `:read` or `:write`. The
return code must be `:ok` if `codec` is ready to process data. Otherwise,
it must be `:error` and the `error` argument must be set to an exception object.

### `process`
Expand Down Expand Up @@ -112,6 +126,17 @@ function expectedsize(codec::Codec, input::Memory)::Int
return input.size
end

"""
pledgeinsize(codec::Codec, insize::Int64, error::Error)::Symbol

Tell the codec the total input size.

The default method does nothing and returns `:ok`.
"""
function pledgeinsize(codec::Codec, insize::Int64, error::Error)::Symbol
return :ok
end

"""
minoutsize(codec::Codec, input::Memory)::Int

Expand Down
6 changes: 6 additions & 0 deletions src/transcode.jl
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@
if code === :error
@goto error
end
if pledgeinsize(codec, Int64(buffersize(input)), error) === :error
@goto error

Check warning on line 151 in src/transcode.jl

View check run for this annotation

Codecov / codecov/patch

src/transcode.jl#L151

Added line #L151 was not covered by tests
end
n = GC.@preserve input minoutsize(codec, buffermem(input))
@label process
makemargin!(output, n)
Expand All @@ -168,6 +171,9 @@
if startproc(codec, :write, error) === :error
@goto error
end
if pledgeinsize(codec, Int64(buffersize(input)), error) === :error
@goto error

Check warning on line 175 in src/transcode.jl

View check run for this annotation

Codecov / codecov/patch

src/transcode.jl#L175

Added line #L175 was not covered by tests
end
n = GC.@preserve input minoutsize(codec, buffermem(input))
@goto process
end
Expand Down
58 changes: 49 additions & 9 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,19 @@ struct DoubleFrameEncoder <: TranscodingStreams.Codec
opened::Base.RefValue{Bool}
stopped::Base.RefValue{Bool}
got_stop_msg::Base.RefValue{Bool}
pledged_in_size::Base.RefValue{Int64}
in_size_count::Base.RefValue{Int64}
end

DoubleFrameEncoder() = DoubleFrameEncoder(Ref(false), Ref(false), Ref(false))
DoubleFrameEncoder() = DoubleFrameEncoder(Ref(false), Ref(false), Ref(false), Ref(Int64(-1)), Ref(Int64(0)))

function TranscodingStreams.process(
codec :: DoubleFrameEncoder,
input :: TranscodingStreams.Memory,
output :: TranscodingStreams.Memory,
error_ref :: TranscodingStreams.Error,
)
pledged = codec.pledged_in_size[]
if input.size == 0
codec.got_stop_msg[] = true
end
Expand All @@ -45,26 +48,59 @@ function TranscodingStreams.process(
return 0, 0, :error
elseif !codec.opened[]
output[1] = UInt8('[')
output[2] = UInt8(' ')
if pledged ∈ (0:9)
output[2] = UInt8('0'+pledged)
else
output[2] = UInt8(' ')
end
codec.opened[] = true
return 0, 2, :ok
elseif codec.got_stop_msg[]
# check in_size_count against pledged
if pledged ∈ (0:9)
if pledged > codec.in_size_count[]
error_ref[] = ErrorException("pledged in size was too big")
return 0, 0, :error
end
end
output[1] = UInt8(' ')
output[2] = UInt8(']')
codec.stopped[] = true
return 0, 2, :end
else
i = j = 0
# check input.size against pledged
if pledged ∈ (0:9)
if input.size > pledged || pledged - input.size < codec.in_size_count[]
error_ref[] = ErrorException("pledged in size was too small")
return 0, 0, :error
end
end
while i + 1 ≤ lastindex(input) && j + 2 ≤ lastindex(output)
b = input[i+1]
i += 1
output[j+1] = output[j+2] = b
j += 2
end
codec.in_size_count[] += i
return i, j, :ok
end
end

function TranscodingStreams.pledgeinsize(
codec::DoubleFrameEncoder,
insize::Int64,
error::Error,
)::Symbol
if codec.opened[]
error[] = ErrorException("pledgeinsize called after opening")
return :error
else
codec.pledged_in_size[] = insize
return :ok
end
end

function TranscodingStreams.expectedsize(
:: DoubleFrameEncoder,
input :: TranscodingStreams.Memory)
Expand All @@ -81,6 +117,8 @@ function TranscodingStreams.startproc(codec::DoubleFrameEncoder, ::Symbol, error
codec.opened[] = false
codec.got_stop_msg[] = false
codec.stopped[] = false
codec.pledged_in_size[] = -1
codec.in_size_count[] = 0
return :ok
end

Expand Down Expand Up @@ -149,7 +187,7 @@ function TranscodingStreams.process(
codec.a[] != UInt8('[') && error("expected [")
@label state2
do_read(codec.a) || return (codec.state[]=2; (Δin, Δout, :ok))
codec.a[] != UInt8(' ') && error("expected space")
codec.a[] ∉ (UInt8(' '), UInt8('0'):UInt8('9')...) && error("expected space or size")
while true
@label state3
do_read(codec.a) || return (codec.state[]=3; (Δin, Δout, :ok))
Expand Down Expand Up @@ -189,12 +227,14 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD


@testset "DoubleFrame Codecs" begin
@test transcode(DoubleFrameEncoder, b"") == b"[ ]"
@test transcode(DoubleFrameEncoder, b"a") == b"[ aa ]"
@test transcode(DoubleFrameEncoder, b"ab") == b"[ aabb ]"
@test transcode(DoubleFrameEncoder(), b"") == b"[ ]"
@test transcode(DoubleFrameEncoder(), b"a") == b"[ aa ]"
@test transcode(DoubleFrameEncoder(), b"ab") == b"[ aabb ]"
@test transcode(DoubleFrameEncoder, b"") == b"[0 ]"
@test transcode(DoubleFrameEncoder, b"a") == b"[1aa ]"
@test transcode(DoubleFrameEncoder, b"ab") == b"[2aabb ]"
@test transcode(DoubleFrameEncoder(), b"") == b"[0 ]"
@test transcode(DoubleFrameEncoder(), b"a") == b"[1aa ]"
@test transcode(DoubleFrameEncoder(), b"ab") == b"[2aabb ]"
@test transcode(DoubleFrameEncoder(), ones(UInt8,9)) == [b"[9"; ones(UInt8,18); b" ]";]
@test transcode(DoubleFrameEncoder(), ones(UInt8,10)) == [b"[ "; ones(UInt8,20); b" ]";]

@test_throws Exception transcode(DoubleFrameDecoder, b"")
@test_throws Exception transcode(DoubleFrameDecoder, b" [")
Expand Down
Loading