diff --git a/docs/src/assets/modes.dot b/docs/src/assets/modes.dot index 67f2e0a5..2031a77d 100644 --- a/docs/src/assets/modes.dot +++ b/docs/src/assets/modes.dot @@ -12,17 +12,19 @@ digraph modes { "read" -> "panic"; "write" -> "write"; - "write" -> "stop"; + "write" -> "done"; "write" -> "close"; "write" -> "panic"; "stop" -> "close"; + "done" -> "close"; "start" [ shape = point ]; "idle" [ shape = circle ]; "read" [ shape = circle ]; "write" [ shape = circle ]; "stop" [ shape = circle; style=bold; ]; + "done" [ shape = circle; style=bold; ]; "close" [ shape = circle; style=bold; ]; "panic" [ shape = circle; style=bold; ]; } diff --git a/docs/src/assets/modes.svg b/docs/src/assets/modes.svg index 4aa0824a..a7ddb427 100644 --- a/docs/src/assets/modes.svg +++ b/docs/src/assets/modes.svg @@ -1,117 +1,150 @@ - - - + + modes - + -start - + +start + -idle - -idle + +idle + +idle -start->idle - - + +start->idle + + -read - -read + +read + +read -idle->read - - + +idle->read + + -write - -write + +write + +write -idle->write - - + +idle->write + + -close - -close + +close + +close -idle->close - - + +idle->close + + -panic - -panic + +panic + +panic -idle->panic - - + +idle->panic + + -read->read - - + +read->read + + -read->close - - + +read->close + + -read->panic - - + +read->panic + + -stop - -stop + +stop + +stop -read->stop - - + +read->stop + + -write->write - - + +write->write + + -write->close - - + +write->close + + -write->panic - - - - -write->stop - - + +write->panic + + + + + +done + +done + + + +write->done + + -stop->close - - + +stop->close + + + + + +done->close + + diff --git a/docs/src/devnotes.md b/docs/src/devnotes.md index 24aba3aa..895bb54e 100644 --- a/docs/src/devnotes.md +++ b/docs/src/devnotes.md @@ -46,7 +46,8 @@ The `mode` field may be one of the following value: - `:idle` : initial and intermediate mode, no buffered data - `:read` : being ready to read data, data may be buffered - `:write`: being ready to write data, data may be buffered -- `:stop` : transcoding is stopped, data may be buffered +- `:stop` : transcoding is stopped after read, data may be buffered +- `:done` : transcoding is stopped after write, data may be buffered - `:close`: closed, no buffered data - `:panic`: an exception has been thrown in codec, data may be buffered but we cannot do anything diff --git a/src/state.jl b/src/state.jl index 12267039..eb68c487 100644 --- a/src/state.jl +++ b/src/state.jl @@ -9,12 +9,12 @@ See Developer's notes for details. """ mutable struct State # current stream mode - mode::Symbol # {:idle, :read, :write, :stop, :close, :panic} + mode::Symbol # {:idle, :read, :write, :stop, :done, :close, :panic} # return code of the last method call code::Symbol # {:ok, :end, :error} - # flag to go :stop on :end + # flag to go :stop or :done on :end stop_on_end::Bool # exception thrown while data processing diff --git a/src/stream.jl b/src/stream.jl index 751711d0..57210b4f 100644 --- a/src/stream.jl +++ b/src/stream.jl @@ -183,7 +183,7 @@ function Base.isopen(stream::TranscodingStream) end function Base.close(stream::TranscodingStream) - stopped = stream.state.mode == :stop + stopped = stream.state.mode ∈ (:stop, :done) if stream.state.mode != :panic changemode!(stream, :close) end @@ -212,7 +212,9 @@ end changemode!(stream, :read) continue elseif mode == :write - return eof(stream.stream) + return true + elseif mode == :done + return true elseif mode == :close return true elseif mode == :stop @@ -252,7 +254,7 @@ function Base.skip(stream::TranscodingStream, offset::Integer) mode = stream.state.mode buffer1 = stream.buffer1 skipped = 0 - if mode == :read + if mode ∈ (:read, :stop) while !eof(stream) && buffersize(buffer1) < offset - skipped n = buffersize(buffer1) emptybuffer!(buffer1) @@ -325,15 +327,11 @@ end # -------------- function Base.read(stream::TranscodingStream, ::Type{UInt8}) - # eof and ready_to_read! are inlined here because ready_to_read! is very slow and eof is broken - eof = buffersize(stream.buffer1) == 0 - state = stream.state - mode = state.mode - if !(mode == :read || mode == :stop) - changemode!(stream, :read) - end - if eof && sloweof(stream) - throw(EOFError()) + if eof(stream) + ready_to_read!(stream) + if eof(stream) + throw(EOFError()) + end end return readbyte!(stream.buffer1) end @@ -459,7 +457,7 @@ end # Ready to read data from the stream. function ready_to_read!(stream::TranscodingStream) mode = stream.state.mode - if !(mode == :read || mode == :stop) + if mode ∉ (:read, :stop) changemode!(stream, :read) end return @@ -623,7 +621,7 @@ function flushbuffer(stream::TranscodingStream, all::Bool=false) buffer1 = stream.buffer1 buffer2 = stream.buffer2 nflushed::Int = 0 - while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :stop + while (all ? buffersize(buffer1) != 0 : makemargin!(buffer1, 0) == 0) && state.mode != :done if state.code == :end callstartproc(stream, :write) end @@ -688,7 +686,11 @@ function callprocess(stream::TranscodingStream, inbuf::Buffer, outbuf::Buffer) # When no progress, expand the output buffer. makemargin!(outbuf, max(16, marginsize(outbuf) * 2)) elseif state.code == :end && state.stop_on_end - changemode!(stream, :stop) + if stream.state.mode == :read + changemode!(stream, :stop) + else + changemode!(stream, :done) + end end return Δin, Δout end @@ -778,7 +780,7 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) return end elseif mode == :write - if newmode == :close || newmode == :stop + if newmode == :close || newmode == :done if newmode == :close flushbufferall(stream) flushuntilend(stream) @@ -792,6 +794,11 @@ function changemode!(stream::TranscodingStream, newmode::Symbol) state.mode = newmode return end + elseif mode == :done + if newmode == :close + state.mode = newmode + return + end elseif mode == :panic throw_panic_error() end diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl new file mode 100644 index 00000000..a0f81020 --- /dev/null +++ b/test/codecdoubleframe.jl @@ -0,0 +1,224 @@ +using TranscodingStreams +using Random +using Test +using TranscodingStreams: + TranscodingStreams, + TranscodingStream, + test_roundtrip_read, + test_roundtrip_write, + test_roundtrip_lines, + test_roundtrip_transcode, + test_roundtrip_fileio, + test_chunked_read, + test_chunked_write, + Error + +# An insane codec for testing the codec APIs. +struct DoubleFrameEncoder <: TranscodingStreams.Codec + opened::Base.RefValue{Bool} + stopped::Base.RefValue{Bool} + got_stop_msg::Base.RefValue{Bool} +end + +DoubleFrameEncoder() = DoubleFrameEncoder(Ref(false), Ref(false), Ref(false)) + +function TranscodingStreams.process( + codec :: DoubleFrameEncoder, + input :: TranscodingStreams.Memory, + output :: TranscodingStreams.Memory, + error :: TranscodingStreams.Error, + ) + if input.size == 0 + codec.got_stop_msg[] = true + end + + if output.size < 2 + error[] = ErrorException("requires a minimum of 2 bytes of output space") + return 0, 0, :error + elseif codec.stopped[] + error[] = ErrorException("cannot process after stopped") + return 0, 0, :error + elseif codec.got_stop_msg[] && input.size != 0 + error[] = ErrorException("cannot accept more input after getting stop message") + return 0, 0, :error + elseif !codec.opened[] + output[1] = UInt8('[') + output[2] = UInt8(' ') + codec.opened[] = true + return 0, 2, :ok + elseif codec.got_stop_msg[] + output[1] = UInt8(' ') + output[2] = UInt8(']') + codec.stopped[] = true + return 0, 2, :end + else + i = j = 0 + while i + 1 ≤ lastindex(input) && j + 2 ≤ lastindex(output) + b = input[i+1] + i += 1 + output[j+1] = output[j+2] = b + j += 2 + end + return i, j, :ok + end +end + +function TranscodingStreams.expectedsize( + :: DoubleFrameEncoder, + input :: TranscodingStreams.Memory) + return input.size * 2 + 2 + 2 +end + +function TranscodingStreams.minoutsize( + :: DoubleFrameEncoder, + :: TranscodingStreams.Memory) + return 2 +end + +function TranscodingStreams.startproc(codec::DoubleFrameEncoder, ::Symbol, error::Error) + codec.opened[] = false + codec.got_stop_msg[] = false + codec.stopped[] = false + return :ok +end + +# An insane codec for testing the codec APIs. +struct DoubleFrameDecoder <: TranscodingStreams.Codec + state::Base.RefValue{Int} + a::Base.RefValue{UInt8} + b::Base.RefValue{UInt8} +end + +DoubleFrameDecoder() = DoubleFrameDecoder(Ref(1), Ref(0x00), Ref(0x00)) + +function TranscodingStreams.process( + codec :: DoubleFrameDecoder, + input :: TranscodingStreams.Memory, + output :: TranscodingStreams.Memory, + error :: TranscodingStreams.Error, + ) + Δin::Int = 0 + Δout::Int = 0 + + function do_read(ref) + iszero(input.size) && error("Expected byte") + if Δin + 1 ≤ lastindex(input) + Δin += 1 + ref[] = input[Δin] + true + else + false + end + end + + function do_write(x::UInt8) + if Δout + 1 ≤ lastindex(output) + Δout += 1 + output[Δout] = x + true + else + false + end + end + + try + # hacky resumable function using goto, just for fun. + oldstate = codec.state[] + if oldstate == 1 + @goto state1 + elseif oldstate == 2 + @goto state2 + elseif oldstate == 3 + @goto state3 + elseif oldstate == 4 + @goto state4 + elseif oldstate == 5 + @goto state5 + else + error("unexpected state $(oldstate)") + end + + @label state1 + do_read(codec.a) || return (codec.state[]=1; (Δin, Δout, :ok)) + codec.a[] != UInt8('[') && error("expected [") + @label state2 + do_read(codec.a) || return (codec.state[]=2; (Δin, Δout, :ok)) + codec.a[] != UInt8(' ') && error("expected space") + while true + @label state3 + do_read(codec.a) || return (codec.state[]=3; (Δin, Δout, :ok)) + @label state4 + do_read(codec.b) || return (codec.state[]=4; (Δin, Δout, :ok)) + if codec.a[] == codec.b[] + @label state5 + do_write(codec.a[]) || return (codec.state[]=5; (Δin, Δout, :ok)) + elseif codec.a[] == UInt8(' ') && codec.b[] == UInt8(']') + break + else + error("expected matching bytes or space and ]") + end + end + return Δin, Δout, :end + catch e + e isa ErrorException || rethrow() + error[] = e + return Δin, Δout, :error + end +end + +function TranscodingStreams.startproc(codec::DoubleFrameDecoder, ::Symbol, error::Error) + codec.state[] = 1 + codec.a[] = 0x00 + codec.b[] = 0x00 + return :ok +end + +const DoubleFrameEncoderStream{S} = TranscodingStream{DoubleFrameEncoder,S} where S<:IO +DoubleFrameEncoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameEncoder(), stream; kwargs...) + +const DoubleFrameDecoderStream{S} = TranscodingStream{DoubleFrameDecoder,S} where S<:IO +DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameDecoder(), stream; kwargs...) + + +@testset "DoubleFrame Codecs" begin + @test transcode(DoubleFrameEncoder, b"") == b"[ ]" + @test transcode(DoubleFrameEncoder, b"a") == b"[ aa ]" + @test transcode(DoubleFrameEncoder, b"ab") == b"[ aabb ]" + @test transcode(DoubleFrameEncoder(), b"") == b"[ ]" + @test transcode(DoubleFrameEncoder(), b"a") == b"[ aa ]" + @test transcode(DoubleFrameEncoder(), b"ab") == b"[ aabb ]" + + @test_throws Exception transcode(DoubleFrameDecoder, b"") + @test_throws Exception transcode(DoubleFrameDecoder, b" [") + @test_throws Exception transcode(DoubleFrameDecoder, b" ]") + @test_throws Exception transcode(DoubleFrameDecoder, b"[]") + @test_throws Exception transcode(DoubleFrameDecoder, b" ") + @test_throws Exception transcode(DoubleFrameDecoder, b" ") + @test_throws Exception transcode(DoubleFrameDecoder, b"aabb") + @test_throws Exception transcode(DoubleFrameDecoder, b"[ ab ]") + @test transcode(DoubleFrameDecoder, b"[ ]") == b"" + @test transcode(DoubleFrameDecoder, b"[ aa ]") == b"a" + @test transcode(DoubleFrameDecoder, b"[ aabb ]") == b"ab" + @test transcode(DoubleFrameDecoder, b"[ aaaa ]") == b"aa" + @test transcode(DoubleFrameDecoder, b"[ ]") == b" " + @test transcode(DoubleFrameDecoder, b"[ ]] ]") == b" ]" + + @testset "eof is true after write stops" begin + sink = IOBuffer() + stream = TranscodingStream(DoubleFrameDecoder(), sink, stop_on_end=true) + @test_broken write(stream, "[ yy ]sdfsadfasdfdf") == 4 + @test eof(stream) + @test_throws ArgumentError read(stream, UInt8) + flush(stream) + @test take!(sink) == b"y" + close(stream) + end + + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) + test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) + test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) + test_roundtrip_transcode(DoubleFrameEncoder, DoubleFrameDecoder) + test_roundtrip_fileio(DoubleFrameEncoder, DoubleFrameDecoder) + test_chunked_read(DoubleFrameEncoder, DoubleFrameDecoder) + test_chunked_write(DoubleFrameEncoder, DoubleFrameDecoder) +end diff --git a/test/codecquadruple.jl b/test/codecquadruple.jl index 87c4950f..d0b3bd39 100644 --- a/test/codecquadruple.jl +++ b/test/codecquadruple.jl @@ -134,4 +134,22 @@ end @test take!(sink) == b"xxxx" close(stream) end + + @testset "eof is true after write" begin + sink = IOBuffer() + stream = TranscodingStream(QuadrupleCodec(), sink, bufsize=16) + write(stream, "x") + @test eof(stream) + @test_throws ArgumentError read(stream, UInt8) + @test eof(stream) + write(stream, "y") + @test eof(stream) + write(stream, TranscodingStreams.TOKEN_END) + @test eof(stream) + flush(stream) + @test eof(stream) + @test take!(sink) == b"xxxxyyyy" + close(stream) + @test eof(stream) + end end diff --git a/test/runtests.jl b/test/runtests.jl index 72b74210..52ad099f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -126,3 +126,4 @@ end include("codecnoop.jl") include("codecinvalid.jl") include("codecquadruple.jl") +include("codecdoubleframe.jl") \ No newline at end of file