Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes for position with nested NoopStreams #203

Merged
merged 7 commits into from
Apr 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext/TestExt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ function TranscodingStreams.test_chunked_read(Encoder, Decoder)
for chunk in chunks
stream = TranscodingStream(Decoder(), buffer, stop_on_end=true)
ok &= read(stream) == chunk
ok &= position(stream) == length(chunk)
ok &= eof(stream)
ok &= isreadable(stream)
close(stream)
Expand Down
3 changes: 1 addition & 2 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,7 @@ end
for r in rs
d = r(stream)
append!(x, d)
# TODO fix position
# length(x) == position(stream) || return false
length(x) == position(stream) || return false
end
x == data[eachindex(x)]
end
Expand Down
38 changes: 29 additions & 9 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,18 @@ Note that this method may return a wrong position when
- some data have been inserted by `TranscodingStreams.unread`, or
- the position of the wrapped stream has been changed outside of this package.
"""
function Base.position(stream::NoopStream)
function Base.position(stream::NoopStream)::Int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm trying to decide if this should be Int64 or Int. The latter varies based on the system word size.

$ julia +1.10~x86 -E "typeof(0)"
Int32

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I didn't know juliaup had 32-bit julia.

This should be Int64 to support large files.

mode = stream.state.mode
if mode === :idle
return Int64(0)
if !isopen(stream)
throw_invalid_mode(mode)
elseif mode === :idle
return 0
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
nhz2 marked this conversation as resolved.
Show resolved Hide resolved
elseif has_sharedbuf(stream)
return position(stream.stream)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I treat the buffer as being "owned" by the underlying stream even though it is being shared. This is also why I need to add special cases for writing to a NoopStream that shares its buffer with the underlying stream. The underlying stream must be notified to change from :idle to :write mode before its buffer1 is changed.

elseif mode === :write
return position(stream.stream) + buffersize(stream.buffer1)
elseif mode === :read
else # read
return position(stream.stream) - buffersize(stream.buffer1)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If buffer1 is shared by stream.stream and stream this would double count buffersize

else
throw_invalid_mode(mode)
end
@assert false "unreachable"
end
Expand Down Expand Up @@ -97,16 +99,34 @@ function Base.seekend(stream::NoopStream)
return stream
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)
function Base.write(stream::NoopStream, b::UInt8)::Int
changemode!(stream, :write)
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(write(stream.stream, b))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the explicit cast to Int here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need this to fix the stats function for NoopStream, but that's for a future PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now the cast isn't doing anything.

return n
end
buffer1 = stream.buffer1
marginsize(buffer1) > 0 || flushbuffer(stream)
return writebyte!(buffer1, b)
end

function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int
changemode!(stream, :write)
if has_sharedbuf(stream)
# directly write data to the underlying stream
n = Int(unsafe_write(stream.stream, input, nbytes))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need the explicit cast to Int here?

return n
end
buffer = stream.buffer1
if marginsize(buffer) ≥ nbytes
copydata!(buffer, input, nbytes)
return Int(nbytes)
else
flushbuffer(stream)
# directly write data to the underlying stream
return unsafe_write(stream.stream, input, nbytes)
n = Int(unsafe_write(stream.stream, input, nbytes))
return n
end
end

Expand Down Expand Up @@ -152,7 +172,7 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int
changemode!(stream, :read)
buffer = stream.buffer1
@assert buffer === stream.buffer2
if stream.stream isa TranscodingStream && buffer === stream.stream.buffer1
if has_sharedbuf(stream)
# Delegate the operation when buffers are shared.
underlying_mode::Symbol = stream.stream.state.mode
if underlying_mode === :idle || underlying_mode === :read
Expand Down
8 changes: 6 additions & 2 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ end
# throw ArgumentError that mode is invalid.
throw_invalid_mode(mode) = throw(ArgumentError(string("invalid mode :", mode)))

# Return true if the stream shares buffers with underlying stream
function has_sharedbuf(stream::TranscodingStream)::Bool
stream.stream isa TranscodingStream && stream.buffer2 === stream.stream.buffer1
end

# Base IO Functions
# -----------------
Expand Down Expand Up @@ -264,7 +268,7 @@ function Base.position(stream::TranscodingStream)
mode = stream.state.mode
if mode === :idle
return Int64(0)
elseif mode === :read
elseif mode === :read || mode === :stop
return stats(stream).out
elseif mode === :write
return stats(stream).in
Expand Down Expand Up @@ -584,7 +588,7 @@ function stats(stream::TranscodingStream)
buffer2 = stream.buffer2
if mode === :idle
transcoded_in = transcoded_out = in = out = 0
elseif mode === :read
elseif mode === :read || mode === :stop
transcoded_in = buffer2.transcoded
transcoded_out = buffer1.transcoded
in = transcoded_in + buffersize(buffer2)
Expand Down
2 changes: 2 additions & 0 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
)
))
@test read(s1) == b""
@test position(s1) == 0
@test eof(s1)

s2 = NoopStream(
Expand All @@ -281,6 +282,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
)
)
@test read(s2) == b""
@test position(s1) == 0
@test eof(s2)
end

Expand Down
35 changes: 35 additions & 0 deletions test/codecnoop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,41 @@
@test position(stream) == pos
end
end

@testset "writing nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
stream = NoopStream(NoopStream(IOBuffer()); sharedbuf, bufsize=4)
@test position(stream) == 0
write(stream, 0x01)
@test position(stream) == 1
flush(stream)
@test position(stream) == 1
write(stream, "abc")
@test position(stream) == 4
flush(stream)
@test position(stream) == 4
for i in 1:10
write(stream, 0x01)
@test position(stream) == 4 + i
end
end

@testset "reading nested NoopStream sharedbuf=$(sharedbuf)" for sharedbuf in (true, false)
stream = NoopStream(NoopStream(IOBuffer("abcdefghijk")); sharedbuf, bufsize=4)
@test position(stream) == 0
@test !eof(stream)
@test position(stream) == 0
@test read(stream, UInt8) == b"a"[1]
@test position(stream) == 1
@test read(stream, 3) == b"bcd"
@test position(stream) == 4
@test !eof(stream)
@test position(stream) == 4
@test read(stream) == b"efghijk"
@test position(stream) == 11
@test eof(stream)
@test position(stream) == 11
end

end

@testset "seek doesn't delete data" begin
Expand Down
Loading