diff --git a/decoder/readbuffer.go b/decoder/readbuffer.go index ecd180ab..040da3c5 100644 --- a/decoder/readbuffer.go +++ b/decoder/readbuffer.go @@ -20,37 +20,53 @@ const ( defaultReadBufferSize = 4096 ) -// readBuffer is a custom buffered reader. See newReadBuffer() for details. +// readBuffer is a custom buffered reader that will automatically handle buffering, allowing us to +// read bytes directly from the buffer without extra copying, unlike *bufio.Reader which requires us +// to copy the bytes on every Read() method call. When using *bufio.reader we might receive fewer bytes +// than requested, readBuffer returns exactly n requested bytes, otherwise, it returns an error. type readBuffer struct { r io.Reader // reader provided by the client - // buf is a bytes buffer to read from io.Reader. - // This has unique memory layout: - // [reserved section] + [resizable section] - // [0, 1,..., 765, + 766, 767,..., size] + // buf is buffer bytes for client reading. // - // reserved section is used to memmove remaining bytes when remaining < n. + // Memory layout: + // [reserved section] + [resizable section]: + // [0, 1, 2,..., 765] + [766, 767, 768,...] + // + // reserved section is used to memmove remaining bytes when remaining < n on reading. // resizable section is the space for reading from io.Reader. // - // This way, fragmented remaining bytes is handled and it will always try to - // read exactly x size bytes from io.Reader. + // This way, fragmented remaining bytes is handled and we can always try + // reading exactly n size bytes from io.Reader. + // + // This should be allocated upon creation with minimum len 2*reservedbuf. buf []byte cur, last int // cur and last of buf positions } -// newReadBuffer creates a new reader that will automatically handle buffering, -// allowing us to read bytes directly from the buffer without extra copying. -// This is unlike *bufio.Reader, which requires us to copy the bytes on every Read() method call. -func newReadBuffer(rd io.Reader, size int) *readBuffer { - r := new(readBuffer) - r.Reset(rd, size) - return r +// Reset resets readBuffer with the new reader and size. +func (b *readBuffer) Reset(r io.Reader, size int) { + b.r, b.cur, b.last = r, 0, 0 + + if size < minReadBufferSize { + size = minReadBufferSize + } else if size > maxReadBufferSize { + size = maxReadBufferSize + } + + oldsize := cap(b.buf) - reservedbuf + if size > oldsize { + b.buf = make([]byte, reservedbuf+size) + } + b.buf = b.buf[:reservedbuf+size] } // ReadN reads bytes from the buffer and return exactly n bytes. // If the remaining bytes in the buffer is less than n bytes requested, it will automatically fill the buffer. -// And if in the process it got less than n, an error will be returned. +// And if it got less than n, an error will be returned. +// +// NOTE: n should be >= 0 and n <= reservedbuf, however, we don't enforce it for efficiency. func (b *readBuffer) ReadN(n int) ([]byte, error) { remaining := b.last - b.cur if n > remaining { // fill buf @@ -72,25 +88,3 @@ func (b *readBuffer) ReadN(n int) ([]byte, error) { b.cur += n return buf, nil } - -// Reset resets buf reader. -func (b *readBuffer) Reset(rd io.Reader, size int) { - if size < minReadBufferSize { - size = minReadBufferSize - } else if size > maxReadBufferSize { - size = maxReadBufferSize - } - - oldsize := cap(b.buf) - reservedbuf - if size > oldsize { - b.buf = make([]byte, reservedbuf+size) - } - b.buf = b.buf[:reservedbuf+size] - - b.r = rd - b.cur = reservedbuf - b.last = reservedbuf -} - -// Size return the len of resizeable section of buf. -func (b *readBuffer) Size() int { return len(b.buf) - reservedbuf } diff --git a/decoder/readbuffer_test.go b/decoder/readbuffer_test.go index 582f32d2..361f24dd 100644 --- a/decoder/readbuffer_test.go +++ b/decoder/readbuffer_test.go @@ -9,36 +9,67 @@ import ( "errors" "fmt" "io" - "math" "reflect" "testing" "github.com/google/go-cmp/cmp" ) -func TestNewReadBuffer(t *testing.T) { +func TestReadBufferReset(t *testing.T) { tt := []struct { - name string - size int - lenBuf int + name string + r io.Reader + size int + expectedSize int }{ - {name: "default", size: defaultReadBufferSize, lenBuf: reservedbuf + defaultReadBufferSize}, - {name: "less than minReadBufferSize", size: 8, lenBuf: reservedbuf + minReadBufferSize}, - {name: "8192", size: 8192, lenBuf: reservedbuf + 8192}, - {name: "more than maxReadBufferSize", size: math.MaxInt, lenBuf: reservedbuf + maxReadBufferSize}, + { + name: "default", + r: fnReaderOK, + size: 4096, + expectedSize: 4096, + }, + { + name: "less than minimal size", + r: fnReaderOK, + size: reservedbuf - 1, + expectedSize: reservedbuf, + }, + { + name: "more than maximum size", + r: fnReaderOK, + size: maxReadBufferSize + 1, + expectedSize: maxReadBufferSize, + }, + { + name: "nil buffer", + r: nil, + size: 4096, + expectedSize: 4096, + }, } for i, tc := range tt { t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) { - b := newReadBuffer(nil, tc.size) - if b.cur != reservedbuf { - t.Fatalf("expected cur: %d, got: %d", reservedbuf, b.cur) + b := &readBuffer{ + r: fnReader(func(b []byte) (n int, err error) { return n, err }), + cur: 123, + last: 456, + } + b.Reset(tc.r, tc.size) + + if diff := cmp.Diff(b.r, tc.r, cmp.Transformer("r", func(r io.Reader) uintptr { + return reflect.ValueOf(r).Pointer() + })); diff != "" { + t.Fatal(diff) } - if b.last != reservedbuf { - t.Fatalf("expected last: %d, got: %d", reservedbuf, b.last) + + if b.cur != 0 && b.last != 0 { + t.Fatalf("cur and last should be zero after reset, got: %d and %d", b.cur, b.last) } - if len(b.buf) != tc.lenBuf { - t.Fatalf("expected len(buf): %d, got: %d", tc.lenBuf, len(b.buf)) + + size := len(b.buf) - reservedbuf + if size != tc.expectedSize { + t.Fatalf("expected size: %d, got: %d", tc.expectedSize, size) } }) } @@ -177,36 +208,11 @@ func TestReadBufferReadN(t *testing.T) { for i, tc := range tt { t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) { - b := newReadBuffer(tc.r, tc.size) + b := new(readBuffer) + b.Reset(tc.r, 4096) if err := tc.testFn(b); err != nil { t.Fatalf("expected nil, got: %v", err) } }) } } - -func TestReadBufferResetAndSize(t *testing.T) { - r := io.Reader(fnReaderOK) - b := newReadBuffer(nil, 4096) - - b.Reset(r, 4096*2) - if b.Size() != 4096*2 { - t.Fatalf("expected: %d, got: %d", 4096*2, b.Size()) - } - if diff := cmp.Diff(r, b.r, cmp.Transformer("r", func(r io.Reader) uintptr { - return reflect.ValueOf(r).Pointer() - })); diff != "" { - t.Fatal(diff) - } - - // Revert back - b.Reset(nil, 4096) - if diff := cmp.Diff(nil, b.r, cmp.Transformer("r", func(r io.Reader) uintptr { - return reflect.ValueOf(r).Pointer() - })); diff != "" { - t.Fatal(diff) - } - if b.Size() != 4096 { - t.Fatalf("expected: %d, got: %d", 4096, b.Size()) - } -}