From 61f5f29b2539f74f67af44da3c9861ceadb33667 Mon Sep 17 00:00:00 2001 From: Cassondra Foesch Date: Sat, 22 May 2021 15:42:42 +0000 Subject: [PATCH] Export a ReadFromWithConcurrency function that permits ensuring concurrency usage --- client.go | 33 +++++++++++++++++++++++---------- 1 file changed, 23 insertions(+), 10 deletions(-) diff --git a/client.go b/client.go index f9bc5ce2..377c367e 100644 --- a/client.go +++ b/client.go @@ -1500,8 +1500,13 @@ func (f *File) WriteAt(b []byte, off int64) (written int, err error) { return len(b), nil } -// readFromConcurrent implements ReaderFrom, but works concurrently rather than sequentially. -func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err error) { +// ReadFromWithConcurrency implements ReaderFrom, +// but uses the given concurrency to issue multiple requests at the same time. +// +// Giving a concurrency of less than one will default to the Client’s max concurrency. +// +// Otherwise, the given concurrency will be capped by the Client's max concurrency. +func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) { // Split the write into multiple maxPacket sized concurrent writes. // This allows writes with a suitably large reader // to transfer data at a much faster rate due to overlapping round trip times. @@ -1521,12 +1526,9 @@ func (f *File) readFromConcurrent(r io.Reader, remain int64) (read int64, err er } errCh := make(chan rwErr) - concurrency64 := remain/int64(f.c.maxPacket) + 1 // a bad guess, but better than no guess - if concurrency64 > int64(f.c.maxConcurrentRequests) || concurrency64 < 1 { - concurrency64 = int64(f.c.maxConcurrentRequests) + if concurrency > f.c.maxConcurrentRequests || concurrency < 1 { + concurrency = f.c.maxConcurrentRequests } - // Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow. - concurrency := int(concurrency64) pool := newBufPool(concurrency, f.c.maxPacket) @@ -1655,12 +1657,23 @@ func (f *File) ReadFrom(r io.Reader) (int64, error) { } if remain < 0 { - remain = math.MaxInt64 + // We can strongly assert that we want default max concurrency here. + return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests) } if remain > int64(f.c.maxPacket) { - // Only use concurrency, if it would be at least two read/writes. - return f.readFromConcurrent(r, remain) + // Otherwise, only use concurrency, if it would be at least two packets. + + // This is the best reasonable guess we can make. + concurrency64 := remain/int64(f.c.maxPacket) + 1 + + // We need to cap this value to an `int` size value to avoid overflow on 32-bit machines. + // So, we may as well pre-cap it to `f.c.maxConcurrentRequests`. + if concurrency64 > int64(f.c.maxConcurrentRequests) { + concurrency64 = int64(f.c.maxConcurrentRequests) + } + + return f.ReadFromWithConcurrency(r, int(concurrency64)) } }