From 89f21792fe64023a68c6eb505385d4eac81be6a2 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 11:56:41 +0200 Subject: [PATCH 01/23] v2: big v2 commit --- Dockerfile | 12 +- client/client.go | 139 ---------------- go.mod | 6 +- helm/refractor/values-roobre.yaml | 21 +++ pool/peeker/peeker.go | 51 ------ pool/peeker/peeker_test.go | 120 -------------- pool/pool.go | 184 ++++++--------------- provider/providers/fake/fake.go | 18 +++ refractor.yaml | 17 +- refractor/refractor.go | 259 ++++++++++++++++++++++++++++++ refractor/refractor_test.go | 92 +++++++++++ server/server.go | 61 +++---- stats/readerWrapper.go | 45 ++++++ stats/stats.go | 31 ++-- worker/worker.go | 114 +++++++++---- 15 files changed, 626 insertions(+), 544 deletions(-) delete mode 100644 client/client.go create mode 100644 helm/refractor/values-roobre.yaml delete mode 100644 pool/peeker/peeker.go delete mode 100644 pool/peeker/peeker_test.go create mode 100644 provider/providers/fake/fake.go create mode 100644 refractor/refractor.go create mode 100644 refractor/refractor_test.go create mode 100644 stats/readerWrapper.go diff --git a/Dockerfile b/Dockerfile index d956384..fb9250b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,17 +1,17 @@ -FROM golang:1.18-alpine3.16 as builder +FROM golang:1.21-alpine as builder -WORKDIR /build +WORKDIR /src COPY go.mod go.sum ./ RUN go mod download COPY . ./ -RUN go build -o /refractor ./cmd +RUN go build -o /bin/refractor ./cmd -FROM alpine:3.16 +FROM alpine:3.18.4 -COPY --from=builder /refractor /bin/ -COPY --from=builder /build/refractor.yaml /config/refractor.yaml +COPY --from=builder /bin/refractor /bin +COPY --from=builder /src/refractor.yaml /config/refractor.yaml ENTRYPOINT ["/bin/refractor"] CMD ["-config", "/config/refractor.yaml"] diff --git a/client/client.go b/client/client.go deleted file mode 100644 index 344decd..0000000 --- a/client/client.go +++ /dev/null @@ -1,139 +0,0 @@ -package client - -import ( - "context" - "fmt" - "github.com/rs/dnscache" - log "github.com/sirupsen/logrus" - "net" - "net/http" - "strings" - "time" -) - -const clientHeader = "X-Refracted-By" - -type Client struct { - HTTPClient *http.Client - resolver *dnscache.Resolver - baseUrl string -} - -type Config struct { - PreDownloadTimeout time.Duration `yaml:"preDownloadTimeout"` - DownloadTimeout time.Duration `yaml:"downloadTimeout"` -} - -func (c Config) WithDefaults() Config { - if c.PreDownloadTimeout == 0 { - c.PreDownloadTimeout = 3 * time.Second - } - - if c.DownloadTimeout == 0 { - c.DownloadTimeout = 2 * time.Minute - } - - return c -} - -type Request struct { - Path string - Header http.Header - ResponseChan chan Response -} - -type Response struct { - HTTPResponse *http.Response - Worker string - Error error - Done func(written int64) -} - -func NewClient(c Config, baseUrl string) *Client { - c = c.WithDefaults() - - timeoutDialer := &net.Dialer{ - Timeout: c.PreDownloadTimeout, - } - - resolver := &dnscache.Resolver{} - - // Stolen from https://github.com/rs/dnscache - dialContext := func(ctx context.Context, network string, addr string) (conn net.Conn, err error) { - host, port, err := net.SplitHostPort(addr) - if err != nil { - return nil, fmt.Errorf("splitting host and port %q: %w", addr, err) - } - ips, err := resolver.LookupHost(ctx, host) - if err != nil { - return nil, fmt.Errorf("looking up %q: %w", host, err) - } - for _, ip := range ips { - conn, err = timeoutDialer.DialContext(ctx, network, net.JoinHostPort(ip, port)) - if err == nil { - break - } - } - return 
- } - - transport := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: dialContext, - MaxIdleConns: 10, - ResponseHeaderTimeout: c.PreDownloadTimeout, - IdleConnTimeout: c.PreDownloadTimeout, - TLSHandshakeTimeout: c.PreDownloadTimeout, - } - - return &Client{ - HTTPClient: &http.Client{ - Transport: transport, - Timeout: c.DownloadTimeout, - }, - baseUrl: baseUrl, - resolver: resolver, - } -} - -func (c *Client) String() string { - return c.baseUrl -} - -func (c *Client) URL(path string) string { - url := strings.TrimSuffix(c.baseUrl, "/") - url += "/" - url += strings.TrimPrefix(path, "/") - - return url -} - -func (c *Client) Do(request Request) (r Response) { - c.resolver.Refresh(true) - - // TODO: Calculate a better deadline by making a HEAD request and a target throughput - //ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - //defer cancel() - - url := c.URL(request.Path) - - //req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - r.Error = fmt.Errorf("building request to %s: %w", url, err) - return - } - - req.Header = request.Header - log.Debugf("%s %s", req.Method, req.URL.String()) - resp, err := c.HTTPClient.Do(req) - if err != nil { - r.Error = fmt.Errorf("performing %s to %q: %w", req.Method, req.URL.String(), err) - return - } - - resp.Header.Add(clientHeader, c.String()) - r.HTTPResponse = resp - - return -} diff --git a/go.mod b/go.mod index 7dc7cc2..125ee84 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,16 @@ module roob.re/refractor -go 1.18 +go 1.21 require ( + github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 github.com/sirupsen/logrus v1.8.1 + github.com/yelinaung/go-haikunator v0.0.0-20220607145230-74ef2cbd6d59 golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f gopkg.in/yaml.v3 v3.0.1 ) require ( - github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect - github.com/yelinaung/go-haikunator v0.0.0-20220607145230-74ef2cbd6d59 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect golang.org/x/sys v0.0.0-20211019181941-9d821ace8654 // indirect ) diff --git a/helm/refractor/values-roobre.yaml b/helm/refractor/values-roobre.yaml new file mode 100644 index 0000000..ff803f0 --- /dev/null +++ b/helm/refractor/values-roobre.yaml @@ -0,0 +1,21 @@ +image: + pullPolicy: Always + +config: + peekSizeMiBs: 2 + peekTimeout: 2s + +ingress: + enabled: true + className: internal + hosts: + - host: refractor.terabox.moe + paths: + - path: / + pathType: ImplementationSpecific + # Legacy + - host: flexo.terabox.moe + paths: + - path: / + pathType: ImplementationSpecific + tls: [] diff --git a/pool/peeker/peeker.go b/pool/peeker/peeker.go deleted file mode 100644 index ccd5c80..0000000 --- a/pool/peeker/peeker.go +++ /dev/null @@ -1,51 +0,0 @@ -package peeker - -import ( - "context" - "errors" - "io" - "time" -) - -type Peeker struct { - SizeBytes int64 - Timeout time.Duration -} - -var ErrPeekTimeout = errors.New("peek timed out") - -type peekResult struct { - buf []byte - err error -} - -// Peek attempts to get a few bytes from the body within some time. 
-func (p *Peeker) Peek(body io.Reader) ([]byte, error) { - ctx, cancel := context.WithTimeout(context.Background(), p.Timeout) - defer cancel() - - readChan := p.readContext(ctx, body) - - select { - case result := <-readChan: - return result.buf, result.err - case <-ctx.Done(): - return nil, ErrPeekTimeout - } -} - -func (p *Peeker) readContext(ctx context.Context, body io.Reader) chan peekResult { - res := make(chan peekResult) - - go func() { - result := peekResult{} - result.buf, result.err = io.ReadAll(io.LimitReader(body, p.SizeBytes)) - - select { - case <-ctx.Done(): - case res <- result: - } - }() - - return res -} diff --git a/pool/peeker/peeker_test.go b/pool/peeker/peeker_test.go deleted file mode 100644 index 86ecabf..0000000 --- a/pool/peeker/peeker_test.go +++ /dev/null @@ -1,120 +0,0 @@ -package peeker_test - -import ( - "bytes" - "errors" - "io" - "io/ioutil" - "roob.re/refractor/pool/peeker" - "strings" - "testing" - "time" -) - -const ( - part1 = "lorem ipsum " - part2 = "dolor sit amet" - full = "lorem ipsum dolor sit amet" -) - -func TestPeeker_Peeks_Some_Bytes(t *testing.T) { - t.Parallel() - - reader := strings.NewReader(full) - pk := peeker.Peeker{ - SizeBytes: int64(len(part1)), - Timeout: 1 * time.Second, - } - - read, err := pk.Peek(reader) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(read, []byte(part1)) { - t.Fatal("read is expected to be part1") - } - - rest, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(rest, []byte(part2)) { - t.Fatal("read is expected to be part1") - } -} - -func TestPeeker_Peeks(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - len int - }{ - {name: "All_Bytes", len: len(full)}, - {name: "No_Extra_Bytes", len: len(full) + 10}, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - reader := strings.NewReader(full) - peeker := peeker.Peeker{ - SizeBytes: int64(tc.len), - Timeout: 1 * time.Second, - } - - read, err := peeker.Peek(reader) - if err != nil { - t.Fatal(err) - } - - if !bytes.Equal(read, []byte(full)) { - t.Fatal("read is expected to be part1") - } - - rest, err := ioutil.ReadAll(reader) - if err != nil { - t.Fatal(err) - } - - if len(rest) != 0 { - t.Fatal("read is expected to be part1") - } - }) - } -} - -type delayReader struct { - io.Reader -} - -func (dr delayReader) Read(buf []byte) (int, error) { - time.Sleep(1 * time.Second) - return dr.Reader.Read(buf) -} - -func TestPeeker_Times_Out(t *testing.T) { - t.Parallel() - - reader := delayReader{strings.NewReader(full)} - pk := peeker.Peeker{ - SizeBytes: int64(len(part1)), - Timeout: 500 * time.Millisecond, - } - - read, err := pk.Peek(reader) - if !errors.Is(err, peeker.ErrPeekTimeout) { - t.Fatal("peeker did not time out") - } - - if len(read) != 0 { - t.Fatal("peeker returned more than 0 bytes on timeout") - } - - time.Sleep(1 * time.Second) - - rest, err := io.ReadAll(reader) - if !bytes.Equal(rest, []byte(part2)) { - t.Fatal("peeker left unexpected stuf fin the buffer") - } -} diff --git a/pool/pool.go b/pool/pool.go index 6c94234..b77541f 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -1,173 +1,93 @@ package pool import ( - "errors" + "context" "fmt" - log "github.com/sirupsen/logrus" - "io" "net/http" - "roob.re/refractor/client" + + log "github.com/sirupsen/logrus" + "roob.re/refractor/names" - "roob.re/refractor/pool/peeker" "roob.re/refractor/provider/types" "roob.re/refractor/stats" "roob.re/refractor/worker" - "strings" - "time" ) type Pool struct { - Config - stats *stats.Stats 
- peeker peeker.Peeker - namer func() string - - clients chan *client.Client - requests chan client.Request -} + workers int + stats *stats.Stats + provider types.Provider + namer func() string -type Config struct { - // Retries controls how many times a request is re-enqueued after a retryable error occurs. - // Errors are considered retryable if they occur before writing anything to the client. - Retries int `yaml:"retries"` - // Workers is the amount of workers that will serve requests in parallel. It should be higher that the amount of - // expected connections to refractor, otherwise requests will be serialized. - Workers int `yaml:"workers"` - - // PeekSizeMiBs is the amount of bytes to peek before starting to feed the response back to the client. - // If PeekSizeMiBs are not transferred within PeekTimeout, the request is aborted and requeued to another mirror. - PeekSizeMiBs int64 `yaml:"peekSizeMiBs"` - // PeekTimeout is the amount of time to give for PeekSizeBytes to be read before switching to another mirror. - PeekTimeout time.Duration `yaml:"peekTimeout"` + mirrors chan string + requests chan worker.RequestResponse } -func New(config Config, stats *stats.Stats) *Pool { +func New(provider types.Provider, stats *stats.Stats) *Pool { return &Pool{ - Config: config, + provider: provider, stats: stats, + mirrors: make(chan string), + requests: make(chan worker.RequestResponse), namer: names.Haiku, - clients: make(chan *client.Client), - requests: make(chan client.Request), - peeker: peeker.Peeker{ - SizeBytes: config.PeekSizeMiBs * 1024 * 1024, - Timeout: config.PeekTimeout, - }, } } -func (p *Pool) Feed(provider types.Provider) { - log.Infof("Starting to feed mirrors to the pool") - for { - url, err := provider.Mirror() - if err != nil { - log.Errorf("Provided returned an error: %v", err) - time.Sleep(10 * time.Second) - } - p.clients <- client.NewClient(client.Config{}, url) - } -} +func (p *Pool) Start(ctx context.Context) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() -func (p *Pool) Run() { - for i := 0; i < p.Workers; i++ { - log.Debugf("Starting worker manager thread #%d", i) - go p.work() + for i := 0; i < p.stats.NumWorkers; i++ { + go func() { + p.manageWorker(ctx) + }() } -} -func (p *Pool) work() { - for cli := range p.clients { - worker := worker.Worker{ - Client: cli, - Stats: p.stats, - Name: p.namer(), - } - log.Error(worker.Work(p.requests)) - p.stats.Remove(worker.String()) - } + return p.feedMirrors(ctx) } -func (p *Pool) ServeHTTP(rw http.ResponseWriter, r *http.Request) { - retries := 0 +func (p *Pool) feedMirrors(ctx context.Context) error { for { - if retries > p.Config.Retries { - log.Errorf("Max retries for %s exhausted", r.URL.Path) - rw.WriteHeader(http.StatusInternalServerError) - return + mirror, err := p.provider.Mirror() + if err != nil { + return fmt.Errorf("getting mirror from provider: %w", err) } - err, retryable := p.tryRequest(r, rw) - if err == nil { - return + select { + case <-ctx.Done(): + return ctx.Err() + case p.mirrors <- mirror: + continue } - - log.Errorf("%v", err) - if !retryable { - return - } - - log.Warnf("Retrying %s", r.URL.Path) - retries++ } } -func (p *Pool) tryRequest(r *http.Request, rw http.ResponseWriter) (error, bool) { - responseChan := make(chan client.Response) - request := client.Request{ - Path: r.URL.Path, - ResponseChan: responseChan, - Header: r.Header, - } - - log.Debugf("Dispatching request %s to workers", request.Path) - p.requests <- request - response := <-responseChan - if response.Error 
!= nil { - return fmt.Errorf("%s%s errored: %w", response.Worker, request.Path, response.Error), true - } - - if response.HTTPResponse.StatusCode >= 400 { - // TODO: Hack: Archlinux mirrors are somehow expected to return 404 for .sig files. - // For this reason, we do not attempt to retry 404s for .sig files. - if !strings.HasSuffix(r.URL.Path, ".db.sig") { - return fmt.Errorf("%s%s returned non-200 status: %d", response.Worker, request.Path, response.HTTPResponse.StatusCode), true - } - } - - written, err := p.writeResponse(response.HTTPResponse, rw) - response.Done(written) +func (p *Pool) Do(r *http.Request) (*http.Response, error) { + respChan := make(chan worker.ResponseErr) - if err != nil { - err = fmt.Errorf("writing %s%s to client: %w", response.Worker, request.Path, err) - return err, written == 0 + p.requests <- worker.RequestResponse{ + Request: r, + ResponseCh: respChan, } - return nil, false + re := <-respChan + return re.Response, re.Err } -func (p *Pool) writeResponse(response *http.Response, rw http.ResponseWriter) (int64, error) { - // Peek body before writing headers - peeked, err := p.peeker.Peek(response.Body) - if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) { - return 0, fmt.Errorf("peeking response body: %w", err) - } - - for header, values := range response.Header { - for _, value := range values { - rw.Header().Add(header, value) +func (p *Pool) manageWorker(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case mirror := <-p.mirrors: + worker := worker.Worker{ + Mirror: mirror, + Stats: p.stats, + Name: p.namer(), + } + + log.Errorf("%s terminated: %v", worker.Name, worker.Work(p.requests)) + p.stats.Remove(worker.String()) } } - - rw.WriteHeader(response.StatusCode) - peekedWritten, err := rw.Write(peeked) - if err != nil { - return int64(peekedWritten), fmt.Errorf("writing peeked body: %w", err) - } - - restWritten, err := io.Copy(rw, response.Body) - written := int64(peekedWritten) + restWritten - if err != nil { - return written, fmt.Errorf("writing body: %w", err) - } - - return written, nil } diff --git a/provider/providers/fake/fake.go b/provider/providers/fake/fake.go new file mode 100644 index 0000000..0069834 --- /dev/null +++ b/provider/providers/fake/fake.go @@ -0,0 +1,18 @@ +package fake + +import ( + "fmt" + "math/rand" +) + +type Fake struct { + Mirrors []string +} + +func (f Fake) Mirror() (string, error) { + if len(f.Mirrors) == 0 { + return "", fmt.Errorf("no mirrors available") + } + + return f.Mirrors[rand.Intn(len(f.Mirrors))], nil +} diff --git a/refractor.yaml b/refractor.yaml index 87913d9..cdf4e69 100644 --- a/refractor.yaml +++ b/refractor.yaml @@ -1,10 +1,15 @@ -workers: 8 -goodThroughputMiBs: 10 -peekSizeMiBs: 1 -peekTimeout: 1s +# Number of workers to start. This controls the amount of parallel chunk downloads. +# This should be set reasonably higher than the number of pacman parallel downloads. +#workers: 8 -preDownloadTimeout: 500ms -downloadTimeout: 1m +# Number of top workers. Mirrors that are ranked below this number in throughput will be rotated out of the pool +# and replaced with new random mirrors. +#topWorkers: 4 + +# Throughput considered "good" regardless of worker rank. Mirrors that are currently transmitting data at a +# rate higher than this are never rotated out, regardless of their rank. +# It should be set to a value _smaller_ than `(your ISP bandwidth in MiB/s)/(workers)`. 
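+# For example, with a 40 MiB/s downlink and 8 workers that works out to 5 MiB/s per worker, so a value of around 4
+# would be reasonable (illustrative numbers, adjust for your own connection).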
+goodThroughputMiBs: 4 provider: archlinux: diff --git a/refractor/refractor.go b/refractor/refractor.go new file mode 100644 index 0000000..36fc439 --- /dev/null +++ b/refractor/refractor.go @@ -0,0 +1,259 @@ +package refractor + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "strings" + "sync" + "time" + + log "github.com/sirupsen/logrus" + + "roob.re/refractor/pool" + "roob.re/refractor/stats" +) + +type Refractor struct { + Config + Pool *pool.Pool + + buffers sync.Pool +} + +type Config struct { + ChunkSize int64 + ChunkTimeout time.Duration +} + +func (c Config) WithDefaults() Config { + const ( + defaultChunkSize = 4 << 20 // 4 MiB. + defaultChunkTimeout = 5 * time.Second + ) + + if c.ChunkSize == 0 { + c.ChunkSize = defaultChunkSize + } + + if c.ChunkTimeout == 0 { + c.ChunkTimeout = defaultChunkTimeout + } + + return c +} + +func New(c Config, pool *pool.Pool) *Refractor { + return &Refractor{ + Config: c.WithDefaults(), + Pool: pool, + buffers: sync.Pool{ + New: func() any { + return &bytes.Buffer{} + }, + }, + } +} + +type responseErr struct { + err error + response *http.Response +} + +func (rf *Refractor) ServeHTTP(rw http.ResponseWriter, r *http.Request) { + url := r.URL.String() + + // Archlinux hack: Mirrors return 404 for .db.sig files. + // TODO: Mirror-specific hacks should be a on a different, possibly config-driven object that wraps Refractor. + if strings.HasSuffix(url, ".db.sig") { + rw.WriteHeader(http.StatusNotFound) + return + } + + // Archlinux quirk: .db files change very often between mirrors, splitting them is almost guaranteed to return a + // corrupted file, so they are handled to a single mirror. + if strings.HasSuffix(url, ".db") { + rf.handlePlain(rw, r) + return + } + + // Other requests are refracted across mirrors. + rf.handleRefracted(rw, r) +} + +func (rf *Refractor) handlePlain(rw http.ResponseWriter, r *http.Request) { + url := r.URL.String() + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + log.Errorf("building GET request for %q: %v", url, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + br := <-rf.retryRequest(req) + if br.err != nil { + log.Errorf("GET request for %q failed: %v", url, err) + rw.WriteHeader(http.StatusBadGateway) + return + } + + _, err = io.Copy(rw, br.response.Body) + if err != nil { + log.Errorf("writing GET body: %v", err) + } +} + +func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { + url := r.URL.String() + + headReq, err := http.NewRequest(http.MethodHead, url, nil) + if err != nil { + log.Errorf("building HEAD request for %q: %v", url, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + headReq.Header.Add("accept-encoding", "identity") // Prevent server from gzipping response. + br := <-rf.retryRequest(headReq) + + var responseChannels []chan responseErr + + size := br.response.ContentLength + start := int64(0) + for start < size { + end := start + rf.ChunkSize + if end > size { + end = size + } + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + log.Errorf("building ranged retryRequest for %q: %v", url, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + req.Header.Add("range", fmt.Sprintf("bytes=%d-%d", start, end)) + // Prevent servers from gzipping request, as that would break ranges across servers. 
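+			// If a mirror applied a Content-Encoding, the requested byte ranges would refer to the encoded
+			// representation, which can differ between mirrors, so the chunks could not be stitched back together.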
+ req.Header.Add("accept-encoding", "identity") + responseChannels = append(responseChannels, rf.retryRequest(req)) + + start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. + } + + rw.Header().Add("content-length", fmt.Sprint(br.response.ContentLength)) + for _, rc := range responseChannels { + responseErr := <-rc + if responseErr.err != nil { + log.Errorf("Reading resopnse from channel: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + _, err := io.Copy(rw, responseErr.response.Body) + if err != nil { + log.Errorf("Writing response chunk: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + + responseErr.response.Body.Close() + } +} + +func (rf *Refractor) retryRequest(r *http.Request) chan responseErr { + respChan := make(chan responseErr) + go func() { + const retries = 5 + try := 0 + for { + try++ + + response, err := rf.request(r) + if err != nil { + log.Errorf("[%d/%d] Requesting %s[%s]: %v", try, retries, r.URL.Path, r.Header.Get("range"), err) + if try < retries { + continue + } + + log.Errorf("Giving up on %s[%s]: %v", r.URL.Path, r.Header.Get("range"), err) + + respChan <- responseErr{ + err: err, + } + + return + } + + respChan <- responseErr{ + response: response, + } + + return + } + }() + + return respChan +} + +func (rf *Refractor) request(r *http.Request) (*http.Response, error) { + ctx, cancel := context.WithTimeout(context.Background(), rf.ChunkTimeout) + defer cancel() + + r = r.WithContext(ctx) + + expectedStatus := http.StatusOK + if r.Header.Get("range") != "" { + expectedStatus = http.StatusPartialContent + } + + response, err := rf.Pool.Do(r) + if err != nil { + return nil, err + } + + defer response.Body.Close() + + if response.StatusCode != expectedStatus { + return nil, fmt.Errorf("got status %d, expected %d", response.StatusCode, expectedStatus) + } + + // If this is a HEAD request there is no need to copy the body. + if r.Method == http.MethodHead { + return response, nil + } + + buf := rf.buffers.Get().(*bytes.Buffer) + buf.Reset() + + body := response.Body + // Asynchronously wait for context and close body if copy takes too long. 
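+	// Closing the body makes the io.Copy below fail fast instead of blocking indefinitely on a stalled mirror.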
+ go func() { + <-ctx.Done() + err := body.Close() + if err != nil { + log.Errorf("Closing body due to context timeout: %v", err) + } + }() + + n, err := io.Copy(buf, body) + if err != nil { + return nil, err + } + + if n != response.ContentLength { + return nil, fmt.Errorf("expected to read bytes %d but read %d instead", response.ContentLength, n) + } + + response.Body = &stats.ReaderWrapper{ + Underlying: buf, + OnDone: func(_ uint64) { + rf.buffers.Put(buf) + }, + } + + return response, nil +} diff --git a/refractor/refractor_test.go b/refractor/refractor_test.go new file mode 100644 index 0000000..acf438d --- /dev/null +++ b/refractor/refractor_test.go @@ -0,0 +1,92 @@ +package refractor_test + +import ( + "bytes" + "context" + "crypto/rand" + "io" + mrand "math/rand" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "roob.re/refractor/pool" + "roob.re/refractor/provider/providers/fake" + "roob.re/refractor/refractor" + "roob.re/refractor/stats" +) + +type requestJournal struct { + entries []*http.Request + mtx sync.Mutex +} + +func (j *requestJournal) Log(r *http.Request) { + j.mtx.Lock() + defer j.mtx.Unlock() + + j.entries = append(j.entries, r) +} + +func Test_Refractor(t *testing.T) { + journal := requestJournal{} + + rubbish, err := io.ReadAll(io.LimitReader(rand.Reader, 50<<20)) + if err != nil { + t.Fatalf("error reading rubbish: %v", err) + } + + mirrors := make([]string, 5) + for i := range mirrors { + s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + journal.Log(r) + + if mrand.Float32() < 0.2 { + rw.WriteHeader(http.StatusGatewayTimeout) + return + } + + http.ServeContent(rw, r, "rubbish", time.Now(), bytes.NewReader(rubbish)) + })) + + mirrors[i] = s.URL + t.Cleanup(func() { + s.Close() + }) + } + + pool := pool.New( + fake.Fake{Mirrors: mirrors}, + stats.New(stats.Config{}), + ) + + go func() { + err := pool.Start(context.Background()) + if err != nil { + t.Errorf("starting pool: %v", err) + } + }() + + r := refractor.New(refractor.Config{}, pool) + server := httptest.NewServer(r) + + response, err := http.Get(server.URL + "/rubbish") + if err != nil { + t.Fatalf("refractor returned error: %v", err) + } + + body, err := io.ReadAll(response.Body) + if err != nil { + t.Fatalf("cannot read response body error: %v", err) + } + + if !bytes.Equal(body, rubbish) { + t.Fatalf("body does not equal rubbish") + } + + if len(journal.entries) <= 1 { + t.Fatalf("server pool did not get the expected number of requests") + } +} diff --git a/server/server.go b/server/server.go index e536d5f..db712e7 100644 --- a/server/server.go +++ b/server/server.go @@ -1,37 +1,32 @@ package server import ( + "context" "fmt" - log "github.com/sirupsen/logrus" - "gopkg.in/yaml.v3" "io" "net/http" - "roob.re/refractor/client" + + log "github.com/sirupsen/logrus" + "gopkg.in/yaml.v3" + "roob.re/refractor/pool" "roob.re/refractor/provider/providers" "roob.re/refractor/provider/types" + "roob.re/refractor/refractor" "roob.re/refractor/stats" - "time" ) type Config struct { - Pool pool.Config `yaml:",inline"` - Client client.Config `yaml:",inline"` - Stats stats.Config `yaml:",inline"` + Stats stats.Config `yaml:",inline"` + Refractor refractor.Config `yaml:",inline"` // Provider contains the name of the chosen provider, and provider-specific config. 
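+	// Only the first recognized provider entry is used; its raw YAML is decoded by that provider's own builder.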
Provider map[string]yaml.Node } -const ( - defaultPeekSizeMiBs = 1.0 - defaultPeekTimeout = 4 * time.Second - defaultRetries = 3 -) - type Server struct { - pool *pool.Pool - provider types.Provider + pool *pool.Pool + refractor *refractor.Refractor } func New(configFile io.Reader) (*Server, error) { @@ -41,9 +36,6 @@ func New(configFile io.Reader) (*Server, error) { return nil, fmt.Errorf("unmarshalling config: %w", err) } - // Both pool and stats share the number of workers, as a hack we use pool.Config as the source of truth. - config.Stats.NumWorkers = config.Pool.Workers - var provider types.Provider for pName, yamlConfig := range config.Provider { pBuilder, found := providers.Map[pName] @@ -67,34 +59,25 @@ func New(configFile io.Reader) (*Server, error) { break } - if config.Pool.PeekSizeMiBs == 0 { - log.Infof("Defaulting PeekSizeMiBs to %.1f", defaultPeekSizeMiBs) - config.Pool.PeekSizeMiBs = defaultPeekSizeMiBs - } - - if config.Pool.PeekTimeout == 0 { - log.Infof("Defaulting PeekTimeout to %s", defaultPeekTimeout) - config.Pool.PeekTimeout = defaultPeekTimeout - } - - if config.Pool.Retries == 0 { - log.Infof("Defaulting Retries to %d", defaultRetries) - config.Pool.Retries = defaultRetries - } + p := pool.New( + provider, + stats.New(config.Stats), + ) return &Server{ - provider: provider, - pool: pool.New( - config.Pool, - stats.New(config.Stats), + pool: p, + refractor: refractor.New( + config.Refractor, + p, ), }, nil } func (s *Server) Run(address string) error { - go s.pool.Run() - go s.pool.Feed(s.provider) + ctx := context.Background() + + go s.pool.Start(ctx) log.Infof("Listening on %s", address) - return http.ListenAndServe(address, s.pool) + return http.ListenAndServe(address, s.refractor) } diff --git a/stats/readerWrapper.go b/stats/readerWrapper.go new file mode 100644 index 0000000..a907959 --- /dev/null +++ b/stats/readerWrapper.go @@ -0,0 +1,45 @@ +package stats + +import ( + "io" + "sync" +) + +// ReaderWrapper wraps an io.Reader and calls OnDone with the total number of bytes read from the underlying reader +// when the underlying reader returns the first error, that can be EOF, or when it gets Close()d. +// OnDone is called at most once. +// If the underlying reader implements io.Closer, ReaderWrapper will forward calls to Close() to it. Otherwise, the +// Close() operation always returns nil. 
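+//
+// In this codebase it is used to wrap response bodies so that the number of bytes actually transferred can be
+// accounted for (for example, to record a throughput sample) once the transfer finishes.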
+type ReaderWrapper struct { + Underlying io.Reader + OnDone func(totalRead uint64) + + read uint64 + once sync.Once +} + +func (w *ReaderWrapper) Read(p []byte) (n int, err error) { + n, err = w.Underlying.Read(p) + w.read += uint64(n) + + if err != nil { + w.report() + } + + return +} + +func (w *ReaderWrapper) Close() (err error) { + if closer, ok := w.Underlying.(io.Closer); ok { + err = closer.Close() + } + + w.report() + return +} + +func (w *ReaderWrapper) report() { + w.once.Do(func() { + w.OnDone(w.read) + }) +} diff --git a/stats/stats.go b/stats/stats.go index 8dcca64..0586897 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -2,18 +2,20 @@ package stats import ( "fmt" - log "github.com/sirupsen/logrus" - "golang.org/x/exp/slices" "sync" "time" + + log "github.com/sirupsen/logrus" + "golang.org/x/exp/slices" ) const ( - minSampleBytes = 1024 - // Requests below minSampleBytes will not be counted UNLESS they took more than minDurationForMinBytes - minDurationForMinBytes = 1 * time.Second - minSampleDuration = 50 * time.Millisecond - maxSamples = 20.0 + // Requests that transfer less than minSampleBytes AND take less than maxDurationForMinBytes will not + // be taken into account for ranking. + minSampleBytes = 512 << 10 // 512KiB + maxDurationForMinBytes = 1 * time.Second + + maxSamples = 15.0 ) type Stats struct { @@ -24,7 +26,7 @@ type Stats struct { } type Config struct { - NumWorkers int `yaml:"-"` + NumWorkers int `yaml:"workers"` NumTopWorkers int `yaml:"topWorkers"` GoodThroughputMiBs float64 `yaml:"goodThroughputMiBs"` @@ -40,14 +42,14 @@ func (c Config) WithDefaults() Config { } if c.GoodThroughputMiBs == 0 { - c.GoodThroughputMiBs = 10 + c.GoodThroughputMiBs = 2 } return c } type Sample struct { - Bytes int64 + Bytes uint64 Duration time.Duration } @@ -84,13 +86,8 @@ func (s *Stats) Remove(name string) { } func (s *Stats) Update(name string, sample Sample) { - if sample.Bytes < minSampleBytes && sample.Duration < minDurationForMinBytes { - log.Infof("Dropping sample for %s, not enough bytes to measure (%d)", name, sample.Bytes) - return - } - - if sample.Duration < minSampleDuration { - log.Infof("Dropping sample for %s, not transaction too short to measure (%v)", name, sample.Duration) + if sample.Bytes < minSampleBytes && sample.Duration < maxDurationForMinBytes { + log.Tracef("Dropping sample for %s, not significant enough (%d bytes in %v)", name, sample.Bytes, sample.Duration) return } diff --git a/worker/worker.go b/worker/worker.go index 51590e1..da78f38 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -1,60 +1,112 @@ package worker import ( + "errors" "fmt" + "net/http" + "net/url" + "strings" + "time" + log "github.com/sirupsen/logrus" - "roob.re/refractor/client" + "roob.re/refractor/stats" - "time" +) + +var ( + ErrSlowMirror = errors.New("slow mirror") + ErrChannelClosed = errors.New("request channel closed") + ErrRequest = errors.New("error performing request") + ErrCode = errors.New("received non-ok status code") ) type Worker struct { - Name string - Stats *stats.Stats - Client *client.Client + Name string + Mirror string + Timeout time.Duration + Stats *stats.Stats + Client *http.Client +} + +type RequestResponse struct { + Request *http.Request + ResponseCh chan ResponseErr +} + +type ResponseErr struct { + Response *http.Response + Err error } func (w Worker) String() string { - return fmt.Sprintf("%s:%s", w.Name, w.Client.String()) + return fmt.Sprintf("%s:%s", w.Name, w.Mirror) } -func (w Worker) Work(requests chan client.Request) error { 
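+// Work serves requests from the channel against this worker's mirror. It returns when the channel is closed or when
+// a request fails (for example, because the mirror is no longer ranked as a good performer), at which point the pool
+// is expected to retire this worker and draw a fresh mirror.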
+func (w Worker) Work(requests chan RequestResponse) error { log.Debugf("Starting worker %s", w.String()) + client := w.Client + if client == nil { + client = http.DefaultClient + } + for req := range requests { - if !w.Stats.GoodPerformer(w.String()) { - go func() { - requests <- req - }() + response, err := func(*http.Request) (*http.Response, error) { + if !w.Stats.GoodPerformer(w.String()) { + return nil, ErrSlowMirror + } - return fmt.Errorf("worker %s is not a good performer, evicting and requeuing request", w.String()) - } + httpReq, err := w.toMirror(req.Request) + if err != nil { + return nil, err + } + + log.Infof("%s %s %s", w.Name, httpReq.Method, httpReq.URL) + + start := time.Now() + response, err := client.Do(httpReq) + if err != nil { + return nil, err + } - log.Infof("Requesting %s:%s", w.Name, w.Client.URL(req.Path)) + if response.StatusCode > 400 { + return nil, fmt.Errorf("%w: %d", ErrCode, response.StatusCode) + } - start := time.Now() - response := w.Client.Do(req) - response.Worker = w.String() + response.Body = &stats.ReaderWrapper{ + Underlying: response.Body, + OnDone: func(written uint64) { + sample := stats.Sample{ + Bytes: written, + Duration: time.Since(start), + } + log.Debugf("%s: %s", w.Name, sample.String()) + go w.Stats.Update(w.String(), sample) + }, + } - if response.Error != nil { - go func() { - requests <- req - }() + return response, nil + }(req.Request) - return fmt.Errorf("worker %s returned error for %s, sacrificing: %v", w.String(), req.Path, response.Error) + req.ResponseCh <- ResponseErr{ + response, + err, } - response.Done = func(written int64) { - sample := stats.Sample{ - Bytes: written, - Duration: time.Since(start), - } - log.Infof("%s %s:%s", sample.String(), w.Name, w.Client.URL(req.Path)) - go w.Stats.Update(w.String(), sample) + if err != nil { + return ErrRequest } + } + + return ErrChannelClosed +} - req.ResponseChan <- response +func (w Worker) toMirror(r *http.Request) (*http.Request, error) { + newUrl, err := url.Parse(strings.TrimSuffix(w.Mirror, "/") + r.URL.Path) + if err != nil { + return nil, fmt.Errorf("building url: %w", err) } - return fmt.Errorf("request channel closed") + r.URL = newUrl + return r, nil } From 5d9fedbc91a8d326b5334faf07f0b706d4e68279 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 13:24:50 +0200 Subject: [PATCH 02/23] refactor: make retries configurable --- refractor/refractor.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index 36fc439..d085857 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -26,12 +26,14 @@ type Refractor struct { type Config struct { ChunkSize int64 ChunkTimeout time.Duration + Retries int } func (c Config) WithDefaults() Config { const ( defaultChunkSize = 4 << 20 // 4 MiB. 
defaultChunkTimeout = 5 * time.Second + defaultRetries = 5 ) if c.ChunkSize == 0 { @@ -42,6 +44,10 @@ func (c Config) WithDefaults() Config { c.ChunkTimeout = defaultChunkTimeout } + if c.Retries == 0 { + c.Retries = defaultRetries + } + return c } @@ -167,7 +173,7 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { func (rf *Refractor) retryRequest(r *http.Request) chan responseErr { respChan := make(chan responseErr) go func() { - const retries = 5 + retries := rf.Retries try := 0 for { try++ @@ -230,9 +236,11 @@ func (rf *Refractor) request(r *http.Request) (*http.Response, error) { buf.Reset() body := response.Body + // Asynchronously wait for context and close body if copy takes too long. go func() { <-ctx.Done() + err := body.Close() if err != nil { log.Errorf("Closing body due to context timeout: %v", err) From c14ecc1c658a51409b280a1b46a5b319fb50fa84 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 13:25:06 +0200 Subject: [PATCH 03/23] refractor/test: add slow writers to test --- refractor/refractor_test.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/refractor/refractor_test.go b/refractor/refractor_test.go index acf438d..1aadaf8 100644 --- a/refractor/refractor_test.go +++ b/refractor/refractor_test.go @@ -30,6 +30,16 @@ func (j *requestJournal) Log(r *http.Request) { j.entries = append(j.entries, r) } +type slowWriter struct { + http.ResponseWriter + delay time.Duration +} + +func (s slowWriter) Write(p []byte) (int, error) { + time.Sleep(s.delay) + return s.ResponseWriter.Write(p) +} + func Test_Refractor(t *testing.T) { journal := requestJournal{} @@ -43,12 +53,18 @@ func Test_Refractor(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { journal.Log(r) + reader := io.ReadSeeker(bytes.NewReader(rubbish)) + if mrand.Float32() < 0.2 { rw.WriteHeader(http.StatusGatewayTimeout) return } - http.ServeContent(rw, r, "rubbish", time.Now(), bytes.NewReader(rubbish)) + if mrand.Float32() < 0.2 { + rw = slowWriter{ResponseWriter: rw, delay: time.Second} + } + + http.ServeContent(rw, r, "rubbish", time.Now(), reader) })) mirrors[i] = s.URL @@ -69,7 +85,10 @@ func Test_Refractor(t *testing.T) { } }() - r := refractor.New(refractor.Config{}, pool) + r := refractor.New(refractor.Config{ + ChunkTimeout: 500 * time.Millisecond, // Very short timeout to make test faster. + Retries: 10, // Make very unlikely for the test to fail due to unlucky random timeouts/errors. + }, pool) server := httptest.NewServer(r) response, err := http.Get(server.URL + "/rubbish") From 7863bbb0486ae1d3b66e34f3481bf3340daa9ae1 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 13:48:10 +0200 Subject: [PATCH 04/23] refractor: specify chunkSize in MiBs --- refractor.yaml | 16 +++++++++++++++- refractor/refractor.go | 14 +++++++------- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/refractor.yaml b/refractor.yaml index cdf4e69..67d30cb 100644 --- a/refractor.yaml +++ b/refractor.yaml @@ -9,7 +9,21 @@ # Throughput considered "good" regardless of worker rank. Mirrors that are currently transmitting data at a # rate higher than this are never rotated out, regardless of their rank. # It should be set to a value _smaller_ than `(your ISP bandwidth in MiB/s)/(workers)`. -goodThroughputMiBs: 4 +#goodThroughputMiBs: 4 + +# Split package downloads across mirrors in chunks of this size. 
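+# Chunks are fetched with HTTP Range requests, so different chunks of the same file can be served by different mirrors.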
+# Total memory requirements of refractor roughly equals (chunkSizeMiBs * workers). +#chunkSizeMiBs: 4 + +# Refractor will give up on a mirror when a chunk takes longer than this to transfer the full chunk. +# Mirror will be immediately evicted, and request retried to another mirror in the pool. +#chunkTimeout: "5s" + +# Retry failed chunk downloads this many times. If the first chunk reaches this amount of failures, refractor will emit +# a non-OK status code. If this happens to a chunk other than the first, refractor will cancel the request after +# writing a partial body. As refractor advertised the expected Content-Lenght, HTTP clients should detect this and +# discard the data that has already been transmitted. +#retries: 5 provider: archlinux: diff --git a/refractor/refractor.go b/refractor/refractor.go index d085857..765f7db 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -24,20 +24,20 @@ type Refractor struct { } type Config struct { - ChunkSize int64 - ChunkTimeout time.Duration - Retries int + ChunkSizeMiBs int `yaml:"chunkSizeMiBs"` + ChunkTimeout time.Duration `yaml:"chunkTimeout"` + Retries int `yaml:"retries"` } func (c Config) WithDefaults() Config { const ( - defaultChunkSize = 4 << 20 // 4 MiB. + defaultChunkSize = 4 defaultChunkTimeout = 5 * time.Second defaultRetries = 5 ) - if c.ChunkSize == 0 { - c.ChunkSize = defaultChunkSize + if c.ChunkSizeMiBs == 0 { + c.ChunkSizeMiBs = defaultChunkSize } if c.ChunkTimeout == 0 { @@ -130,7 +130,7 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { size := br.response.ContentLength start := int64(0) for start < size { - end := start + rf.ChunkSize + end := start + int64(rf.ChunkSizeMiBs)<<20 if end > size { end = size } From a6e2a8b24ca0158a50f5430a03340a552a01fc2c Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 13:48:47 +0200 Subject: [PATCH 05/23] refractor: consume pending responses in aborted requests --- refractor/refractor.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index 765f7db..fdbc0ed 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -150,23 +150,33 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. } + // Defer fully consuming and closing all response channels to avoid leaking buffers and workers, in the event an error occurs. 
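+	// Each pending chunk goroutine blocks until its result is received, and pooled buffers are only returned once
+	// their bodies are closed, so every remaining channel must be drained even when returning early.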
+ defer func() { + for _, rc := range responseChannels { + re := <-rc + if re.response != nil { + re.response.Body.Close() + } + } + }() + rw.Header().Add("content-length", fmt.Sprint(br.response.ContentLength)) for _, rc := range responseChannels { - responseErr := <-rc - if responseErr.err != nil { + re := <-rc + if re.err != nil { log.Errorf("Reading resopnse from channel: %v", err) rw.WriteHeader(http.StatusInternalServerError) return } - _, err := io.Copy(rw, responseErr.response.Body) + _, err := io.Copy(rw, re.response.Body) if err != nil { log.Errorf("Writing response chunk: %v", err) rw.WriteHeader(http.StatusInternalServerError) return } - responseErr.response.Body.Close() + re.response.Body.Close() } } From 865bc7cdaa03e37bf35d47ae257dd82289dc482e Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 1 Oct 2023 14:54:19 +0200 Subject: [PATCH 06/23] README: adapt to v2 changes, overall tweaks --- README.md | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 803df4b..361e005 100644 --- a/README.md +++ b/README.md @@ -4,19 +4,29 @@ Refractor is linux mirror load-balancer, which parallelizes requests between an ## Working principle -The core of Refractor is a pool of workers, to which HTTP requests are routed. A worker mapped to a particular mirror performs the request to said mirror and proxies the response to the user. +The core of Refractor is a pool of workers, to which HTTP requests are routed. A worker draws a random mirror from a list, and proxies the response to the user. -Before considering a request, workers look how well they are performing compared to their peers. If they are on the bottom two positions of the ranking, they will resign and get out of the pool. The pool will automatically add a worker for a different mirror to compensate. +Refractor aims to work in a stateless, self-balancing way. It tries to achieve it by picking up mirrors from a large list (referred as a provider), and routing requests to them while measuring how those perform. If a mirror is among the bottom N performers, it gets rotated out. Mirrors that fail to complete requests in a given time are also immediately rotated, while mirrors that perform above a given threshold are never rotated out even if the rest perform better. After a certain amount of requests, this should stabilize in a pool of fast mirrors. -This way, the pool of active mirrors is constantly rotating slow mirrors out of the pool, based on their current performance. This eliminates the need of continuously benchmarking mirrors, and avoids having to assume that mirrors' bandwidth is constant in time. +In an attempt to maximize downlink and speed up the rotation of slow mirrors, requests are split up in several chunks of a configurable size, typically a few megabytes, that are themselves routed to different mirrors. If a mirror returns an error for a chunk, or fails to download the chunk in time, the mirror that failed is immediately rotated out and the piece is re-queued. -## Intended usage +Mirror throughput is measured using a rolling average, so if a mirror performed well in the past but doesn't anymore, for example because it is currently dealing with a large amount of traffic, it gets rotated out. -Refractor is intended to be run either locally, or in a local network where linux machines reside. 
This is because Refractor drops mirrors aggressively based on mirror-to-client throughput, and therefore it will not be effective if clients with different effective throughput to the host running Refractor connect to it. Moreover, for this same reason, bad actors could deliberately simulate bad latencies and kick good mirrors out of the pool, degrading service quality for others. +## Usage + +The provided docker image can be run directly with no arguments and it will use the default config (`refractor.yaml`). + +```shell +docker run ghcr.io/roobre/refractor:$VERSION +``` + +The default config will spin up Refractor to load-balance across Archlinux mirrors located in western Europe. To serve mirrors from different regions, check out the provider configuration below. + +The updated config file can be mounted in the docker container in `/config/refractor.yaml`. ## Providers -Refractor is designed to be distribution-agnostic, as long as a Provider that can fetch a mirror and feed it to the pool is implemented. Refractor automatically sorts the pool of mirrors automatically by the throughput they provide as request come by. This means that providers do not need to sort or benchmark mirrors before supplying them to the pool. +Refractor is designed to be distribution-agnostic, as long as a Provider that can fetch a mirror and feed it to the pool is implemented. As refractor automatically keeps fast mirrors and discards slow ones, providers do not need to sort or benchmark mirrors before supplying them to the pool. It is recommended, however, for providers to apply coarse-grain filter such as physical location, as doing so will allow the pool to stabilize faster. @@ -27,9 +37,6 @@ For the moment, the following providers exist: The Arch Linux provider feeds mirrors from `https://archlinux.org/mirrors/status/json/`, after applying some user-defined filters. For now, filtering by country and by score is allowed. ```yaml -workers: 8 -goodThroughputMiBs: 10 - provider: archlinux: maxScore: 5 @@ -47,9 +54,6 @@ The Command provider allows to feed to the pool mirror URLs obtained from runnin > ⚠️ Refractor rotates mirrors from the pool very aggressively, which means the specified command will be called multiple times and very often. Please make sure this command is not hammering any public API without appropriate caching. ```yaml -workers: 8 -goodThroughputMiBs: 10 - provider: command: #shell: /bin/bash # Defaults to $SHELL, then to /bin/sh @@ -77,12 +81,6 @@ As an example, the Arch Linux mirror provider retrieves the list of mirrors from Implementing providers in code is encouraged as it provides maximum flexibility to control caching and configuration options. PRs are welcome! -## Advanced features - -- **Average window**: Only the last few throughput measurments are averaged when checking how a mirror is performing. This allow rotating out mirrors that start to behave poorly even if they have been very performant in the past. -- **Absolutely good throughput**: Mirrors that perform better than `goodThroughputMiBs` will not be rotated from the pool, even if they are the least performant. -- **Request peeking**: Refractor will "peek" the first few megs (`peekSizeMiBs`) from the connection to a mirror before passing the response to the client. If this peek operation takes too long (`peekTimeout`), the request will be requeued to a different mirror. 
- ## Trivia - The name "Refractor" is a gimmick to [Reflector](https://wiki.archlinux.org/title/Reflector) From 3d3e31b42d4ad0c4f34d52a3042804c42de8666d Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:48:29 +0200 Subject: [PATCH 07/23] refractor: mirror all upstream headers --- refractor/refractor.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index fdbc0ed..66901fb 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -106,6 +106,12 @@ func (rf *Refractor) handlePlain(rw http.ResponseWriter, r *http.Request) { return } + for k, vs := range br.response.Header { + for _, v := range vs { + rw.Header().Add(k, v) + } + } + _, err = io.Copy(rw, br.response.Body) if err != nil { log.Errorf("writing GET body: %v", err) @@ -160,7 +166,11 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { } }() - rw.Header().Add("content-length", fmt.Sprint(br.response.ContentLength)) + for k, vs := range br.response.Header { + for _, v := range vs { + rw.Header().Add(k, v) + } + } for _, rc := range responseChannels { re := <-rc if re.err != nil { From 3403ac402db5641273ff267047147cc15bb22a40 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:49:20 +0200 Subject: [PATCH 08/23] refractor: log errors if an unexpected amount of bytes are written --- refractor/refractor.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index 66901fb..78d56b0 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -156,7 +156,8 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. } - // Defer fully consuming and closing all response channels to avoid leaking buffers and workers, in the event an error occurs. + // Defer fully consuming and closing all response channels to avoid leaking buffers and workers, in the event an + // error occurs. 
defer func() { for _, rc := range responseChannels { re := <-rc @@ -171,6 +172,8 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { rw.Header().Add(k, v) } } + + written := int64(0) for _, rc := range responseChannels { re := <-rc if re.err != nil { @@ -179,15 +182,22 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { return } - _, err := io.Copy(rw, re.response.Body) + n, err := io.Copy(rw, re.response.Body) if err != nil { log.Errorf("Writing response chunk: %v", err) rw.WriteHeader(http.StatusInternalServerError) return } + written += n + re.response.Body.Close() } + + if written != br.response.ContentLength { + log.Errorf("Wrote %d bytes of %d expected for %s", written, br.response.ContentLength, url) + return + } } func (rf *Refractor) retryRequest(r *http.Request) chan responseErr { From 2351c5db9d2c682969b5d356fd8a4f76867c321d Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:49:43 +0200 Subject: [PATCH 09/23] refractor: close response channels so deferred routine does not block forever --- refractor/refractor.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/refractor/refractor.go b/refractor/refractor.go index 78d56b0..3d93cb2 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -203,6 +203,8 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { func (rf *Refractor) retryRequest(r *http.Request) chan responseErr { respChan := make(chan responseErr) go func() { + defer close(respChan) + retries := rf.Retries try := 0 for { From ae6e018f144446538d7898bfe9eff7cd1b3c59ad Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:50:55 +0200 Subject: [PATCH 10/23] refractor: error out if contentLength is not set --- refractor/refractor.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/refractor/refractor.go b/refractor/refractor.go index 3d93cb2..10cdc7b 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -259,6 +259,10 @@ func (rf *Refractor) request(r *http.Request) (*http.Response, error) { return nil, fmt.Errorf("got status %d, expected %d", response.StatusCode, expectedStatus) } + if response.ContentLength == -1 { + return nil, fmt.Errorf("got -1 content length for response") + } + // If this is a HEAD request there is no need to copy the body. if r.Method == http.MethodHead { return response, nil @@ -284,6 +288,7 @@ func (rf *Refractor) request(r *http.Request) (*http.Response, error) { return nil, err } + // Check we read the expected length. if n != response.ContentLength { return nil, fmt.Errorf("expected to read bytes %d but read %d instead", response.ContentLength, n) } From 4bfb8d6ad3635cfe8fe18157e4eb4e120cfe5ed7 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:51:17 +0200 Subject: [PATCH 11/23] readerWrapper: use simple bool instead of sync.Once --- stats/readerWrapper.go | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/stats/readerWrapper.go b/stats/readerWrapper.go index a907959..1b8b4eb 100644 --- a/stats/readerWrapper.go +++ b/stats/readerWrapper.go @@ -2,20 +2,20 @@ package stats import ( "io" - "sync" ) // ReaderWrapper wraps an io.Reader and calls OnDone with the total number of bytes read from the underlying reader -// when the underlying reader returns the first error, that can be EOF, or when it gets Close()d. -// OnDone is called at most once. 
-// If the underlying reader implements io.Closer, ReaderWrapper will forward calls to Close() to it. Otherwise, the -// Close() operation always returns nil. +// as an argument. +// OnDone is called at most once when the underlying reader returns the first error, that can be EOF, or when it get +// Close()d. +// If the underlying reader also implements io.Closer, ReaderWrapper.Close() will also call Close() on it. Otherwise, +// the Close() operation always returns nil. type ReaderWrapper struct { Underlying io.Reader OnDone func(totalRead uint64) - read uint64 - once sync.Once + read uint64 + reported bool } func (w *ReaderWrapper) Read(p []byte) (n int, err error) { @@ -39,7 +39,10 @@ func (w *ReaderWrapper) Close() (err error) { } func (w *ReaderWrapper) report() { - w.once.Do(func() { - w.OnDone(w.read) - }) + if w.reported { + return + } + + w.reported = true + w.OnDone(w.read) } From 975e02457396e501ca8cb9c68dc4cc4fc82b536d Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:51:41 +0200 Subject: [PATCH 12/23] stats,worker: minor renaming, expose throughput to worker so it can log it --- stats/stats.go | 29 ++++++++++++++++++----------- worker/worker.go | 5 +++-- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/stats/stats.go b/stats/stats.go index 0586897..3fb34a1 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -11,11 +11,11 @@ import ( const ( // Requests that transfer less than minSampleBytes AND take less than maxDurationForMinBytes will not - // be taken into account for ranking. - minSampleBytes = 512 << 10 // 512KiB + // be taken into account for ranking. + minSampleBytes = 512 << 10 // 512KiB maxDurationForMinBytes = 1 * time.Second - maxSamples = 15.0 + maxSamples = 15.0 ) type Stats struct { @@ -86,8 +86,14 @@ func (s *Stats) Remove(name string) { } func (s *Stats) Update(name string, sample Sample) { + // Samples for very few bytes are discarded, as the delta is too small to produce a meaningful throughput + // calculation. However, if the amount of bytes is small but the transaction still took a substantial amount of + // time, we keep it, as it is meaningfully telling us that this mirror is shit. 
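+	// In other words, a sample is kept if it transferred at least minSampleBytes or took at least maxDurationForMinBytes.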
if sample.Bytes < minSampleBytes && sample.Duration < maxDurationForMinBytes { - log.Tracef("Dropping sample for %s, not significant enough (%d bytes in %v)", name, sample.Bytes, sample.Duration) + log.Debugf( + "Dropping sample for %s, not significant enough (%d bytes in %v)", + name, sample.Bytes, sample.Duration, + ) return } @@ -113,12 +119,12 @@ func (s *Stats) Update(name string, sample Sample) { s.workers[name] = w } -func (s *Stats) GoodPerformer(name string) bool { +func (s *Stats) Stats(name string) (float64, bool) { entries := s.workerList() if len(entries) <= s.NumTopWorkers { log.Debugf("Less than %d workers ranked, cannot evict any yet", s.NumWorkers) - return true + return 0, true } position := slices.IndexFunc(entries, func(entry namedEntry) bool { @@ -127,18 +133,19 @@ func (s *Stats) GoodPerformer(name string) bool { if position == -1 { log.Debugf("Worker %s is not ranked yet", name) - return true + return 0, true } log.Debugf("Worker %s is in position %d/%d", name, position+1, len(s.workers)) - if entries[position].throughput > s.GoodThroughputMiBs*1024*1024 { + throughput := entries[position].throughput + if throughput > s.GoodThroughputMiBs*1024*1024 { log.Debugf("Worker %s has an absolutely good throughput", name) - return true + return throughput, true } - // We're good performers if we're earlier than the last two positions - return position < s.NumTopWorkers + // We're good performers if we're among NumTopWorkers. + return throughput, position < s.NumTopWorkers } func (s *Stats) report() { diff --git a/worker/worker.go b/worker/worker.go index da78f38..033142e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -52,8 +52,9 @@ func (w Worker) Work(requests chan RequestResponse) error { for req := range requests { response, err := func(*http.Request) (*http.Response, error) { - if !w.Stats.GoodPerformer(w.String()) { - return nil, ErrSlowMirror + throughput, goodPerformer := w.Stats.Stats(w.Name) + if !goodPerformer { + return nil, fmt.Errorf("%w: %.2fMiB/s", ErrSlowMirror, throughput/1024/1024) } httpReq, err := w.toMirror(req.Request) From bb6f7d0b923e4b1509e2688968f8942470a25d93 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:53:34 +0200 Subject: [PATCH 13/23] pool: remove unused woker count --- pool/pool.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pool/pool.go b/pool/pool.go index b77541f..e34cb16 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -14,7 +14,6 @@ import ( ) type Pool struct { - workers int stats *stats.Stats provider types.Provider namer func() string From ccf2182e2b5141be1b9e155f1120f43414e0a410 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:54:36 +0200 Subject: [PATCH 14/23] stats: increase default number of workers --- stats/stats.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stats/stats.go b/stats/stats.go index 3fb34a1..e45b320 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -34,7 +34,7 @@ type Config struct { func (c Config) WithDefaults() Config { if c.NumWorkers == 0 { - c.NumWorkers = 8 + c.NumWorkers = 12 } if c.NumTopWorkers == 0 { From 6c3592dfaabdf6bfd92ae354359a074a197906be Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Thu, 5 Oct 2023 22:54:43 +0200 Subject: [PATCH 15/23] server: formatting --- server/server.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/server/server.go b/server/server.go index db712e7..880f1a9 100644 --- a/server/server.go +++ b/server/server.go @@ -17,8 +17,8 @@ import ( ) type Config struct 
{ - Stats stats.Config `yaml:",inline"` - Refractor refractor.Config `yaml:",inline"` + Stats stats.Config `yaml:",inline"` + Refractor refractor.Config `yaml:",inline"` // Provider contains the name of the chosen provider, and provider-specific config. Provider map[string]yaml.Node @@ -67,14 +67,14 @@ func New(configFile io.Reader) (*Server, error) { return &Server{ pool: p, refractor: refractor.New( - config.Refractor, + config.Refractor, p, ), }, nil } func (s *Server) Run(address string) error { - ctx := context.Background() + ctx := context.Background() go s.pool.Start(ctx) From f18f737e30c91c3eae20f3154d6e715365182a28 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 12:35:40 +0100 Subject: [PATCH 16/23] refractor: reduce chunk timeout --- refractor/refractor.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index 10cdc7b..07fa859 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -32,7 +32,7 @@ type Config struct { func (c Config) WithDefaults() Config { const ( defaultChunkSize = 4 - defaultChunkTimeout = 5 * time.Second + defaultChunkTimeout = 3 * time.Second defaultRetries = 5 ) @@ -71,8 +71,9 @@ type responseErr struct { func (rf *Refractor) ServeHTTP(rw http.ResponseWriter, r *http.Request) { url := r.URL.String() - // Archlinux hack: Mirrors return 404 for .db.sig files. // TODO: Mirror-specific hacks should be a on a different, possibly config-driven object that wraps Refractor. + + // Archlinux hack: Mirrors return 404 for .db.sig files. if strings.HasSuffix(url, ".db.sig") { rw.WriteHeader(http.StatusNotFound) return From 890b5a1e7df3211b2a3b0f3e64f143d7b3d7ed72 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 13:39:39 +0100 Subject: [PATCH 17/23] refractor/test: log mismatching amount of bytes before fatalling --- refractor/refractor_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/refractor/refractor_test.go b/refractor/refractor_test.go index 1aadaf8..7bbe6a4 100644 --- a/refractor/refractor_test.go +++ b/refractor/refractor_test.go @@ -97,8 +97,12 @@ func Test_Refractor(t *testing.T) { } body, err := io.ReadAll(response.Body) + if len(body) != len(rubbish) { + t.Errorf("Received %d bytes of %d expected", len(body), len(rubbish)) + } + if err != nil { - t.Fatalf("cannot read response body error: %v", err) + t.Fatalf("cannot read response body: %v", err) } if !bytes.Equal(body, rubbish) { From 2922c69a639c9b72fe349cb8338f33ffa6a80303 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 13:40:00 +0100 Subject: [PATCH 18/23] refractor: process each request serially, but pipelined --- refractor/refractor.go | 180 +++++++++++++++++++---------------------- 1 file changed, 85 insertions(+), 95 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index 07fa859..f0f5e2e 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -63,11 +63,6 @@ func New(c Config, pool *pool.Pool) *Refractor { } } -type responseErr struct { - err error - response *http.Response -} - func (rf *Refractor) ServeHTTP(rw http.ResponseWriter, r *http.Request) { url := r.URL.String() @@ -100,20 +95,22 @@ func (rf *Refractor) handlePlain(rw http.ResponseWriter, r *http.Request) { return } - br := <-rf.retryRequest(req) - if br.err != nil { + response, err := rf.retryRequest(req) + if err != nil { log.Errorf("GET request for %q failed: %v", url, err) rw.WriteHeader(http.StatusBadGateway) return } - for 
k, vs := range br.response.Header { + defer response.Body.Close() + + for k, vs := range response.Header { for _, v := range vs { rw.Header().Add(k, v) } } - _, err = io.Copy(rw, br.response.Body) + _, err = io.Copy(rw, response.Body) if err != nil { log.Errorf("writing GET body: %v", err) } @@ -129,113 +126,106 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { return } - headReq.Header.Add("accept-encoding", "identity") // Prevent server from gzipping response. - br := <-rf.retryRequest(headReq) - - var responseChannels []chan responseErr - - size := br.response.ContentLength - start := int64(0) - for start < size { - end := start + int64(rf.ChunkSizeMiBs)<<20 - if end > size { - end = size - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - log.Errorf("building ranged retryRequest for %q: %v", url, err) - rw.WriteHeader(http.StatusInternalServerError) - return - } - - req.Header.Add("range", fmt.Sprintf("bytes=%d-%d", start, end)) - // Prevent servers from gzipping request, as that would break ranges across servers. - req.Header.Add("accept-encoding", "identity") - responseChannels = append(responseChannels, rf.retryRequest(req)) - - start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. + headReq.Header.Add("accept-encoding", "identity") // Prevent server from gzipping headResponse. + headResponse, err := rf.retryRequest(headReq) + if err != nil { + log.Errorf("HEAD request to %s did not succeed: %v", url, err) + rw.WriteHeader(http.StatusBadGateway) + return } - // Defer fully consuming and closing all response channels to avoid leaking buffers and workers, in the event an - // error occurs. - defer func() { - for _, rc := range responseChannels { - re := <-rc - if re.response != nil { - re.response.Body.Close() - } - } - }() - - for k, vs := range br.response.Header { + for k, vs := range headResponse.Header { for _, v := range vs { rw.Header().Add(k, v) } } - written := int64(0) - for _, rc := range responseChannels { - re := <-rc - if re.err != nil { - log.Errorf("Reading resopnse from channel: %v", err) - rw.WriteHeader(http.StatusInternalServerError) - return - } - - n, err := io.Copy(rw, re.response.Body) - if err != nil { - log.Errorf("Writing response chunk: %v", err) - rw.WriteHeader(http.StatusInternalServerError) - return + // This goroutine runs in parallel with the main request loop, writing bodies to the client and reporting back + // errors so the main loop can stop if an error occurs. + // This allows writing bytes to the client at the same time we are reading the next chunk. + bodyChan := make(chan io.ReadCloser) + errChan := make(chan error) + go func() { + for body := range bodyChan { + _, err := io.Copy(rw, body) + if err != nil { + errChan <- err + } + // Always close body, even if an error occurred. + body.Close() } - written += n + // Close errChan to signal the main loop when we're done. + close(errChan) + }() - re.response.Body.Close() - } + // Run main loop as an anonymous function to preserve semantics of defer and early return. + func() { + defer close(bodyChan) // Ensure bodyChan is closed even if we return early. 
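// A minimal, self-contained sketch (not part of the patch itself) of the bodyChan/errChan
// hand-off introduced above: the producer owns bodyChan and closes it when done, the consumer
// owns errChan and closes it once bodyChan is drained, and the producer's select lets it stop
// early if the consumer reports a write error. Chunk contents and names here are illustrative only.
package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	bodyChan := make(chan io.ReadCloser)
	errChan := make(chan error)

	var out strings.Builder

	// Consumer: copy every body to the destination and report write errors back.
	go func() {
		for body := range bodyChan {
			if _, err := io.Copy(&out, body); err != nil {
				errChan <- err
			}
			body.Close()
		}
		close(errChan) // Signals that all queued bodies have been handled.
	}()

	// Producer: hand over chunks one by one, stopping at the first reported error.
	func() {
		defer close(bodyChan)
		for _, chunk := range []string{"first ", "second ", "third"} {
			select {
			case err := <-errChan:
				fmt.Println("stopping early:", err)
				return
			case bodyChan <- io.NopCloser(strings.NewReader(chunk)):
			}
		}
	}()

	// Drain remaining errors; the range ends when the consumer closes errChan.
	for err := range errChan {
		fmt.Println("write error:", err)
	}
	fmt.Println("wrote:", out.String())
}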
- if written != br.response.ContentLength { - log.Errorf("Wrote %d bytes of %d expected for %s", written, br.response.ContentLength, url) - return - } -} - -func (rf *Refractor) retryRequest(r *http.Request) chan responseErr { - respChan := make(chan responseErr) - go func() { - defer close(respChan) - - retries := rf.Retries - try := 0 - for { - try++ + size := headResponse.ContentLength + start := int64(0) + for start < size { + end := start + int64(rf.ChunkSizeMiBs)<<20 + if end > size { + end = size + } - response, err := rf.request(r) + req, err := http.NewRequest(http.MethodGet, url, nil) if err != nil { - log.Errorf("[%d/%d] Requesting %s[%s]: %v", try, retries, r.URL.Path, r.Header.Get("range"), err) - if try < retries { - continue - } - - log.Errorf("Giving up on %s[%s]: %v", r.URL.Path, r.Header.Get("range"), err) + log.Errorf("building ranged retryRequest for %q: %v", url, err) + rw.WriteHeader(http.StatusInternalServerError) + return + } - respChan <- responseErr{ - err: err, - } + req.Header.Add("range", fmt.Sprintf("bytes=%d-%d", start, end)) + // Prevent servers from gzipping request, as that would break ranges across servers. + // This actually does mirrors a favor, preventing them from spending CPU cycles on compressing in-transport + // linux packages which are already compressed. + req.Header.Add("accept-encoding", "identity") + chunkResponse, err := rf.retryRequest(req) + if err != nil { + log.Errorf("Requesting chunk of %q: %v", url, err) return } - respChan <- responseErr{ - response: response, + select { + case prevErr := <-errChan: + log.Errorf("Error writing chunk of %q:", prevErr) + case bodyChan <- chunkResponse.Body: } - return + start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. } }() - return respChan + // Wait for the sending routine to finish and log the final error, if any. + if err, hasErr := <-errChan; hasErr { + log.Errorf("Error writing final chunk of %q:", err) + } +} + +func (rf *Refractor) retryRequest(r *http.Request) (*http.Response, error) { + retries := rf.Retries + try := 0 + for { + try++ + + response, err := rf.request(r) + if err != nil { + log.Errorf("[%d/%d] Requesting %s[%s]: %v", try, retries, r.URL.Path, r.Header.Get("range"), err) + if try < retries { + continue + } + + log.Errorf("Giving up on %s[%s]: %v", r.URL.Path, r.Header.Get("range"), err) + return nil, err + + } + + return response, nil + } } func (rf *Refractor) request(r *http.Request) (*http.Response, error) { @@ -274,7 +264,7 @@ func (rf *Refractor) request(r *http.Request) (*http.Response, error) { body := response.Body - // Asynchronously wait for context and close body if copy takes too long. + // Asynchronously wait for context and close body if it gets cancelled. go func() { <-ctx.Done() @@ -284,12 +274,12 @@ func (rf *Refractor) request(r *http.Request) (*http.Response, error) { } }() + // io.Copy will return early if the source body is cancelled above. n, err := io.Copy(buf, body) if err != nil { return nil, err } - // Check we read the expected length. 
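// A standalone sketch of the pattern used in request() above: a watchdog goroutine waits on the
// context and closes the response body when the deadline fires, forcing the io.Copy below to
// return early instead of hanging on a slow mirror, and the byte count is checked against
// Content-Length afterwards. This is illustrative only: a plain http.Get stands in for the
// project's worker pool, and fetchWithDeadline is not a real helper in this codebase.
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"time"
)

func fetchWithDeadline(url string, timeout time.Duration) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // Also releases the watchdog once we are done.

	response, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	body := response.Body

	// Watchdog: when the context ends (deadline or cancel), close the body so a stuck Copy unblocks.
	go func() {
		<-ctx.Done()
		body.Close()
	}()

	buf := &bytes.Buffer{}
	n, err := io.Copy(buf, body)
	if err != nil {
		return nil, fmt.Errorf("reading body: %w", err)
	}
	if response.ContentLength >= 0 && n != response.ContentLength {
		return nil, fmt.Errorf("expected %d bytes but read %d", response.ContentLength, n)
	}
	return buf.Bytes(), nil
}

func main() {
	data, err := fetchWithDeadline("https://example.org/", 5*time.Second)
	fmt.Println(len(data), err)
}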
if n != response.ContentLength { return nil, fmt.Errorf("expected to read bytes %d but read %d instead", response.ContentLength, n) } From b7b41f5196fefde88b9fb896b3ae72ea15931370 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 15:51:54 +0100 Subject: [PATCH 19/23] refractor: remove unused variable --- refractor/refractor.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index f0f5e2e..f31dc6f 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -207,15 +207,14 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { } func (rf *Refractor) retryRequest(r *http.Request) (*http.Response, error) { - retries := rf.Retries try := 0 for { try++ response, err := rf.request(r) if err != nil { - log.Errorf("[%d/%d] Requesting %s[%s]: %v", try, retries, r.URL.Path, r.Header.Get("range"), err) - if try < retries { + log.Errorf("[%d/%d] Requesting %s[%s]: %v", try, rf.Retries, r.URL.Path, r.Header.Get("range"), err) + if try < rf.Retries { continue } From 74efa1bbb384344cd4824db652b3591f5cad8d1b Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 15:54:11 +0100 Subject: [PATCH 20/23] worker: return actual request error --- worker/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/worker/worker.go b/worker/worker.go index 033142e..0bc0d61 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -16,7 +16,6 @@ import ( var ( ErrSlowMirror = errors.New("slow mirror") ErrChannelClosed = errors.New("request channel closed") - ErrRequest = errors.New("error performing request") ErrCode = errors.New("received non-ok status code") ) @@ -94,8 +93,9 @@ func (w Worker) Work(requests chan RequestResponse) error { err, } + // In addition to reporting the error to the channel, break request loop as we don't trust this mirror anymore. if err != nil { - return ErrRequest + return err } } From 3811609d6b3b8de5ca4bee4481e42cbb419f47f7 Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 15:55:38 +0100 Subject: [PATCH 21/23] readerWrapper: slightly improve comment --- stats/readerWrapper.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stats/readerWrapper.go b/stats/readerWrapper.go index 1b8b4eb..ef5aa4f 100644 --- a/stats/readerWrapper.go +++ b/stats/readerWrapper.go @@ -7,8 +7,8 @@ import ( // ReaderWrapper wraps an io.Reader and calls OnDone with the total number of bytes read from the underlying reader // as an argument. // OnDone is called at most once when the underlying reader returns the first error, that can be EOF, or when it get -// Close()d. -// If the underlying reader also implements io.Closer, ReaderWrapper.Close() will also call Close() on it. Otherwise, +// Close()d, whatever happens first. +// If the underlying reader also implements io.Closer, calling Close() will also call Close() on it. Otherwise, // the Close() operation always returns nil. 
type ReaderWrapper struct {
 	Underlying io.Reader

From 36b49cded8a43fb7fc19d9da933a8faa284c715d Mon Sep 17 00:00:00 2001
From: Roberto Santalla
Date: Sun, 5 Nov 2023 16:45:07 +0100
Subject: [PATCH 22/23] README: improve wording a bit

---
 README.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index 361e005..60a58bd 100644
--- a/README.md
+++ b/README.md
@@ -1,14 +1,14 @@
 # 🪞 Refractor
 
-Refractor is linux mirror load-balancer, which parallelizes requests between an extremely dynamic pool of mirrors. Mirrors in the pool are constantly monitored for throughput, and slowest mirrors are continuously rotated out of the pool and replaced by new ones.
+Refractor is a Linux mirror load-balancer, which parallelizes requests across an extremely dynamic pool of mirrors. Mirrors in the pool are constantly monitored for throughput, and the slowest mirrors are continuously rotated out of the pool and replaced by new ones obtained at random.
 
 ## Working principle
 
 The core of Refractor is a pool of workers, to which HTTP requests are routed. A worker draws a random mirror from a list, and proxies the response to the user.
 
-Refractor aims to work in a stateless, self-balancing way. It tries to achieve it by picking up mirrors from a large list (referred as a provider), and routing requests to them while measuring how those perform. If a mirror is among the bottom N performers, it gets rotated out. Mirrors that fail to complete requests in a given time are also immediately rotated, while mirrors that perform above a given threshold are never rotated out even if the rest perform better. After a certain amount of requests, this should stabilize in a pool of fast mirrors.
+Refractor aims to work in a stateless, self-balancing way. It tries to achieve this by picking up mirrors from a large list (referred to as a Provider), and routing requests to them while measuring how the mirrors perform. If a mirror is among the bottom N performers, it gets rotated out of the pool. Mirrors that fail to complete requests in a given time are also immediately rotated out, while mirrors that perform above a given threshold are never rotated out even if they are among the bottom performers. After a certain number of requests, this should stabilize into a pool of fast mirrors.
 
-In an attempt to maximize downlink and speed up the rotation of slow mirrors, requests are split up in several chunks of a configurable size, typically a few megabytes, that are themselves routed to different mirrors. If a mirror returns an error for a chunk, or fails to download the chunk in time, the mirror that failed is immediately rotated out and the piece is re-queued.
+In an attempt to maximize downlink and speed up the rotation of slow mirrors, requests are split up into several chunks of a configurable size, typically a few megabytes, that are themselves routed to different mirrors. Chunks are buffered in memory and served to clients in a pipelined fashion. If a mirror returns an error for a chunk, or fails to download the chunk in time, the mirror that failed is immediately rotated out and the chunk is re-queued to another mirror.
 
 Mirror throughput is measured using a rolling average, so if a mirror performed well in the past but doesn't anymore, for example because it is currently dealing with a large amount of traffic, it gets rotated out.
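The rolling-average throughput measurement described above can be sketched in a few lines. The snippet below is illustrative only: it assumes a callback-style reader wrapper in the spirit of the ReaderWrapper touched by PATCH 21 and a simple exponentially-weighted average; countingReader, alpha, and the hard-coded elapsed time are made up for the example and are not the project's actual Stats implementation.

package main

import (
	"fmt"
	"io"
	"strings"
	"time"
)

// countingReader calls onDone exactly once with the number of bytes read, mirroring the
// role ReaderWrapper plays for real response bodies.
type countingReader struct {
	r      io.Reader
	n      int64
	onDone func(n int64)
	done   bool
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	c.n += int64(n)
	if err != nil && !c.done {
		c.done = true
		c.onDone(c.n)
	}
	return n, err
}

func main() {
	const alpha = 0.2 // Weight of the newest sample in the rolling average.
	throughput := 0.0

	body := &countingReader{r: strings.NewReader("pretend this is a package"), onDone: func(n int64) {
		elapsed := 50 * time.Millisecond // Stand-in for the measured download time.
		sample := float64(n) / elapsed.Seconds()
		throughput = alpha*sample + (1-alpha)*throughput
		fmt.Printf("sample: %.0f B/s, rolling average: %.0f B/s\n", sample, throughput)
	}}

	_, _ = io.Copy(io.Discard, body) // Drain the body as a client would.
}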
From 00b8c462f380a1fa964e8290eb9847bd8203a68c Mon Sep 17 00:00:00 2001 From: Roberto Santalla Date: Sun, 5 Nov 2023 18:52:13 +0100 Subject: [PATCH 23/23] refactor: extract request splitting to a different func --- refractor/refractor.go | 69 +++++++++++++++++++++++++----------------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/refractor/refractor.go b/refractor/refractor.go index f31dc6f..b355dfb 100644 --- a/refractor/refractor.go +++ b/refractor/refractor.go @@ -126,7 +126,6 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { return } - headReq.Header.Add("accept-encoding", "identity") // Prevent server from gzipping headResponse. headResponse, err := rf.retryRequest(headReq) if err != nil { log.Errorf("HEAD request to %s did not succeed: %v", url, err) @@ -140,6 +139,13 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { } } + requests, err := rf.split(url, headResponse.ContentLength) + if err != nil { + log.Errorf("Splitting request: %v", err) + rw.WriteHeader(http.StatusInternalServerError) + return + } + // This goroutine runs in parallel with the main request loop, writing bodies to the client and reporting back // errors so the main loop can stop if an error occurs. // This allows writing bytes to the client at the same time we are reading the next chunk. @@ -159,31 +165,10 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { close(errChan) }() - // Run main loop as an anonymous function to preserve semantics of defer and early return. func() { - defer close(bodyChan) // Ensure bodyChan is closed even if we return early. - - size := headResponse.ContentLength - start := int64(0) - for start < size { - end := start + int64(rf.ChunkSizeMiBs)<<20 - if end > size { - end = size - } - - req, err := http.NewRequest(http.MethodGet, url, nil) - if err != nil { - log.Errorf("building ranged retryRequest for %q: %v", url, err) - rw.WriteHeader(http.StatusInternalServerError) - return - } - - req.Header.Add("range", fmt.Sprintf("bytes=%d-%d", start, end)) - // Prevent servers from gzipping request, as that would break ranges across servers. - // This actually does mirrors a favor, preventing them from spending CPU cycles on compressing in-transport - // linux packages which are already compressed. - req.Header.Add("accept-encoding", "identity") + defer close(bodyChan) // Ensure bodyChan is break the loop early. + for _, req := range requests { chunkResponse, err := rf.retryRequest(req) if err != nil { log.Errorf("Requesting chunk of %q: %v", url, err) @@ -193,17 +178,45 @@ func (rf *Refractor) handleRefracted(rw http.ResponseWriter, r *http.Request) { select { case prevErr := <-errChan: log.Errorf("Error writing chunk of %q:", prevErr) + return case bodyChan <- chunkResponse.Body: } - - start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. } }() // Wait for the sending routine to finish and log the final error, if any. 
- if err, hasErr := <-errChan; hasErr { - log.Errorf("Error writing final chunk of %q:", err) + for err := range errChan { + log.Errorf("Error writing chunk of %q:", err) + } +} + +func (rf *Refractor) split(url string, size int64) ([]*http.Request, error) { + // Build requests + var requests []*http.Request + start := int64(0) + for start < size { + end := start + int64(rf.ChunkSizeMiBs)<<20 + if end > size { + end = size + } + + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("building ranged retryRequest for %q: %v", url, err) + } + + req.Header.Add("range", fmt.Sprintf("bytes=%d-%d", start, end)) + // Prevent servers from gzipping request, as that would break ranges across servers. + // This actually does mirrors a favor, preventing them from spending CPU cycles on compressing in-transport + // linux packages which are already compressed. + req.Header.Add("accept-encoding", "identity") + + requests = append(requests, req) + + start = end + 1 // Server returns [start-end], both inclusive, so next request should start on end + 1. } + + return requests, nil } func (rf *Refractor) retryRequest(r *http.Request) (*http.Response, error) {
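// A small, self-contained illustration (not part of the patch) of the range arithmetic that
// split() performs above: ranges are inclusive on both ends, so the next chunk starts at
// end+1, and the final chunk is clamped to the total size. The sizes below are made up.
// Note that the final end offset equals the total size, which with inclusive ranges is one
// byte past the last valid offset; HTTP servers clamp such ranges to the end of the file.
package main

import "fmt"

func ranges(size, chunkBytes int64) []string {
	var out []string
	start := int64(0)
	for start < size {
		end := start + chunkBytes
		if end > size {
			end = size
		}
		out = append(out, fmt.Sprintf("bytes=%d-%d", start, end))
		start = end + 1 // The server returns [start, end] inclusive.
	}
	return out
}

func main() {
	// A 10 MiB file split with a 4 MiB chunk size, printed as one Range header value per chunk.
	for _, r := range ranges(10<<20, 4<<20) {
		fmt.Println(r)
	}
	// Output:
	// bytes=0-4194304
	// bytes=4194305-8388609
	// bytes=8388610-10485760
}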