Skip to content

Commit

Permalink
Unread extra data when stopping (#209)
Browse files Browse the repository at this point in the history
  • Loading branch information
nhz2 authored May 5, 2024
1 parent 8fdabed commit a79c78f
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
3 changes: 2 additions & 1 deletion docs/src/examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,8 @@ eof(stream) #> true
In the case where you need to reuse the wrapped stream, the code above must be
slightly modified because the transcoding stream may read more bytes than
necessary from the wrapped stream. Wrapping the stream with `NoopStream` solves
the problem because adjacent transcoding streams share the same buffer.
the problem because any extra data read after the end of the chunk will be
stored back in the internal buffer of the wrapped transcoding stream.
```julia
using CodecZlib
using TranscodingStreams
Expand Down
34 changes: 18 additions & 16 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -81,24 +81,26 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
alpha = b"色即是空"
encoder = Encoder()
initialize(encoder)
for _ in 1:500
chunks = [rand(alpha, rand(0:100)) for _ in 1:rand(1:100)]
data = mapfoldl(x->transcode(encoder, x), vcat, chunks, init=UInt8[])
buffer = NoopStream(IOBuffer(data))
ok = true
for chunk in chunks
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
ok &= read(stream) == chunk
ok &= position(stream) == length(chunk)
ok &= eof(stream)
ok &= isreadable(stream)
for sharedbuf in false:true
for _ in 1:500
chunks = [rand(alpha, rand(0:100)) for _ in 1:rand(1:100)]
data = mapfoldl(x->transcode(encoder, x), vcat, chunks, init=UInt8[])
buffer = NoopStream(IOBuffer(data))
ok = true
for chunk in chunks
stream = TranscodingStream(Decoder(), buffer; stop_on_end=true, sharedbuf)
ok &= read(stream) == chunk
ok &= position(stream) == length(chunk)
ok &= eof(stream)
ok &= isreadable(stream)
close(stream)
end
# read without stop_on_end should read the full data.
stream = TranscodingStream(Decoder(), IOBuffer(data))
ok &= read(stream) == reduce(vcat, chunks)
close(stream)
Test.@test ok
end
# read without stop_on_end should read the full data.
stream = TranscodingStream(Decoder(), IOBuffer(data))
ok &= read(stream) == reduce(vcat, chunks)
close(stream)
Test.@test ok
end
finalize(encoder)
end
Expand Down
4 changes: 4 additions & 0 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,10 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
makemargin!(outbuf, max(16, marginsize(outbuf) * 2))
elseif state.code == :end && state.stop_on_end
if stream.state.mode == :read
if stream.stream isa TranscodingStream && !has_sharedbuf(stream) && !iszero(buffersize(inbuf))
# unread data to match behavior if inbuf was shared.
GC.@preserve inbuf unsafe_unread(stream.stream, bufferptr(inbuf), buffersize(inbuf))
end
changemode!(stream, :stop)
end
end
Expand Down

0 comments on commit a79c78f

Please sign in to comment.