Unread extra data when stopping #209

Merged (2 commits) on May 5, 2024. Changes from all commits shown below.
docs/src/examples.md (3 changes: 2 additions & 1 deletion)
@@ -157,7 +157,8 @@ eof(stream) #> true
 In the case where you need to reuse the wrapped stream, the code above must be
 slightly modified because the transcoding stream may read more bytes than
 necessary from the wrapped stream. Wrapping the stream with `NoopStream` solves
-the problem because adjacent transcoding streams share the same buffer.
+the problem because any extra data read after the end of the chunk will be
+stored back in the internal buffer of the wrapped transcoding stream.
```julia
using CodecZlib
using TranscodingStreams
# … (rest of the example is truncated in the diff view)
```
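The docs snippet above is truncated in the diff view. A minimal sketch of the pattern that paragraph describes (chunked decompression with a reusable wrapped stream; the chunk contents and variable names here are illustrative, not taken from the PR) might look like this:

```julia
using CodecZlib
using TranscodingStreams

# Two independently compressed chunks written back to back.
data = vcat(transcode(GzipCompressor, b"chunk one"),
            transcode(GzipCompressor, b"chunk two"))

# Wrapping the IOBuffer in NoopStream lets the decompressing stream hand
# any over-read bytes back to the wrapped stream when a chunk ends.
buffer = NoopStream(IOBuffer(data))

stream1 = TranscodingStream(GzipDecompressor(), buffer, stop_on_end=true)
chunk1 = read(stream1)   # stops at the end of the first gzip member
close(stream1)

stream2 = TranscodingStream(GzipDecompressor(), buffer, stop_on_end=true)
chunk2 = read(stream2)   # resumes exactly where the first stream stopped
close(stream2)
```

With `stop_on_end=true` each decoder stops at its codec's end-of-stream marker instead of reading to EOF, which is what makes the wrapped `buffer` reusable for the next chunk.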
ext/TestExt.jl (34 changes: 18 additions & 16 deletions)
@@ -81,24 +81,26 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
alpha = b"色即是空"
encoder = Encoder()
initialize(encoder)
-    for _ in 1:500
-        chunks = [rand(alpha, rand(0:100)) for _ in 1:rand(1:100)]
-        data = mapfoldl(x->transcode(encoder, x), vcat, chunks, init=UInt8[])
-        buffer = NoopStream(IOBuffer(data))
-        ok = true
-        for chunk in chunks
-            stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
-            ok &= read(stream) == chunk
-            ok &= position(stream) == length(chunk)
-            ok &= eof(stream)
-            ok &= isreadable(stream)
-            close(stream)
-        end
-        # read without stop_on_end should read the full data.
-        stream = TranscodingStream(Decoder(), IOBuffer(data))
-        ok &= read(stream) == reduce(vcat, chunks)
-        close(stream)
-        Test.@test ok
-    end
+    for sharedbuf in false:true
+        for _ in 1:500
+            chunks = [rand(alpha, rand(0:100)) for _ in 1:rand(1:100)]
+            data = mapfoldl(x->transcode(encoder, x), vcat, chunks, init=UInt8[])
+            buffer = NoopStream(IOBuffer(data))
+            ok = true
+            for chunk in chunks
+                stream = TranscodingStream(Decoder(), buffer; stop_on_end=true, sharedbuf)
+                ok &= read(stream) == chunk
+                ok &= position(stream) == length(chunk)
+                ok &= eof(stream)
+                ok &= isreadable(stream)
+                close(stream)
+            end
+            # read without stop_on_end should read the full data.
+            stream = TranscodingStream(Decoder(), IOBuffer(data))
+            ok &= read(stream) == reduce(vcat, chunks)
+            close(stream)
+            Test.@test ok
+        end
+    end
     finalize(encoder)
 end
src/stream.jl (4 changes: 4 additions & 0 deletions)
@@ -700,6 +700,10 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
         makemargin!(outbuf, max(16, marginsize(outbuf) * 2))
     elseif state.code == :end && state.stop_on_end
         if stream.state.mode == :read
+            if stream.stream isa TranscodingStream && !has_sharedbuf(stream) && !iszero(buffersize(inbuf))
+                # unread data to match behavior if inbuf was shared.
+                GC.@preserve inbuf unsafe_unread(stream.stream, bufferptr(inbuf), buffersize(inbuf))
Member commented on the added lines:

> Why do we not have a low-level method that does the `GC.@preserve`?

Member (author) replied:

> I guess the normal `unread` should be changed to work with any `AbstractVector{UInt8}`; then I could just unread a view of `inbuf.data`. Currently, it only supports `Union{Vector{UInt8},Base.CodeUnits{UInt8}}`.
+            end
             changemode!(stream, :stop)
         end
     end
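For readers unfamiliar with the unread machinery this change relies on: `unsafe_unread` pushes bytes back into a stream's input buffer through a raw pointer (hence the `GC.@preserve`), while the safe public counterpart `TranscodingStreams.unread` takes a byte vector, which is what the review discussion above is about. A minimal sketch of the semantics, with illustrative values:

```julia
using TranscodingStreams

stream = NoopStream(IOBuffer("abc"))
b = read(stream, UInt8)                 # consume the first byte, 'a'
TranscodingStreams.unread(stream, [b])  # push that byte back into the buffer
rest = read(stream)                     # the full "abc" is readable again
```

The PR uses the unsafe pointer variant because the leftover data lives in an internal `Buffer` rather than a `Vector{UInt8}`, the only form (besides `Base.CodeUnits{UInt8}`) that `unread` currently accepts.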