diff --git a/docs/src/devnotes.md b/docs/src/devnotes.md index 5fd968c8..ccae4124 100644 --- a/docs/src/devnotes.md +++ b/docs/src/devnotes.md @@ -41,6 +41,7 @@ default buffer size is 16KiB for each. - `error`: exception returned by the codec (`<:Error`) - `buffer1`: data buffer that is closer to the user (`<:Buffer`) - `buffer2`: data buffer that is farther to the user (`<:Buffer`) +- `bytes_written_out`: number of bytes written to the underlying stream (`<:Int64`) The `mode` field may be one of the following value: - `:idle` : initial and intermediate mode, no buffered data @@ -78,8 +79,10 @@ Shared buffers Adjacent transcoding streams may share their buffers. This will reduce memory allocation and eliminate data copy between buffers. -`readdata!(input::IO, output::Buffer)` and `writedata!(output::IO, -input::Buffer)` do the actual work of read/write data from/to the underlying +If `buffer2` is shared, the `stats` and `position` functions consider it to be owned +by the underlying stream. + +`readdata!(input::IO, output::Buffer)` and `flush_buffer2(stream::TranscodingStream)` do the actual work of reading/writing data from/to the underlying stream. These methods have a special pass for shared buffers. 
diff --git a/fuzz/fuzz.jl b/fuzz/fuzz.jl index ef36dcc5..8710e0ee 100644 --- a/fuzz/fuzz.jl +++ b/fuzz/fuzz.jl @@ -152,7 +152,25 @@ end ) stream = wrap_stream(kws, IOBuffer()) for i in 1:length(data) + position(stream) == i-1 || return false + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + s.in == i-1 || return false + # TODO fix position(stream.stream) + # s.out == position(stream.stream) || return false + # s.transcoded_in == s.out || return false + # s.transcoded_out == s.out || return false + end write(stream, data[i]) == 1 || return false end - take_all(stream) == data + take_all(stream) == data || return false + if stream isa TranscodingStream + s = TranscodingStreams.stats(stream) + s.in == length(data) || return false + # TODO fix position(stream.stream) + # s.out == position(stream.stream) || return false + # s.transcoded_in == s.out || return false + # s.transcoded_out == s.out || return false + end + true end \ No newline at end of file diff --git a/src/noop.jl b/src/noop.jl index 8b64d91b..cbdcf097 100644 --- a/src/noop.jl +++ b/src/noop.jl @@ -72,7 +72,7 @@ end function Base.seek(stream::NoopStream, pos::Integer) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seek(stream.stream, pos) initbuffer!(stream.buffer1) @@ -82,7 +82,7 @@ end function Base.seekstart(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekstart(stream.stream) initbuffer!(stream.buffer1) @@ -92,7 +92,7 @@ end function Base.seekend(stream::NoopStream) mode = stream.state.mode if mode === :write - flushbuffer(stream) + flush_buffer2(stream) end seekend(stream.stream) initbuffer!(stream.buffer1) @@ -103,19 +103,23 @@ function Base.write(stream::NoopStream, b::UInt8)::Int changemode!(stream, :write) if has_sharedbuf(stream) # directly write data to the underlying stream - n = Int(write(stream.stream, b)) - return n + write(stream.stream, b) + 
stream.state.bytes_written_out += 1 + else + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer2(stream) + writebyte!(buffer1, b) end - buffer1 = stream.buffer1 - marginsize(buffer1) > 0 || flushbuffer(stream) - return writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt)::Int changemode!(stream, :write) + Int(nbytes) # Error if nbytes > typemax Int if has_sharedbuf(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end buffer = stream.buffer1 @@ -123,9 +127,10 @@ function Base.unsafe_write(stream::NoopStream, input::Ptr{UInt8}, nbytes::UInt): copydata!(buffer, input, Int(nbytes)) return Int(nbytes) else - flushbuffer(stream) + flush_buffer2(stream) # directly write data to the underlying stream n = Int(unsafe_write(stream.stream, input, nbytes)) + stream.state.bytes_written_out += n return n end end @@ -148,17 +153,20 @@ function stats(stream::NoopStream) buffer = stream.buffer1 @assert buffer === stream.buffer2 if mode === :idle - consumed = supplied = 0 + in = out = 0 elseif mode === :read - supplied = buffer.transcoded - consumed = supplied - buffersize(buffer) + in = buffer.transcoded + out = in - buffersize(buffer) elseif mode === :write - supplied = buffer.transcoded + buffersize(buffer) - consumed = buffer.transcoded + out = stream.state.bytes_written_out + in = out + if !has_sharedbuf(stream) + in += buffersize(buffer) + end else throw_invalid_mode(mode) end - return Stats(consumed, supplied, supplied, supplied) + return Stats(in, out, out, out) end @@ -190,24 +198,15 @@ function fillbuffer(stream::NoopStream; eager::Bool = false)::Int return nfilled end -function flushbuffer(stream::NoopStream, all::Bool=false) - changemode!(stream, :write) - buffer = stream.buffer1 - @assert buffer === stream.buffer2 - nflushed::Int = 0 - if all - while buffersize(buffer) > 0 - nflushed += 
writedata!(stream.stream, buffer) - end - else - nflushed += writedata!(stream.stream, buffer) - makemargin!(buffer, 0) - end - buffer.transcoded += nflushed - return nflushed +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::NoopStream)::Nothing + flush_buffer2(stream) end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::NoopStream) - stream.buffer1.transcoded += writedata!(stream.stream, stream.buffer1) + @assert iszero(buffersize(stream.buffer1)) return end diff --git a/src/state.jl b/src/state.jl index 0cb54b6c..134b9495 100644 --- a/src/state.jl +++ b/src/state.jl @@ -24,8 +24,11 @@ mutable struct State buffer1::Buffer buffer2::Buffer + # Number of bytes written to the underlying stream + bytes_written_out::Int64 + function State(buffer1::Buffer, buffer2::Buffer) - return new(:idle, :ok, false, Error(), buffer1, buffer2) + return new(:idle, :ok, false, Error(), buffer1, buffer2, Int64(0)) end end diff --git a/src/stream.jl b/src/stream.jl index 22ef593f..ff7f0fda 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -494,21 +494,24 @@ function Base.write(stream::TranscodingStream) return 0 end -function Base.write(stream::TranscodingStream, b::UInt8) +function Base.write(stream::TranscodingStream, b::UInt8)::Int changemode!(stream, :write) - if marginsize(stream.buffer1) == 0 && flushbuffer(stream) == 0 - return 0 - end - return writebyte!(stream.buffer1, b) + buffer1 = stream.buffer1 + marginsize(buffer1) > 0 || flush_buffer1(stream) + writebyte!(buffer1, b) + return 1 end function Base.unsafe_write(stream::TranscodingStream, input::Ptr{UInt8}, nbytes::UInt) changemode!(stream, :write) - state = stream.state + Int(nbytes) # Error if nbytes > typemax Int buffer1 = stream.buffer1 p = input p_end = p + nbytes - while p < p_end && (marginsize(buffer1) > 0 || flushbuffer(stream) > 0) + while p < p_end + if 
marginsize(buffer1) ≤ 0 + flush_buffer1(stream) + end m = min(marginsize(buffer1), p_end - p) copydata!(buffer1, p, m) p += m @@ -535,7 +538,7 @@ const TOKEN_END = EndToken() function Base.write(stream::TranscodingStream, ::EndToken) changemode!(stream, :write) - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) return 0 end @@ -543,8 +546,8 @@ end function Base.flush(stream::TranscodingStream) checkmode(stream) if stream.state.mode == :write - flushbufferall(stream) - writedata!(stream.stream, stream.buffer2) + flush_buffer1(stream) + flush_buffer2(stream) end flush(stream.stream) end @@ -600,9 +603,12 @@ function stats(stream::TranscodingStream) out = transcoded_out - buffersize(buffer1) elseif mode === :write transcoded_in = buffer1.transcoded - transcoded_out = buffer2.transcoded + out = state.bytes_written_out + transcoded_out = out + if !has_sharedbuf(stream) + transcoded_out += buffersize(buffer2) + end in = transcoded_in + buffersize(buffer1) - out = transcoded_out - buffersize(buffer2) else throw_invalid_mode(mode) end @@ -633,38 +639,37 @@ function fillbuffer(stream::TranscodingStream; eager::Bool = false) return nfilled end -function flushbuffer(stream::TranscodingStream, all::Bool=false) - changemode!(stream, :write) +# Empty buffer1 by writing out data. +# `stream` must be in :write mode. +# Ensure there is margin available in buffer1 for at least one byte. +function flush_buffer1(stream::TranscodingStream)::Nothing state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 - nflushed::Int = 0 - while (all ? 
buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) + while buffersize(buffer1) > 0 if state.code == :end callstartproc(stream, :write) end - writedata!(stream.stream, buffer2) - Δin, _ = callprocess(stream, buffer1, buffer2) - nflushed += Δin + flush_buffer2(stream) + callprocess(stream, buffer1, buffer2) end - return nflushed -end - -function flushbufferall(stream::TranscodingStream) - return flushbuffer(stream, true) + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer1, 0)) + return end +# This is always called after `flush_buffer1(stream)` function flushuntilend(stream::TranscodingStream) - changemode!(stream, :write) state = stream.state buffer1 = stream.buffer1 buffer2 = stream.buffer2 + @assert buffersize(buffer1) == 0 + @assert stream.state.mode === :write while state.code != :end - writedata!(stream.stream, buffer2) + flush_buffer2(stream) callprocess(stream, buffer1, buffer2) end - writedata!(stream.stream, buffer2) - @assert buffersize(buffer1) == 0 + flush_buffer2(stream) return end @@ -687,7 +692,7 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) state = stream.state input = buffermem(inbuf) GC.@preserve inbuf makemargin!(outbuf, minoutsize(stream.codec, input)) - Δin, Δout, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) + Δin::Int, Δout::Int, state.code = GC.@preserve inbuf outbuf process(stream.codec, input, marginmem(outbuf), state.error) @debug( "called process()", code = state.code, @@ -698,6 +703,12 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) ) consumed!(inbuf, Δin, transcode = true) supplied!(outbuf, Δout, transcode = true) + if has_sharedbuf(stream) + if stream.state.mode === :write + # this must be updated before throwing any error if outbuf is shared. 
+ stream.state.bytes_written_out += Δout + end + end if state.code == :error changemode!(stream, :panic) elseif state.code == :ok && Δin == Δout == 0 @@ -745,20 +756,28 @@ function readdata!(input::IO, output::Buffer)::Int end # Write all data to `output` from the buffer of `input`. -function writedata!(output::IO, input::Buffer) - if output isa TranscodingStream && output.buffer1 === input +function flush_buffer2(stream::TranscodingStream)::Nothing + output = stream.stream + buffer2 = stream.buffer2 + state = stream.state + @assert state.mode === :write + if has_sharedbuf(stream) # Delegate the operation to the underlying stream for shared buffers. - return flushbufferall(output) - end - nwritten::Int = 0 - while buffersize(input) > 0 - n = GC.@preserve input Base.unsafe_write(output, bufferptr(input), buffersize(input)) - consumed!(input, n) - nwritten += n + changemode!(output, :write) + flush_buffer1(output) + else + while buffersize(buffer2) > 0 + n::Int = GC.@preserve buffer2 Base.unsafe_write(output, bufferptr(buffer2), buffersize(buffer2)) + n ≤ 0 && error("short write") + consumed!(buffer2, n) + state.bytes_written_out += n + GC.safepoint() + end + # move positions to the start of the buffer + @assert !iszero(makemargin!(buffer2, 0)) GC.safepoint() end - GC.safepoint() - return nwritten + nothing end @@ -800,7 +819,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) end elseif mode == :write if newmode == :close - flushbufferall(stream) + flush_buffer1(stream) flushuntilend(stream) state.mode = newmode finalize_codec(stream.codec, state.error) diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 64b6d83a..ca0c8187 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -303,6 +303,89 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) 
= TranscodingStream(DoubleFrameD end end + @testset "stats" begin + @testset "read" begin + stream = DoubleFrameEncoderStream(IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + read(stream) + stat = TranscodingStreams.stats(stream) + @test_broken stat.in == 16 + @test_broken stat.transcoded_in == 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + close(stream) + end + + @testset "write" begin + stream = DoubleFrameEncoderStream(IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 14 + @test stat.out == 14 + write(stream, TranscodingStreams.TOKEN_END) + stat = TranscodingStreams.stats(stream) + @test stat.in == 6 + @test stat.transcoded_in == 6 + @test stat.transcoded_out == 16 + @test stat.out == 16 + close(stream) + + #nested Streams + stream = DoubleFrameDecoderStream(DoubleFrameEncoderStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in == 0 + @test stat.out == 0 + write(stream, b"[ ffoooobbaarr ]") + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in == 0 + @test stat.transcoded_out == 0 + @test stat.out == 0 + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in == 16 + @test stat.transcoded_in 
== 16 + @test stat.transcoded_out == 6 + @test stat.out == 6 + @test_broken position(stream.stream) == 6 + close(stream) + end + end + + @testset "underlying stream fails" begin + sink = IOBuffer(;maxsize=4) + stream = DoubleFrameEncoderStream(sink) + @test write(stream, "abcd") == 4 + # make sure flush doesn't go into an infinite loop + @test_throws ErrorException("short write") flush(stream) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 6203787b..abcd2751 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -171,22 +171,65 @@ using FillArrays: Zeros @test s1.buffer1 === s2.buffer1 === s3.buffer1 === s1.buffer2 === s2.buffer2 === s3.buffer2 - stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - read(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + @testset "stats" begin + stream = TranscodingStream(Noop(), IOBuffer(b"foobar")) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) - stream = TranscodingStream(Noop(), IOBuffer()) - @test TranscodingStreams.stats(stream).in === Int64(0) - @test TranscodingStreams.stats(stream).out === Int64(0) - write(stream, b"foobar") - flush(stream) - @test TranscodingStreams.stats(stream).in === Int64(6) - @test TranscodingStreams.stats(stream).out === Int64(6) - close(stream) + #nested 
NoopStreams + stream = NoopStream(NoopStream(IOBuffer(b"foobar"))) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + eof(stream) + stat = TranscodingStreams.stats(stream) + @test_broken stat.in === Int64(0) + @test stat.out === Int64(0) + read(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + stream = TranscodingStream(Noop(), IOBuffer()) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(0) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + + #nested NoopStreams + stream = NoopStream(NoopStream(IOBuffer())) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(0) + @test stat.out === Int64(0) + write(stream, b"foobar") + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + flush(stream) + stat = TranscodingStreams.stats(stream) + @test stat.in === Int64(6) + @test stat.out === Int64(6) + close(stream) + end stream = TranscodingStream(Noop(), IOBuffer()) @test stream.state.mode == :idle