Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix position of underlying stream when sharing buffers #222

Merged
merged 1 commit into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 36 additions & 19 deletions fuzz/fuzz.jl
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,41 @@ read_methods = Data.SampledFrom([
end
])

# Return true if the stats of a stream are self consistent
# This function assumes stream was never seeked.
function is_stats_consistent(stream)
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
inner_pos = position(stream.stream)
pos = position(stream)
# event!("stats(stream)", s)
# event!("position(stream.stream)", inner_pos)
# event!("position(stream)", pos)
if isreadable(stream)
s.out == pos || return false
s.in == inner_pos || return false
else
iswritable(stream) || return false
s.in == pos || return false
s.out == inner_pos || return false
end
s.transcoded_in ≤ s.in || return false
s.transcoded_out ≥ s.out || return false
end
true
end

@check function read_byte_data(
kws=read_codecs_kws,
data=datas,
)
stream = wrap_stream(kws, IOBuffer(data))
for i in eachindex(data)
for i in 1:length(data)
position(stream) == i-1 || return false
is_stats_consistent(stream) || return false
read(stream, UInt8) == data[i] || return false
end
is_stats_consistent(stream) || return false
eof(stream)
end
@check function read_data(
Expand All @@ -108,6 +134,7 @@ end
)
stream = wrap_stream(kws, IOBuffer(data))
read(stream) == data || return false
is_stats_consistent(stream) || return false
eof(stream)
end
@check function read_data_methods(
Expand All @@ -122,13 +149,15 @@ end
append!(x, d)
length(x) == position(stream) || return false
end
is_stats_consistent(stream) || return false
x == data[eachindex(x)]
end

# flush all nested streams and return final data
function take_all(stream)
if stream isa Base.GenericIOBuffer
take!(stream)
seekstart(stream)
read(stream)
else
write(stream, TranscodingStreams.TOKEN_END)
flush(stream)
Expand All @@ -144,7 +173,9 @@ const write_codecs_kws = map(reverse, read_codecs_kws)
)
stream = wrap_stream(kws, IOBuffer())
write(stream, data) == length(data) || return false
take_all(stream) == data
take_all(stream) == data || return false
is_stats_consistent(stream) || return false
true
end
@check function write_byte_data(
kws=write_codecs_kws,
Expand All @@ -153,24 +184,10 @@ end
stream = wrap_stream(kws, IOBuffer())
for i in 1:length(data)
position(stream) == i-1 || return false
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
s.in == i-1 || return false
# TODO fix position(stream.stream)
# s.out == position(stream.stream) || return false
# s.transcoded_in == s.out || return false
# s.transcoded_out == s.out || return false
end
is_stats_consistent(stream) || return false
write(stream, data[i]) == 1 || return false
end
take_all(stream) == data || return false
if stream isa TranscodingStream
s = TranscodingStreams.stats(stream)
s.in == length(data) || return false
# TODO fix position(stream.stream)
# s.out == position(stream.stream) || return false
# s.transcoded_in == s.out || return false
# s.transcoded_out == s.out || return false
end
is_stats_consistent(stream) || return false
true
end
8 changes: 6 additions & 2 deletions src/noop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,12 @@ function stats(stream::NoopStream)
if mode === :idle
in = out = 0
elseif mode === :read
in = buffer.transcoded
out = in - buffersize(buffer)
out = buffer.transcoded - buffersize(buffer)
if has_sharedbuf(stream)
in = out
else
in = buffer.transcoded
end
elseif mode === :write
out = stream.state.bytes_written_out
in = out
Expand Down
17 changes: 13 additions & 4 deletions src/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -597,10 +597,15 @@ function stats(stream::TranscodingStream)
if mode === :idle
transcoded_in = transcoded_out = in = out = 0
elseif mode === :read || mode === :stop
transcoded_in = buffer2.transcoded
transcoded_out = buffer1.transcoded
in = transcoded_in + buffersize(buffer2)
out = transcoded_out - buffersize(buffer1)
if has_sharedbuf(stream)
transcoded_in = stats(stream.stream).out
in = transcoded_in
else
transcoded_in = buffer2.transcoded
in = transcoded_in + buffersize(buffer2)
end
elseif mode === :write
transcoded_in = buffer1.transcoded
out = state.bytes_written_out
Expand Down Expand Up @@ -701,8 +706,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer)
input_delta = Δin,
output_delta = Δout,
)
consumed!(inbuf, Δin, transcode = true)
supplied!(outbuf, Δout, transcode = true)
consumed!(inbuf, Δin;
transcode = !has_sharedbuf(stream) || stream.state.mode === :write,
) # inbuf is buffer1 if mode is :write
supplied!(outbuf, Δout;
transcode = !has_sharedbuf(stream) || stream.state.mode !== :write,
) # outbuf is buffer1 if mode is not :write
if has_sharedbuf(stream)
if stream.state.mode === :write
# this must be updated before throwing any error if outbuf is shared.
Expand Down
6 changes: 3 additions & 3 deletions test/codecdoubleframe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -324,8 +324,8 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test stat.out == 0
read(stream)
stat = TranscodingStreams.stats(stream)
@test_broken stat.in == 16
@test_broken stat.transcoded_in == 16
@test stat.in == 16
@test stat.transcoded_in == 16
@test stat.transcoded_out == 6
@test stat.out == 6
close(stream)
Expand Down Expand Up @@ -373,7 +373,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD
@test stat.transcoded_in == 16
@test stat.transcoded_out == 6
@test stat.out == 6
@test_broken position(stream.stream) == 6
@test position(stream.stream) == 6
close(stream)
end
end
Expand Down
2 changes: 1 addition & 1 deletion test/codecnoop.jl
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ using FillArrays: Zeros
@test stat.out === Int64(0)
eof(stream)
stat = TranscodingStreams.stats(stream)
@test_broken stat.in === Int64(0)
@test stat.in === Int64(0)
@test stat.out === Int64(0)
read(stream)
stat = TranscodingStreams.stats(stream)
Expand Down
Loading