Skip to content

Commit

Permalink
mem: implement ReadAll() for more efficient io.Reader consumption (
Browse files Browse the repository at this point in the history
  • Loading branch information
ash2k authored Nov 12, 2024
1 parent d2c1aae commit 60c70a4
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 4 deletions.
59 changes: 57 additions & 2 deletions mem/buffer_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ import (
"io"
)

const (
// 32 KiB is what io.Copy uses.
readAllBufSize = 32 * 1024
)

// BufferSlice offers a means to represent data that spans one or more Buffer
// instances. A BufferSlice is meant to be immutable after creation, and methods
// like Ref create and return copies of the slice. This is why all methods have
Expand Down Expand Up @@ -219,8 +224,58 @@ func (w *writer) Write(p []byte) (n int, err error) {

// NewWriter wraps the given BufferSlice and BufferPool to implement the
// io.Writer interface. Every call to Write copies the contents of the given
// buffer into a new Buffer pulled from the given pool and the Buffer is added to
// the given BufferSlice.
// buffer into a new Buffer pulled from the given pool and the Buffer is
// added to the given BufferSlice.
func NewWriter(buffers *BufferSlice, pool BufferPool) io.Writer {
return &writer{buffers: buffers, pool: pool}
}

// ReadAll reads from r until an error or EOF and returns the data it read.
// A successful call returns err == nil, not err == EOF. Because ReadAll is
// defined to read from src until EOF, it does not treat an EOF from Read
// as an error to be reported.
//
// Important: A failed call returns a non-nil error and may also return
// partially read buffers. It is the responsibility of the caller to free the
// BufferSlice returned, or its memory will not be reused.
func ReadAll(r io.Reader, pool BufferPool) (BufferSlice, error) {
var result BufferSlice
if wt, ok := r.(io.WriterTo); ok {
// This is more optimal since wt knows the size of chunks it wants to
// write and, hence, we can allocate buffers of an optimal size to fit
// them. E.g. might be a single big chunk, and we wouldn't chop it
// into pieces.
w := NewWriter(&result, pool)
_, err := wt.WriteTo(w)
return result, err
}
nextBuffer:
for {
buf := pool.Get(readAllBufSize)
// We asked for 32KiB but may have been given a bigger buffer.
// Use all of it if that's the case.
*buf = (*buf)[:cap(*buf)]
usedCap := 0
for {
n, err := r.Read((*buf)[usedCap:])
usedCap += n
if err != nil {
if usedCap == 0 {
// Nothing in this buf, put it back
pool.Put(buf)
} else {
*buf = (*buf)[:usedCap]
result = append(result, NewBuffer(buf, pool))
}
if err == io.EOF {
err = nil
}
return result, err
}
if len(*buf) == usedCap {
result = append(result, NewBuffer(buf, pool))
continue nextBuffer
}
}
}
}
Loading

0 comments on commit 60c70a4

Please sign in to comment.