diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index 8710e0ee..795a5784 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -91,15 +91,41 @@ read_methods = Data.SampledFrom([ end ]) +# Return true if the stats of a stream are self consistent +# This function assumes stream was never seeked. +function is_stats_consistent(stream) + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + inner_pos = position(stream.stream) + pos = position(stream) + # event!("stats(stream)", s) + # event!("position(stream.stream)", inner_pos) + # event!("position(stream)", pos) + if isreadable(stream) + s.out == pos || return false + s.in == inner_pos || return false + else + iswritable(stream) || return false + s.in == pos || return false + s.out == inner_pos || return false + end + s.transcoded_in ≤ s.in || return false + s.transcoded_out ≥ s.out || return false + end + true +end @check function read_byte_data( kws=read_codecs_kws, data=datas, ) stream = wrap_stream(kws, IOBuffer(data)) - for i in eachindex(data) + for i in 1:length(data) + position(stream) == i-1 || return false + is_stats_consistent(stream) || return false read(stream, UInt8) == data[i] || return false end + is_stats_consistent(stream) || return false eof(stream) end @check function read_data( @@ -108,6 +134,7 @@ end ) stream = wrap_stream(kws, IOBuffer(data)) read(stream) == data || return false + is_stats_consistent(stream) || return false eof(stream) end @check function read_data_methods( @@ -122,13 +149,15 @@ end append!(x, d) length(x) == position(stream) || return false end + is_stats_consistent(stream) || return false x == data[eachindex(x)] end # flush all nested streams and return final data function take_all(stream) if stream isa Base.GenericIOBuffer - take!(stream) + seekstart(stream) + read(stream) else write(stream, TranscodingStreams.TOKEN_END) flush(stream) @@ -144,7 +173,9 @@ const write_codecs_kws = map(reverse, read_codecs_kws) ) stream = wrap_stream(kws, IOBuffer()) write(stream, data) == length(data) || return false - take_all(stream) == data + take_all(stream) == data || return false + is_stats_consistent(stream) || return false + true end @check function write_byte_data( kws=write_codecs_kws, @@ -153,24 +184,10 @@ end stream = wrap_stream(kws, IOBuffer()) for i in 1:length(data) position(stream) == i-1 || return false - if stream isa TranscodingStream - s = TranscodingStreams.stats(stream) - s.in == i-1 || return false - # TODO fix position(stream.stream) - # s.out == position(stream.stream) || return false - # s.transcoded_in == s.out || return false - # s.transcoded_out == s.out || return false - end + is_stats_consistent(stream) || return false write(stream, data[i]) == 1 || return false end take_all(stream) == data || return false - if stream isa TranscodingStream - s = TranscodingStreams.stats(stream) - s.in == length(data) || return false - # TODO fix position(stream.stream) - # s.out == position(stream.stream) || return false - # s.transcoded_in == s.out || return false - # s.transcoded_out == s.out || return false - end + is_stats_consistent(stream) || return false true end \ No newline at end of file diff --git a/src/noop.jl b/src/noop.jl index cbdcf097..3b63205c 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -155,8 +155,12 @@ function stats(stream::NoopStream) if mode === :idle in = out = 0 elseif mode === :read - in = buffer.transcoded - out = in - buffersize(buffer) + out = buffer.transcoded - buffersize(buffer) + if has_sharedbuf(stream) + in = out + else + in = buffer.transcoded + end elseif mode === :write out = stream.state.bytes_written_out in = out diff --git a/src/stream.jl b/src/stream.jl index ff7f0fda..bbdc6932 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -597,10 +597,15 @@ function stats(stream::TranscodingStream) if mode === :idle transcoded_in = transcoded_out = in = out = 0 elseif mode === :read || mode === :stop - transcoded_in = buffer2.transcoded transcoded_out = buffer1.transcoded - in = transcoded_in + buffersize(buffer2) out = transcoded_out - buffersize(buffer1) + if has_sharedbuf(stream) + transcoded_in = stats(stream.stream).out + in = transcoded_in + else + transcoded_in = buffer2.transcoded + in = transcoded_in + buffersize(buffer2) + end elseif mode === :write transcoded_in = buffer1.transcoded out = state.bytes_written_out @@ -701,8 +706,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) input_delta = Δin, output_delta = Δout, ) - consumed!(inbuf, Δin, transcode = true) - supplied!(outbuf, Δout, transcode = true) + consumed!(inbuf, Δin; + transcode = !has_sharedbuf(stream) || stream.state.mode === :write, + ) # inbuf is buffer1 if mode is :write + supplied!(outbuf, Δout; + transcode = !has_sharedbuf(stream) || stream.state.mode !== :write, + ) # outbuf is buffer1 if mode is not :write if has_sharedbuf(stream) if stream.state.mode === :write # this must be updated before throwing any error if outbuf is shared. diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index ca0c8187..0394d9db 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -324,8 +324,8 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test stat.out == 0 read(stream) stat = TranscodingStreams.stats(stream) - @test_broken stat.in == 16 - @test_broken stat.transcoded_in == 16 + @test stat.in == 16 + @test stat.transcoded_in == 16 @test stat.transcoded_out == 6 @test stat.out == 6 close(stream) @@ -373,7 +373,7 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD @test stat.transcoded_in == 16 @test stat.transcoded_out == 6 @test stat.out == 6 - @test_broken position(stream.stream) == 6 + @test position(stream.stream) == 6 close(stream) end end diff --git a/test/codecnoop.jl b/test/codecnoop.jl index abcd2751..c9ab0660 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -193,7 +193,7 @@ using FillArrays: Zeros @test stat.out === Int64(0) eof(stream) stat = TranscodingStreams.stats(stream) - @test_broken stat.in === Int64(0) + @test stat.in === Int64(0) @test stat.out === Int64(0) read(stream) stat = TranscodingStreams.stats(stream)