From 650b0c36982f2313d66576f76c5f6582e18302a1 Mon Sep 17 00:00:00 2001 From: zyxkad Date: Mon, 12 Aug 2024 13:12:09 -0700 Subject: [PATCH] refactored all errors --- api/config.go | 4 + cluster/http.go | 13 +- cluster/storage.go | 413 +++++++++++++++++- cluster/tempfile_test.go | 180 ++++++++ config.go | 152 +++++++ config/config.go | 33 +- config/server.go | 7 +- handler.go | 10 +- limited/api_rate.go | 52 ++- main.go | 124 +++--- storage/storage_webdav.go | 2 +- sync.go | 862 -------------------------------------- util.go | 46 -- utils/http.go | 43 ++ utils/util.go | 59 ++- 15 files changed, 972 insertions(+), 1028 deletions(-) create mode 100644 cluster/tempfile_test.go delete mode 100644 sync.go diff --git a/api/config.go b/api/config.go index e091b6b..a32ffcd 100644 --- a/api/config.go +++ b/api/config.go @@ -21,8 +21,11 @@ package api import ( "encoding/json" + "errors" ) +var ErrPreconditionFailed = errors.New("Precondition Failed") + type ConfigHandler interface { json.Marshaler json.Unmarshaler @@ -31,5 +34,6 @@ type ConfigHandler interface { UnmarshalJSONPath(path string, data []byte) error Fingerprint() string + // DoLockedAction will execute callback if the fingerprint matches, or return ErrPreconditionFailed DoLockedAction(fingerprint string, callback func(ConfigHandler) error) error } diff --git a/cluster/http.go b/cluster/http.go index 4910444..07480e3 100644 --- a/cluster/http.go +++ b/cluster/http.go @@ -80,6 +80,14 @@ func redirectChecker(req *http.Request, via []*http.Request) error { return nil } +func (cr *Cluster) getFullURL(relpath string) (u *url.URL, err error) { + if u, err = url.Parse(cr.opts.Server); err != nil { + return + } + u.Path = path.Join(u.Path, relpath) + return +} + func (cr *Cluster) makeReq(ctx context.Context, method string, relpath string, query url.Values) (req *http.Request, err error) { return cr.makeReqWithBody(ctx, method, relpath, query, nil) } @@ -89,11 +97,10 @@ func (cr *Cluster) makeReqWithBody( method string, relpath string, query url.Values, body io.Reader, ) (req *http.Request, err error) { - var u *url.URL - if u, err = url.Parse(cr.opts.Server); err != nil { + u, err := cr.getFullURL(relpath) + if err != nil { return } - u.Path = path.Join(u.Path, relpath) if query != nil { u.RawQuery = query.Encode() } diff --git a/cluster/storage.go b/cluster/storage.go index 77173d6..1d48521 100644 --- a/cluster/storage.go +++ b/cluster/storage.go @@ -20,16 +20,21 @@ package cluster import ( + "compress/gzip" + "compress/zlib" "context" "crypto" "encoding/hex" + "errors" "fmt" "io" "net/http" "net/url" + "os" "runtime" "slices" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -68,12 +73,19 @@ type FileInfo struct { Mtime int64 `json:"mtime" avro:"mtime"` } +type RequestPath struct { + *http.Request + Path string +} + type StorageFileInfo struct { - FileInfo + Hash string + Size int64 Storages []storage.Storage + URLs map[string]RequestPath } -func (cr *Cluster) GetFileList(ctx context.Context, fileMap map[string]*StorageFileInfo, forceAll bool) (err error) { +func (cr *Cluster) GetFileList(ctx context.Context, fileMap map[string]*StorageFileInfo, forceAll bool) error { var query url.Values lastMod := cr.fileListLastMod if forceAll { @@ -86,31 +98,30 @@ func (cr *Cluster) GetFileList(ctx context.Context, fileMap map[string]*StorageF } req, err := cr.makeReqWithAuth(ctx, http.MethodGet, "/openbmclapi/files", query) if err != nil { - return + return err } res, err := cr.cachedCli.Do(req) if err != nil { - return + return err } defer res.Body.Close() switch res.StatusCode { case http.StatusOK: // case http.StatusNoContent, http.StatusNotModified: - return + return nil default: - err = utils.NewHTTPStatusErrorFromResponse(res) - return + return utils.NewHTTPStatusErrorFromResponse(res) } log.Debug("Parsing filelist body ...") zr, err := zstd.NewReader(res.Body) if err != nil { - return + return err } defer zr.Close() var files []FileInfo - if err = avro.NewDecoderForSchema(fileListSchema, zr).Decode(&files); err != nil { - return + if err := avro.NewDecoderForSchema(fileListSchema, zr).Decode(&files); err != nil { + return err } for _, f := range files { @@ -119,7 +130,7 @@ func (cr *Cluster) GetFileList(ctx context.Context, fileMap map[string]*StorageF } if ff, ok := fileMap[f.Hash]; ok { if ff.Size != f.Size { - log.Panicf("Hash conflict detected, hash of both %q (%dB) and %q (%dB) is %s", ff.Path, ff.Size, f.Path, f.Size, f.Hash) + log.Panicf("Hash conflict detected, hash of both %q (%dB) and %v (%dB) is %s", f.Path, f.Size, ff.URLs, ff.Size, f.Hash) } for _, s := range cr.storages { sto := cr.storageManager.Storages[s] @@ -129,19 +140,26 @@ func (cr *Cluster) GetFileList(ctx context.Context, fileMap map[string]*StorageF } } else { ff := &StorageFileInfo{ - FileInfo: f, + Hash: f.Hash, + Size: f.Size, Storages: make([]storage.Storage, len(cr.storages)), + URLs: make(map[string]RequestPath), } for i, s := range cr.storages { ff.Storages[i] = cr.storageManager.Storages[s] } slices.SortFunc(ff.Storages, storageIdSortFunc) + req, err := cr.makeReqWithAuth(context.Background(), http.MethodGet, f.Path, nil) + if err != nil { + return err + } + ff.URLs[req.URL.String()] = RequestPath{Request: req, Path: f.Path} fileMap[f.Hash] = ff } } cr.fileListLastMod = lastMod log.Debugf("Filelist parsed, length = %d, lastMod = %d", len(files), lastMod) - return + return nil } func storageIdSortFunc(a, b storage.Storage) int { @@ -179,15 +197,15 @@ func checkFile( pg *mpb.Progress, ) (err error) { var missingCount atomic.Int32 - addMissing := func(f FileInfo, sto storage.Storage) { + addMissing := func(f *StorageFileInfo, sto storage.Storage) { missingCount.Add(1) if info, ok := missing[f.Hash]; ok { info.Storages = append(info.Storages, sto) } else { - missing[f.Hash] = &StorageFileInfo{ - FileInfo: f, - Storages: []storage.Storage{sto}, - } + info := new(StorageFileInfo) + *info = *f + info.Storages = []storage.Storage{sto} + missing[f.Hash] = info } } @@ -280,13 +298,13 @@ func checkFile( size, ok := ssizeMap[sto][hash] if !ok { // log.Debugf("Could not found file %q", name) - addMissing(f.FileInfo, sto) + addMissing(f, sto) bar.EwmaIncrement(time.Since(start)) continue } if size != f.Size { log.TrWarnf("warn.check.modified.size", name, size, f.Size) - addMissing(f.FileInfo, sto) + addMissing(f, sto) bar.EwmaIncrement(time.Since(start)) continue } @@ -305,7 +323,7 @@ func checkFile( return ctx.Err() } wg.Add(1) - go func(f FileInfo, buf []byte, free func()) { + go func(f *StorageFileInfo, buf []byte, free func()) { defer log.RecoverPanic(nil) defer wg.Done() miss := true @@ -329,7 +347,7 @@ func checkFile( if miss { addMissing(f, sto) } - }(f.FileInfo, buf, free) + }(f, buf, free) } } wg.Wait() @@ -341,6 +359,349 @@ func checkFile( return nil } +type syncStats struct { + slots *limited.BufSlots + + totalSize int64 + okCount, failCount atomic.Int32 + totalFiles int + + pg *mpb.Progress + totalBar *mpb.Bar + lastInc atomic.Int64 +} + +func (c *HTTPClient) SyncFiles( + ctx context.Context, + manager *storage.Manager, + files map[string]*StorageFileInfo, + heavy bool, + slots int, +) error { + pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) + defer pg.Shutdown() + log.SetLogOutput(pg) + defer log.SetLogOutput(nil) + + missingMap := make(map[string]*StorageFileInfo) + if err := checkFile(ctx, manager, files, heavy, missingMap, pg); err != nil { + return err + } + + totalFiles := len(files) + + var stats syncStats + stats.pg = pg + stats.slots = limited.NewBufSlots(slots) + stats.totalFiles = totalFiles + + var barUnit decor.SizeB1024 + stats.lastInc.Store(time.Now().UnixNano()) + stats.totalBar = pg.AddBar(stats.totalSize, + mpb.BarRemoveOnComplete(), + mpb.BarPriority(stats.slots.Cap()), + mpb.PrependDecorators( + decor.Name(lang.Tr("hint.sync.total")), + decor.NewPercentage("%.2f"), + ), + mpb.AppendDecorators( + decor.Any(func(decor.Statistics) string { + return fmt.Sprintf("(%d + %d / %d) ", stats.okCount.Load(), stats.failCount.Load(), stats.totalFiles) + }), + decor.Counters(barUnit, "(%.1f/%.1f) "), + decor.EwmaSpeed(barUnit, "%.1f ", 30), + decor.OnComplete( + decor.EwmaETA(decor.ET_STYLE_GO, 30), "done", + ), + ), + ) + + log.TrInfof("hint.sync.start", totalFiles, utils.BytesToUnit((float64)(stats.totalSize))) + start := time.Now() + + done := make(chan []storage.Storage, 1) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + stLen := len(manager.Storages) + aliveStorages := make(map[storage.Storage]struct{}, stLen) + for _, s := range manager.Storages { + tctx, cancel := context.WithTimeout(ctx, time.Second*10) + err := s.CheckUpload(tctx) + cancel() + if err != nil { + if err := ctx.Err(); err != nil { + return err + } + log.Errorf("Storage %s does not work: %v", s.String(), err) + } else { + aliveStorages[s] = struct{}{} + } + } + if len(aliveStorages) == 0 { + err := errors.New("All storages are broken") + log.TrErrorf("error.sync.failed", err) + return err + } + if len(aliveStorages) < stLen { + log.TrErrorf("error.sync.part.working", len(aliveStorages), stLen) + select { + case <-time.After(time.Minute): + case <-ctx.Done(): + return ctx.Err() + } + } + + for _, info := range missingMap { + log.Debugf("File %s is for %s", info.Hash, joinStorageIDs(info.Storages)) + pathRes, err := c.fetchFile(ctx, &stats, info) + if err != nil { + log.TrWarnf("warn.sync.interrupted") + return err + } + go func(info *StorageFileInfo, pathRes <-chan string) { + defer log.RecordPanic() + select { + case path := <-pathRes: + // cr.syncProg.Add(1) + if path == "" { + select { + case done <- nil: // TODO: or all storage? + case <-ctx.Done(): + } + return + } + defer os.Remove(path) + // acquire slot here + slotId, buf, free := stats.slots.Alloc(ctx) + if buf == nil { + return + } + defer free() + _ = slotId + var srcFd *os.File + if srcFd, err = os.Open(path); err != nil { + return + } + defer srcFd.Close() + var failed []storage.Storage + for _, target := range info.Storages { + if _, err = srcFd.Seek(0, io.SeekStart); err != nil { + log.Errorf("Cannot seek file %q to start: %v", path, err) + continue + } + if err = target.Create(info.Hash, srcFd); err != nil { + failed = append(failed, target) + log.TrErrorf("error.sync.create.failed", target.String(), info.Hash, err) + continue + } + } + free() + srcFd.Close() + os.Remove(path) + select { + case done <- failed: + case <-ctx.Done(): + } + case <-ctx.Done(): + return + } + }(info, pathRes) + } + + for i := len(missingMap); i > 0; i-- { + select { + case failed := <-done: + for _, s := range failed { + if _, ok := aliveStorages[s]; ok { + delete(aliveStorages, s) + log.Debugf("Broken storage %d / %d", stLen-len(aliveStorages), stLen) + if len(aliveStorages) == 0 { + cancel() + err := errors.New("All storages are broken") + log.TrErrorf("error.sync.failed", err) + return err + } + } + } + case <-ctx.Done(): + log.TrWarnf("warn.sync.interrupted") + return ctx.Err() + } + } + + use := time.Since(start) + stats.totalBar.Abort(true) + pg.Wait() + + log.TrInfof("hint.sync.done", use, utils.BytesToUnit((float64)(stats.totalSize)/use.Seconds())) + return nil +} + +func (c *HTTPClient) fetchFile(ctx context.Context, stats *syncStats, f *StorageFileInfo) (<-chan string, error) { + const maxRetryCount = 10 + + slotId, buf, free := stats.slots.Alloc(ctx) + if buf == nil { + return nil, ctx.Err() + } + + pathRes := make(chan string, 1) + go func() { + defer log.RecordPanic() + defer free() + defer close(pathRes) + + var barUnit decor.SizeB1024 + var tried atomic.Int32 + tried.Store(1) + + fPath := f.Hash // TODO: show downloading URL instead? Will it be too long? + + bar := stats.pg.AddBar(f.Size, + mpb.BarRemoveOnComplete(), + mpb.BarPriority(slotId), + mpb.PrependDecorators( + decor.Name(lang.Tr("hint.sync.downloading")), + decor.Any(func(decor.Statistics) string { + tc := tried.Load() + if tc <= 1 { + return "" + } + return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount) + }), + decor.Name(fPath, decor.WCSyncSpaceR), + ), + mpb.AppendDecorators( + decor.NewPercentage("%d", decor.WCSyncSpace), + decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace), + decor.EwmaSpeed(barUnit, "%.1f", 30, decor.WCSyncSpace), + decor.OnComplete( + decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncSpace), "done", + ), + ), + ) + defer bar.Abort(true) + + interval := time.Second + for { + bar.SetCurrent(0) + hashMethod, err := getHashMethod(len(f.Hash)) + if err == nil { + var path string + if path, err = c.fetchFileWithBuf(ctx, f, hashMethod, buf, func(r io.Reader) io.Reader { + return utils.ProxyPBReader(r, bar, stats.totalBar, &stats.lastInc) + }); err == nil { + pathRes <- path + stats.okCount.Add(1) + log.Infof(lang.Tr("info.sync.downloaded"), fPath, + utils.BytesToUnit((float64)(f.Size)), + (float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100) + return + } + } + bar.SetRefill(bar.Current()) + + c := tried.Add(1) + if c > maxRetryCount { + log.TrErrorf("error.sync.download.failed", fPath, err) + break + } + log.TrErrorf("error.sync.download.failed.retry", fPath, interval, err) + select { + case <-time.After(interval): + interval *= 2 + case <-ctx.Done(): + return + } + } + stats.failCount.Add(1) + }() + return pathRes, nil +} + +func (c *HTTPClient) fetchFileWithBuf( + ctx context.Context, f *StorageFileInfo, + hashMethod crypto.Hash, buf []byte, + wrapper func(io.Reader) io.Reader, +) (path string, err error) { + var ( + req *http.Request + res *http.Response + fd *os.File + r io.Reader + ) + for _, rq := range f.URLs { + req = rq.Request + break + } + req = req.Clone(ctx) + req.Header.Set("Accept-Encoding", "gzip, deflate") + if res, err = c.Do(req); err != nil { + return + } + defer res.Body.Close() + if err = ctx.Err(); err != nil { + return + } + if res.StatusCode != http.StatusOK { + err = utils.ErrorFromRedirect(utils.NewHTTPStatusErrorFromResponse(res), res) + return + } + switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce { + case "": + r = res.Body + case "gzip": + if r, err = gzip.NewReader(res.Body); err != nil { + err = utils.ErrorFromRedirect(err, res) + return + } + case "deflate": + if r, err = zlib.NewReader(res.Body); err != nil { + err = utils.ErrorFromRedirect(err, res) + return + } + default: + err = utils.ErrorFromRedirect(fmt.Errorf("Unexpected Content-Encoding %q", ce), res) + return + } + if wrapper != nil { + r = wrapper(r) + } + + hw := hashMethod.New() + + if fd, err = os.CreateTemp("", "*.downloading"); err != nil { + return + } + path = fd.Name() + defer func(path string) { + if err != nil { + os.Remove(path) + } + }(path) + + _, err = io.CopyBuffer(io.MultiWriter(hw, fd), r, buf) + stat, err2 := fd.Stat() + fd.Close() + if err != nil { + err = utils.ErrorFromRedirect(err, res) + return + } + if err2 != nil { + err = err2 + return + } + if t := stat.Size(); f.Size >= 0 && t != f.Size { + err = utils.ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", t, f.Size), res) + return + } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { + err = utils.ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, f.Hash), res) + return + } + return +} + func getHashMethod(l int) (hashMethod crypto.Hash, err error) { switch l { case 32: @@ -352,3 +713,11 @@ func getHashMethod(l int) (hashMethod crypto.Hash, err error) { } return } + +func joinStorageIDs(storages []storage.Storage) string { + ss := make([]string, len(storages)) + for i, s := range storages { + ss[i] = s.Id() + } + return "[" + strings.Join(ss, ", ") + "]" +} diff --git a/cluster/tempfile_test.go b/cluster/tempfile_test.go new file mode 100644 index 0000000..d3d216b --- /dev/null +++ b/cluster/tempfile_test.go @@ -0,0 +1,180 @@ +/** + * OpenBmclAPI (Golang Edition) + * Copyright (C) 2024 Kevin Z + * All rights reserved + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package cluster_test + +import ( + "testing" + + "io" + "os" +) + +var datas = func() [][]byte { + datas := make([][]byte, 0x7) + for i := range len(datas) { + b := make([]byte, 0xff00+i) + for j := range len(b) { + b[j] = (byte)(i + j) + } + datas[i] = b + } + return datas +}() + +func BenchmarkCreateAndRemoveFile(t *testing.B) { + t.ReportAllocs() + buf := make([]byte, 1024) + _ = buf + for i := 0; i < t.N; i++ { + d := datas[i%len(datas)] + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + if _, err = fd.Write(d); err != nil { + t.Errorf("Cannot write file: %v", err) + } else if err = fd.Sync(); err != nil { + t.Errorf("Cannot write file: %v", err) + } + fd.Close() + os.Remove(fd.Name()) + if err != nil { + t.FailNow() + } + } +} + +func BenchmarkWriteAndTruncateFile(t *testing.B) { + t.ReportAllocs() + buf := make([]byte, 1024) + _ = buf + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + defer os.Remove(fd.Name()) + for i := 0; i < t.N; i++ { + d := datas[i%len(datas)] + if _, err := fd.Write(d); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Sync(); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Truncate(0); err != nil { + t.Fatalf("Cannot truncate file: %v", err) + } + } +} + +func BenchmarkWriteAndSeekFile(t *testing.B) { + t.ReportAllocs() + buf := make([]byte, 1024) + _ = buf + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + defer os.Remove(fd.Name()) + for i := 0; i < t.N; i++ { + d := datas[i%len(datas)] + if _, err := fd.Write(d); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Sync(); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if _, err := fd.Seek(io.SeekStart, 0); err != nil { + t.Fatalf("Cannot seek file: %v", err) + } + } +} + +func BenchmarkParallelCreateAndRemoveFile(t *testing.B) { + t.ReportAllocs() + t.SetParallelism(4) + buf := make([]byte, 1024) + _ = buf + t.RunParallel(func(pb *testing.PB) { + for i := 0; pb.Next(); i++ { + d := datas[i%len(datas)] + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + if _, err = fd.Write(d); err != nil { + t.Errorf("Cannot write file: %v", err) + } else if err = fd.Sync(); err != nil { + t.Errorf("Cannot write file: %v", err) + } + fd.Close() + if err := os.Remove(fd.Name()); err != nil { + t.Fatalf("Cannot remove file: %v", err) + } + if err != nil { + t.FailNow() + } + } + }) +} + +func BenchmarkParallelWriteAndTruncateFile(t *testing.B) { + t.ReportAllocs() + t.SetParallelism(4) + buf := make([]byte, 1024) + _ = buf + t.RunParallel(func(pb *testing.PB) { + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + defer os.Remove(fd.Name()) + for i := 0; pb.Next(); i++ { + d := datas[i%len(datas)] + if _, err := fd.Write(d); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Sync(); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Truncate(0); err != nil { + t.Fatalf("Cannot truncate file: %v", err) + } + } + }) +} + +func BenchmarkParallelWriteAndSeekFile(t *testing.B) { + t.ReportAllocs() + t.SetParallelism(4) + buf := make([]byte, 1024) + _ = buf + t.RunParallel(func(pb *testing.PB) { + fd, err := os.CreateTemp("", "*.downloading") + if err != nil { + t.Fatalf("Cannot create temp file: %v", err) + } + defer os.Remove(fd.Name()) + for i := 0; pb.Next(); i++ { + d := datas[i%len(datas)] + if _, err := fd.Write(d); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if err := fd.Sync(); err != nil { + t.Fatalf("Cannot write file: %v", err) + } else if _, err := fd.Seek(io.SeekStart, 0); err != nil { + t.Fatalf("Cannot seel file: %v", err) + } + } + }) +} diff --git a/config.go b/config.go index b525911..1c4540a 100644 --- a/config.go +++ b/config.go @@ -21,16 +21,22 @@ package main import ( "bytes" + "context" + "encoding/json" "errors" "fmt" "net/url" "os" + "strings" + "sync" "gopkg.in/yaml.v3" + "github.com/LiterMC/go-openbmclapi/api" "github.com/LiterMC/go-openbmclapi/config" "github.com/LiterMC/go-openbmclapi/log" "github.com/LiterMC/go-openbmclapi/storage" + "github.com/LiterMC/go-openbmclapi/utils" ) const DefaultBMCLAPIServer = "https://openbmclapi.bangbang93.com" @@ -185,3 +191,149 @@ func readAndRewriteConfig() (cfg *config.Config, err error) { } return } + +type ConfigHandler struct { + mux sync.RWMutex + r *Runner + + updateProcess []func(context.Context) error +} + +var _ api.ConfigHandler = (*ConfigHandler)(nil) + +func (c *ConfigHandler) update(newConfig *config.Config) error { + r := c.r + oldConfig := r.Config + c.updateProcess = c.updateProcess[:0] + + if newConfig.LogSlots != oldConfig.LogSlots || newConfig.NoAccessLog != oldConfig.NoAccessLog || newConfig.AccessLogSlots != oldConfig.AccessLogSlots || newConfig.Advanced.DebugLog != oldConfig.Advanced.DebugLog { + c.updateProcess = append(c.updateProcess, r.SetupLogger) + } + if newConfig.Host != oldConfig.Host || newConfig.Port != oldConfig.Port { + c.updateProcess = append(c.updateProcess, r.StartServer) + } + if newConfig.PublicHost != oldConfig.PublicHost || newConfig.PublicPort != oldConfig.PublicPort || newConfig.Advanced.NoFastEnable != oldConfig.Advanced.NoFastEnable || newConfig.MaxReconnectCount != oldConfig.MaxReconnectCount { + c.updateProcess = append(c.updateProcess, r.updateClustersWithGeneralConfig) + } + if newConfig.RateLimit != oldConfig.RateLimit { + c.updateProcess = append(c.updateProcess, r.updateRateLimit) + } + if newConfig.Notification != oldConfig.Notification { + // c.updateProcess = append(c.updateProcess, ) + } + + r.Config = newConfig + r.publicHost = r.Config.PublicHost + r.publicPort = r.Config.PublicPort + return nil +} + +func (c *ConfigHandler) doUpdateProcesses(ctx context.Context) error { + for _, proc := range c.updateProcess { + if err := proc(ctx); err != nil { + return err + } + } + c.updateProcess = c.updateProcess[:0] + return nil +} + +func (c *ConfigHandler) MarshalJSON() ([]byte, error) { + return c.r.Config.MarshalJSON() +} + +func (c *ConfigHandler) UnmarshalJSON(data []byte) error { + c2 := c.r.Config.Clone() + if err := c2.UnmarshalJSON(data); err != nil { + return err + } + c.update(c2) + return nil +} + +func (c *ConfigHandler) UnmarshalYAML(data []byte) error { + c2 := c.r.Config.Clone() + if err := c2.UnmarshalText(data); err != nil { + return err + } + c.update(c2) + return nil +} + +func (c *ConfigHandler) MarshalJSONPath(path string) ([]byte, error) { + names := strings.Split(path, ".") + data, err := c.r.Config.MarshalJSON() + if err != nil { + return nil, err + } + var m map[string]any + if err := json.Unmarshal(data, &m); err != nil { + return nil, err + } + accessed := "" + var x any = m + for _, n := range names { + mc, ok := x.(map[string]any) + if !ok { + return nil, fmt.Errorf("Unexpected type %T on path %q, expect map[string]any", x, accessed) + } + accessed += n + "." + x = mc[n] + } + return json.Marshal(x) +} + +func (c *ConfigHandler) UnmarshalJSONPath(path string, data []byte) error { + names := strings.Split(path, ".") + var d any + if err := json.Unmarshal(data, &d); err != nil { + return err + } + accessed := "" + var m map[string]any + { + b, err := c.MarshalJSON() + if err != nil { + return err + } + if err := json.Unmarshal(b, &m); err != nil { + return err + } + } + x := m + for _, p := range names[:len(names)-1] { + accessed += p + "." + var ok bool + x, ok = x[p].(map[string]any) + if !ok { + return fmt.Errorf("Unexpected type %T on path %q, expect map[string]any", x, accessed) + } + } + x[names[len(names)-1]] = d + dt, err := json.Marshal(m) + if err != nil { + return err + } + return c.UnmarshalJSON(dt) +} + +func (c *ConfigHandler) Fingerprint() string { + c.mux.RLock() + defer c.mux.RUnlock() + return c.fingerprintLocked() +} + +func (c *ConfigHandler) fingerprintLocked() string { + data, err := c.MarshalJSON() + if err != nil { + log.Panicf("ConfigHandler.Fingerprint: MarshalJSON: %v", err) + } + return utils.BytesAsSha256(data) +} + +func (c *ConfigHandler) DoLockedAction(fingerprint string, callback func(api.ConfigHandler) error) error { + if c.fingerprintLocked() != fingerprint { + return api.ErrPreconditionFailed + } + return callback(c) +} diff --git a/config/config.go b/config/config.go index ec18ca2..3654b6c 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ package config import ( + "encoding/json" "path/filepath" "time" @@ -94,10 +95,9 @@ func NewDefaultConfig() *Config { Certificates: []CertificateConfig{}, Tunneler: TunnelConfig{ - Enable: false, - TunnelProg: "./path/to/tunnel/program", - OutputRegex: `\bNATedAddr\s+(?P[0-9.]+|\[[0-9a-f:]+\]):(?P\d+)$`, - TunnelTimeout: 0, + Enable: false, + TunnelProg: "./path/to/tunnel/program", + OutputRegex: `\bNATedAddr\s+(?P[0-9.]+|\[[0-9a-f:]+\]):(?P\d+)$`, }, Cache: CacheConfig{ @@ -133,6 +133,8 @@ func NewDefaultConfig() *Config { Dashboard: DashboardConfig{ Enable: true, + Username: "", + Password: "", PwaName: "GoOpenBmclApi Dashboard", PwaShortName: "GOBA Dash", PwaDesc: "Go-Openbmclapi Internal Dashboard", @@ -141,6 +143,7 @@ func NewDefaultConfig() *Config { GithubAPI: GithubAPIConfig{ UpdateCheckInterval: (utils.YAMLDuration)(time.Hour), + Authorization: "", }, Database: DatabaseConfig{ @@ -177,6 +180,28 @@ func NewDefaultConfig() *Config { } } +func (config *Config) MarshalJSON() ([]byte, error) { + type T Config + return json.Marshal((*T)(config)) +} + +func (config *Config) UnmarshalJSON(data []byte) error { + type T Config + return json.Unmarshal(data, (*T)(config)) +} + func (config *Config) UnmarshalText(data []byte) error { return yaml.Unmarshal(data, config) } + +func (config *Config) Clone() *Config { + data, err := config.MarshalJSON() + if err != nil { + panic(err) + } + cloned := new(Config) + if err := cloned.UnmarshalJSON(data); err != nil { + panic(err) + } + return cloned +} diff --git a/config/server.go b/config/server.go index a1763a8..e6514d7 100644 --- a/config/server.go +++ b/config/server.go @@ -122,10 +122,9 @@ type GithubAPIConfig struct { } type TunnelConfig struct { - Enable bool `yaml:"enable"` - TunnelProg string `yaml:"tunnel-program"` - OutputRegex string `yaml:"output-regex"` - TunnelTimeout int `yaml:"tunnel-timeout"` + Enable bool `yaml:"enable"` + TunnelProg string `yaml:"tunnel-program"` + OutputRegex string `yaml:"output-regex"` outputRegex *regexp.Regexp hostOut int diff --git a/handler.go b/handler.go index 97e4e85..5fecf91 100644 --- a/handler.go +++ b/handler.go @@ -39,7 +39,6 @@ import ( "github.com/LiterMC/go-openbmclapi/api/v0" "github.com/LiterMC/go-openbmclapi/internal/build" "github.com/LiterMC/go-openbmclapi/internal/gosrc" - "github.com/LiterMC/go-openbmclapi/limited" "github.com/LiterMC/go-openbmclapi/log" "github.com/LiterMC/go-openbmclapi/utils" ) @@ -102,11 +101,14 @@ var wsUpgrader = &websocket.Upgrader{ HandshakeTimeout: time.Second * 30, } -func (r *Runner) GetHandler() http.Handler { - r.apiRateLimiter = limited.NewAPIRateMiddleWare(api.RealAddrCtxKey, "go-openbmclapi.cluster.logged.user" /* api/v0.loggedUserKey */) +func (r *Runner) updateRateLimit(ctx context.Context) error { r.apiRateLimiter.SetAnonymousRateLimit(r.Config.RateLimit.Anonymous) r.apiRateLimiter.SetLoggedRateLimit(r.Config.RateLimit.Logged) - r.handlerAPIv0 = http.StripPrefix("/api/v0", v0.NewHandler(wsUpgrader)) + return nil +} + +func (r *Runner) GetHandler() http.Handler { + r.handlerAPIv0 = http.StripPrefix("/api/v0", v0.NewHandler(wsUpgrader, r.configHandler, r.userManager, r.tokenManager, r.subManager)) r.hijackHandler = http.StripPrefix("/bmclapi", r.hijacker) handler := utils.NewHttpMiddleWareHandler((http.HandlerFunc)(r.serveHTTP)) diff --git a/limited/api_rate.go b/limited/api_rate.go index 33b7ea8..d664630 100644 --- a/limited/api_rate.go +++ b/limited/api_rate.go @@ -38,9 +38,8 @@ type RateLimit struct { } type limitSet struct { - Limit RateLimit - mux sync.RWMutex + limit RateLimit cleanCount int // min clean mask: 0xffff; hour clean mask: 0xff0000 accessMin map[string]*atomic.Int64 accessHour map[string]*atomic.Int64 @@ -54,8 +53,26 @@ func makeLimitSet() limitSet { } } +func (s *limitSet) GetLimit() RateLimit { + s.mux.RLock() + defer s.mux.RUnlock() + return s.limit +} + +func (s *limitSet) SetLimit(limit RateLimit) { + s.mux.Lock() + defer s.mux.Unlock() + s.limit = limit +} + func (s *limitSet) try(id string) (leftHour, leftMin int64, cleanId int) { - checkHour, checkMin := s.Limit.PerHour > 0, s.Limit.PerMin > 0 + var ( + hour, min *atomic.Int64 + ok1, ok2 bool + ) + + s.mux.RLock() + checkHour, checkMin := s.limit.PerHour > 0, s.limit.PerMin > 0 if !checkHour { leftHour = -1 } @@ -67,12 +84,6 @@ func (s *limitSet) try(id string) (leftHour, leftMin int64, cleanId int) { return } - var ( - hour, min *atomic.Int64 - ok1, ok2 bool - ) - - s.mux.RLock() cleanId = s.cleanCount if checkHour { hour, ok1 = s.accessHour[id] @@ -99,7 +110,7 @@ func (s *limitSet) try(id string) (leftHour, leftMin int64, cleanId int) { } s.mux.Unlock() } - leftHour = s.Limit.PerHour - hour.Add(1) + leftHour = s.limit.PerHour - hour.Add(1) if leftHour < 0 { hour.Add(-1) leftHour = 0 @@ -118,7 +129,7 @@ func (s *limitSet) try(id string) (leftHour, leftMin int64, cleanId int) { } s.mux.Unlock() } - leftMin = s.Limit.PerMin - min.Add(1) + leftMin = s.limit.PerMin - min.Add(1) if leftMin < 0 { hour.Add(-1) min.Add(-1) @@ -134,12 +145,12 @@ func (s *limitSet) release(id string, cleanId int) { if cleanId <= 0 { return } - checkHour, checkMin := s.Limit.PerHour > 0, s.Limit.PerMin > 0 + s.mux.Lock() + defer s.mux.Unlock() + checkHour, checkMin := s.limit.PerHour > 0, s.limit.PerMin > 0 if !checkHour && !checkMin { return } - s.mux.Lock() - defer s.mux.Unlock() releaseHour := checkHour && cleanId&0xff0000 == s.cleanCount&0xff0000 releaseMin := checkMin && cleanId&0xffff == s.cleanCount&0xffff if releaseHour { @@ -219,19 +230,19 @@ func SetSkipRateLimit(req *http.Request) *http.Request { } func (a *APIRateMiddleWare) AnonymousRateLimit() RateLimit { - return a.annoySet.Limit + return a.annoySet.GetLimit() } func (a *APIRateMiddleWare) SetAnonymousRateLimit(v RateLimit) { - a.annoySet.Limit = v + a.annoySet.SetLimit(v) } func (a *APIRateMiddleWare) LoggedRateLimit() RateLimit { - return a.loggedSet.Limit + return a.loggedSet.GetLimit() } func (a *APIRateMiddleWare) SetLoggedRateLimit(v RateLimit) { - a.loggedSet.Limit = v + a.loggedSet.SetLimit(v) } func (a *APIRateMiddleWare) Destroy() { @@ -265,6 +276,7 @@ func (a *APIRateMiddleWare) ServeMiddle(rw http.ResponseWriter, req *http.Reques } set = &a.annoySet } + limit := set.GetLimit() hourLeft, minLeft, cleanId := set.try(id) now := time.Now() var retryAfter int @@ -274,8 +286,8 @@ func (a *APIRateMiddleWare) ServeMiddle(rw http.ResponseWriter, req *http.Reques retryAfter = 60 - (int)(now.Sub(a.startAt)/time.Second%60) } resetAfter := now.Add((time.Duration)(retryAfter) * time.Second).Unix() - rw.Header().Set("X-Ratelimit-Limit-Minute", strconv.FormatInt(set.Limit.PerMin, 10)) - rw.Header().Set("X-Ratelimit-Limit-Hour", strconv.FormatInt(set.Limit.PerHour, 10)) + rw.Header().Set("X-Ratelimit-Limit-Minute", strconv.FormatInt(limit.PerMin, 10)) + rw.Header().Set("X-Ratelimit-Limit-Hour", strconv.FormatInt(limit.PerHour, 10)) rw.Header().Set("X-Ratelimit-Remaining-Minute", strconv.FormatInt(minLeft, 10)) rw.Header().Set("X-Ratelimit-Remaining-Hour", strconv.FormatInt(hourLeft, 10)) rw.Header().Set("X-Ratelimit-Reset-After", strconv.FormatInt(resetAfter, 10)) diff --git a/main.go b/main.go index 55f42dc..3dd36c8 100644 --- a/main.go +++ b/main.go @@ -45,6 +45,7 @@ import ( doh "github.com/libp2p/go-doh-resolver" + "github.com/LiterMC/go-openbmclapi/api" "github.com/LiterMC/go-openbmclapi/api/bmclapi" "github.com/LiterMC/go-openbmclapi/cluster" "github.com/LiterMC/go-openbmclapi/config" @@ -118,7 +119,7 @@ func main() { defer log.RecordPanic() log.StartFlushLogFile() - r := new(Runner) + r := NewRunner() ctx, cancel := context.WithCancel(context.Background()) @@ -211,27 +212,40 @@ func main() { type Runner struct { Config *config.Config + configHandler *ConfigHandler + client *cluster.HTTPClient clusters map[string]*cluster.Cluster - apiRateLimiter *limited.APIRateMiddleWare + userManager api.UserManager + tokenManager api.TokenManager + subManager api.SubscriptionManager storageManager *storage.Manager statManager *cluster.StatManager hijacker *bmclapi.HjProxy database database.DB - server *http.Server - handlerAPIv0 http.Handler - hijackHandler http.Handler + server *http.Server + apiRateLimiter *limited.APIRateMiddleWare + handler http.Handler + handlerAPIv0 http.Handler + hijackHandler http.Handler - tlsConfig *tls.Config + tlsConfig *tls.Config publicHost string publicPort uint16 - listener *utils.HTTPTLSListener + listener *utils.HTTPTLSListener reloading atomic.Bool updating atomic.Bool tunnelCancel context.CancelFunc } +func NewRunner() *Runner { + r := new(Runner) + r.configHandler = &ConfigHandler{r: r} + r.apiRateLimiter = limited.NewAPIRateMiddleWare(api.RealAddrCtxKey, "go-openbmclapi.cluster.logged.user" /* api/v0.loggedUserKey */) + return r +} + func (r *Runner) getPublicPort() uint16 { if r.publicPort > 0 { return r.publicPort @@ -250,9 +264,13 @@ func (r *Runner) InitServer() { r.server = &http.Server{ ReadTimeout: 10 * time.Second, IdleTimeout: 5 * time.Second, - Handler: r.GetHandler(), - ErrorLog: log.ProxiedStdLog, + Handler: (http.HandlerFunc)(func(rw http.ResponseWriter, req *http.Request) { + r.handler.ServeHTTP(rw, req) + }), + ErrorLog: log.ProxiedStdLog, } + r.updateRateLimit(context.TODO()) + r.handler = r.GetHandler() } // StartServer will start the HTTP server @@ -390,7 +408,6 @@ func (r *Runner) ListenSignals(ctx context.Context, cancel context.CancelFunc) i } } - return 0 } func (r *Runner) ReloadConfig(ctx context.Context) { @@ -411,28 +428,10 @@ func (r *Runner) ReloadConfig(ctx context.Context) { } func (r *Runner) updateConfig(ctx context.Context, newConfig *config.Config) error { - oldConfig := r.Config - reloadProcesses := make([]func(context.Context) error, 0, 8) - - if newConfig.LogSlots != oldConfig.LogSlots || newConfig.NoAccessLog != oldConfig.NoAccessLog || newConfig.AccessLogSlots != oldConfig.AccessLogSlots || newConfig.Advanced.DebugLog != oldConfig.Advanced.DebugLog { - reloadProcesses = append(reloadProcesses, r.SetupLogger) - } - if newConfig.Host != oldConfig.Host || newConfig.Port != oldConfig.Port { - reloadProcesses = append(reloadProcesses, r.StartServer) - } - if newConfig.PublicHost != oldConfig.PublicHost || newConfig.PublicPort != oldConfig.PublicPort || newConfig.Advanced.NoFastEnable != oldConfig.Advanced.NoFastEnable || newConfig.MaxReconnectCount != oldConfig.MaxReconnectCount { - reloadProcesses = append(reloadProcesses, r.updateClustersWithGeneralConfig) - } - - r.Config = newConfig - r.publicHost = r.Config.PublicHost - r.publicPort = r.Config.PublicPort - for _, proc := range reloadProcesses { - if err := proc(ctx); err != nil { - return err - } + if err := r.configHandler.update(newConfig); err != nil { + return err } - return nil + return r.configHandler.doUpdateProcesses(ctx) } func (r *Runner) SetupLogger(ctx context.Context) error { @@ -494,21 +493,23 @@ func (r *Runner) UpdateFileRecords(files map[string]*cluster.StorageFileInfo, ol sem := limited.NewSemaphore(12) log.Info("Begin to update file records") for _, f := range files { - if strings.HasPrefix(f.Path, "/openbmclapi/download/") { - continue - } - if oldfileset[f.Hash] > 0 { - continue + for _, u := range f.URLs { + if strings.HasPrefix(u.Path, "/openbmclapi/download/") { + continue + } + if oldfileset[f.Hash] > 0 { + continue + } + sem.Acquire() + go func(rec database.FileRecord) { + defer sem.Release() + r.database.SetFileRecord(rec) + }(database.FileRecord{ + Path: u.Path, + Hash: f.Hash, + Size: f.Size, + }) } - sem.Acquire() - go func(rec database.FileRecord) { - defer sem.Release() - r.database.SetFileRecord(rec) - }(database.FileRecord{ - Path: f.Path, - Hash: f.Hash, - Size: f.Size, - }) } sem.Wait() log.Info("All file records are updated") @@ -533,17 +534,20 @@ func (r *Runner) InitSynchronizer(ctx context.Context) { heavyCheck = false } - // if !r.Config.Advanced.SkipFirstSync { - // if !r.cluster.SyncFiles(ctx, fileMap, false) { - // return - // } - // go r.UpdateFileRecords(fileMap, nil) + // if !r.Config.Advanced.SkipFirstSync + { + slots := 10 + if err := r.client.SyncFiles(ctx, r.storageManager, fileMap, false, slots); err != nil { + log.Errorf("Sync failed: %v", err) + return + } + go r.UpdateFileRecords(fileMap, nil) - // if !r.Config.Advanced.NoGC { - // go r.cluster.Gc() - // } - // } else - // if fl != nil { + // if !r.Config.Advanced.NoGC { + // go r.cluster.Gc() + // } + } + // else if fl != nil { // if err := r.cluster.SetFilesetByExists(ctx, fl); err != nil { // return // } @@ -564,12 +568,10 @@ func (r *Runner) InitSynchronizer(ctx context.Context) { } checkCount = (checkCount + 1) % heavyCheckInterval - oldfileset := r.cluster.CloneFileset() - if r.cluster.SyncFiles(ctx, fl, heavyCheck && checkCount == 0) { - go r.UpdateFileRecords(fl, oldfileset) - if !r.Config.Advanced.NoGC && !r.Config.OnlyGcWhenStart { - go r.cluster.Gc() - } + slots := 10 + if err := r.client.SyncFiles(ctx, r.storageManager, fileMap, heavyCheck && (checkCount == 0), slots); err != nil { + log.Errorf("Sync failed: %v", err) + return } }, (time.Duration)(r.Config.SyncInterval)*time.Minute) } diff --git a/storage/storage_webdav.go b/storage/storage_webdav.go index 940067f..188fc22 100644 --- a/storage/storage_webdav.go +++ b/storage/storage_webdav.go @@ -551,7 +551,7 @@ func (s *WebDavStorage) ServeMeasure(rw http.ResponseWriter, req *http.Request, } func (s *WebDavStorage) createMeasureFile(ctx context.Context, size int) error { - if s.measures.Has(size) { + if s.measures.Contains(size) { // TODO: is this safe? return nil } diff --git a/sync.go b/sync.go deleted file mode 100644 index 12430b4..0000000 --- a/sync.go +++ /dev/null @@ -1,862 +0,0 @@ -//go:build ignore - -/** - * OpenBmclAPI (Golang Edition) - * Copyright (C) 2024 Kevin Z - * All rights reserved - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -package main - -import ( - "compress/gzip" - "compress/zlib" - "context" - "crypto" - "encoding/hex" - "errors" - "fmt" - "io" - "net/http" - "net/url" - "os" - "path" - "runtime" - "sort" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/hamba/avro/v2" - "github.com/klauspost/compress/zstd" - "github.com/vbauerster/mpb/v8" - "github.com/vbauerster/mpb/v8/decor" - - "github.com/LiterMC/go-openbmclapi/internal/build" - "github.com/LiterMC/go-openbmclapi/limited" - "github.com/LiterMC/go-openbmclapi/log" - "github.com/LiterMC/go-openbmclapi/storage" - "github.com/LiterMC/go-openbmclapi/update" - "github.com/LiterMC/go-openbmclapi/utils" -) - -func (cr *Cluster) CloneFileset() map[string]int64 { - cr.filesetMux.RLock() - defer cr.filesetMux.RUnlock() - fileset := make(map[string]int64, len(cr.fileset)) - for k, v := range cr.fileset { - fileset[k] = v - } - return fileset -} - -func (cr *Cluster) CachedFileSize(hash string) (size int64, ok bool) { - cr.filesetMux.RLock() - defer cr.filesetMux.RUnlock() - if size, ok = cr.fileset[hash]; !ok { - return - } - if size < 0 { - size = -size - } - return -} - -type syncStats struct { - slots *limited.BufSlots - noOpen bool - - totalSize int64 - okCount, failCount atomic.Int32 - totalFiles int - - pg *mpb.Progress - totalBar *mpb.Bar - lastInc atomic.Int64 -} - -func (cr *Cluster) checkFileFor( - ctx context.Context, - sto storage.Storage, files []FileInfo, - heavy bool, - missing *utils.SyncMap[string, *fileInfoWithTargets], - pg *mpb.Progress, -) (err error) { - var missingCount atomic.Int32 - addMissing := func(f FileInfo) { - missingCount.Add(1) - if info, has := missing.GetOrSet(f.Hash, func() *fileInfoWithTargets { - return &fileInfoWithTargets{ - FileInfo: f, - targets: []storage.Storage{sto}, - } - }); has { - info.tgMux.Lock() - info.targets = append(info.targets, sto) - info.tgMux.Unlock() - } - } - - log.Infof(Tr("info.check.start"), sto.String(), heavy) - - var ( - checkingHashMux sync.Mutex - checkingHash string - lastCheckingHash string - slots *limited.BufSlots - ) - - if heavy { - slots = limited.NewBufSlots(runtime.GOMAXPROCS(0) * 2) - } - - bar := pg.AddBar(0, - mpb.BarRemoveOnComplete(), - mpb.PrependDecorators( - decor.Name(Tr("hint.check.checking")), - decor.Name(sto.String()), - decor.OnCondition( - decor.Any(func(decor.Statistics) string { - c, l := slots.Cap(), slots.Len() - return fmt.Sprintf(" (%d / %d)", c-l, c) - }), - heavy, - ), - ), - mpb.AppendDecorators( - decor.CountersNoUnit("%d / %d", decor.WCSyncSpaceR), - decor.NewPercentage("%d", decor.WCSyncSpaceR), - decor.EwmaETA(decor.ET_STYLE_GO, 60), - ), - mpb.BarExtender((mpb.BarFillerFunc)(func(w io.Writer, _ decor.Statistics) (err error) { - if checkingHashMux.TryLock() { - lastCheckingHash = checkingHash - checkingHashMux.Unlock() - } - if lastCheckingHash != "" { - _, err = fmt.Fprintln(w, "\t", lastCheckingHash) - } - return - }), false), - ) - defer bar.Wait() - defer bar.Abort(true) - - bar.SetTotal(0x100, false) - - sizeMap := make(map[string]int64, len(files)) - { - start := time.Now() - var checkedMp [256]bool - if err = sto.WalkDir(func(hash string, size int64) error { - if n := utils.HexTo256(hash); !checkedMp[n] { - checkedMp[n] = true - now := time.Now() - bar.EwmaIncrement(now.Sub(start)) - start = now - } - sizeMap[hash] = size - return nil - }); err != nil { - return - } - } - - bar.SetCurrent(0) - bar.SetTotal((int64)(len(files)), false) - for _, f := range files { - if err = ctx.Err(); err != nil { - return - } - start := time.Now() - hash := f.Hash - if checkingHashMux.TryLock() { - checkingHash = hash - checkingHashMux.Unlock() - } - name := sto.String() + "/" + hash - if f.Size == 0 { - log.Debugf("Skipped empty file %s", name) - } else if size, ok := sizeMap[hash]; ok { - if size != f.Size { - log.Warnf(Tr("warn.check.modified.size"), name, size, f.Size) - addMissing(f) - } else if heavy { - hashMethod, err := getHashMethod(len(hash)) - if err != nil { - log.Errorf(Tr("error.check.unknown.hash.method"), hash) - } else { - _, buf, free := slots.Alloc(ctx) - if buf == nil { - return ctx.Err() - } - go func(f FileInfo, buf []byte, free func()) { - defer log.RecoverPanic(nil) - defer free() - miss := true - r, err := sto.Open(hash) - if err != nil { - log.Errorf(Tr("error.check.open.failed"), name, err) - } else { - hw := hashMethod.New() - _, err = io.CopyBuffer(hw, r, buf[:]) - r.Close() - if err != nil { - log.Errorf(Tr("error.check.hash.failed"), name, err) - } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != hash { - log.Warnf(Tr("warn.check.modified.hash"), name, hs, hash) - } else { - miss = false - } - } - free() - if miss { - addMissing(f) - } - bar.EwmaIncrement(time.Since(start)) - }(f, buf, free) - continue - } - } - } else { - // log.Debugf("Could not found file %q", name) - addMissing(f) - } - bar.EwmaIncrement(time.Since(start)) - } - - checkingHashMux.Lock() - checkingHash = "" - checkingHashMux.Unlock() - - bar.SetTotal(-1, true) - log.Infof(Tr("info.check.done"), sto.String(), missingCount.Load()) - return -} - -func (cr *Cluster) CheckFiles( - ctx context.Context, - files []FileInfo, - heavyCheck bool, - pg *mpb.Progress, -) (map[string]*fileInfoWithTargets, error) { - missingMap := utils.NewSyncMap[string, *fileInfoWithTargets]() - done := make(chan bool, 0) - - for _, s := range cr.storages { - go func(s storage.Storage) { - defer log.RecordPanic() - err := cr.checkFileFor(ctx, s, files, heavyCheck, missingMap, pg) - if ctx.Err() != nil { - return - } - if err != nil { - log.Errorf(Tr("error.check.failed"), s, err) - } - select { - case done <- err == nil: - case <-ctx.Done(): - } - }(s) - } - goodCount := 0 - for i := len(cr.storages); i > 0; i-- { - select { - case ok := <-done: - if ok { - goodCount++ - } - case <-ctx.Done(): - log.Warn(Tr("warn.sync.interrupted")) - return nil, ctx.Err() - } - } - if err := ctx.Err(); err != nil { - return nil, err - } - if goodCount == 0 { - return nil, errors.New("All storages are failed") - } - return missingMap.RawMap(), nil -} - -func (cr *Cluster) SetFilesetByExists(ctx context.Context, files []FileInfo) error { - pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) - defer pg.Shutdown() - log.SetLogOutput(pg) - defer log.SetLogOutput(nil) - - missingMap, err := cr.CheckFiles(ctx, files, false, pg) - if err != nil { - return err - } - fileset := make(map[string]int64, len(files)) - stoCount := len(cr.storages) - for _, f := range files { - if t, ok := missingMap[f.Hash]; !ok || len(t.targets) < stoCount { - fileset[f.Hash] = f.Size - } - } - - cr.mux.Lock() - cr.fileset = fileset - cr.mux.Unlock() - return nil -} - -func (cr *Cluster) syncFiles(ctx context.Context, files []FileInfo, heavyCheck bool) error { - pg := mpb.New(mpb.WithRefreshRate(time.Second/2), mpb.WithAutoRefresh(), mpb.WithWidth(140)) - defer pg.Shutdown() - log.SetLogOutput(pg) - defer log.SetLogOutput(nil) - - cr.syncProg.Store(0) - cr.syncTotal.Store(-1) - - missingMap, err := cr.CheckFiles(ctx, files, heavyCheck, pg) - if err != nil { - return err - } - var ( - missing = make([]*fileInfoWithTargets, 0, len(missingMap)) - missingSize int64 = 0 - ) - for _, f := range missingMap { - missing = append(missing, f) - missingSize += f.Size - } - totalFiles := len(missing) - if totalFiles == 0 { - log.Info(Tr("info.sync.none")) - return nil - } - - go cr.notifyManager.OnSyncBegin(len(missing), missingSize) - defer func() { - go cr.notifyManager.OnSyncDone() - }() - - cr.syncTotal.Store((int64)(totalFiles)) - - ccfg, err := cr.GetConfig(ctx) - if err != nil { - return err - } - syncCfg := ccfg.Sync - log.Infof(Tr("info.sync.config"), syncCfg) - - var stats syncStats - stats.pg = pg - stats.noOpen = syncCfg.Source == "center" - stats.slots = limited.NewBufSlots(syncCfg.Concurrency + 1) - stats.totalFiles = totalFiles - for _, f := range missing { - stats.totalSize += f.Size - } - - var barUnit decor.SizeB1024 - stats.lastInc.Store(time.Now().UnixNano()) - stats.totalBar = pg.AddBar(stats.totalSize, - mpb.BarRemoveOnComplete(), - mpb.BarPriority(stats.slots.Cap()), - mpb.PrependDecorators( - decor.Name(Tr("hint.sync.total")), - decor.NewPercentage("%.2f"), - ), - mpb.AppendDecorators( - decor.Any(func(decor.Statistics) string { - return fmt.Sprintf("(%d + %d / %d) ", stats.okCount.Load(), stats.failCount.Load(), stats.totalFiles) - }), - decor.Counters(barUnit, "(%.1f/%.1f) "), - decor.EwmaSpeed(barUnit, "%.1f ", 30), - decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_GO, 30), "done", - ), - ), - ) - - log.Infof(Tr("hint.sync.start"), totalFiles, utils.BytesToUnit((float64)(stats.totalSize))) - start := time.Now() - - done := make(chan []storage.Storage, 1) - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - aliveStorages := len(cr.storages) - for _, s := range cr.storages { - tctx, cancel := context.WithTimeout(ctx, time.Second*10) - err := s.CheckUpload(tctx) - cancel() - if err != nil { - if err := ctx.Err(); err != nil { - return err - } - aliveStorages-- - log.Errorf("Storage %s does not work: %v", s.String(), err) - } - } - if aliveStorages == 0 { - err := errors.New("All storages are broken") - log.Errorf(Tr("error.sync.failed"), err) - return err - } - if aliveStorages < len(cr.storages) { - log.Errorf(Tr("error.sync.part.working"), aliveStorages < len(cr.storages)) - select { - case <-time.After(time.Minute): - case <-ctx.Done(): - return ctx.Err() - } - } - - for _, f := range missing { - log.Debugf("File %s is for %v", f.Hash, f.targets) - pathRes, err := cr.fetchFile(ctx, &stats, f.FileInfo) - if err != nil { - log.Warn(Tr("warn.sync.interrupted")) - return err - } - go func(f *fileInfoWithTargets, pathRes <-chan string) { - defer log.RecordPanic() - select { - case path := <-pathRes: - cr.syncProg.Add(1) - if path == "" { - select { - case done <- nil: // TODO: or all storage? - case <-ctx.Done(): - } - return - } - defer os.Remove(path) - // acquire slot here - slotId, buf, free := stats.slots.Alloc(ctx) - if buf == nil { - return - } - defer free() - _ = slotId - var srcFd *os.File - if srcFd, err = os.Open(path); err != nil { - return - } - defer srcFd.Close() - var failed []storage.Storage - for _, target := range f.targets { - if _, err = srcFd.Seek(0, io.SeekStart); err != nil { - log.Errorf("Cannot seek file %q to start: %v", path, err) - continue - } - if err = target.Create(f.Hash, srcFd); err != nil { - failed = append(failed, target) - log.Errorf(Tr("error.sync.create.failed"), target.String(), f.Hash, err) - continue - } - } - free() - srcFd.Close() - os.Remove(path) - select { - case done <- failed: - case <-ctx.Done(): - } - case <-ctx.Done(): - return - } - }(f, pathRes) - } - - stLen := len(cr.storages) - broken := make(map[storage.Storage]bool, stLen) - - for i := len(missing); i > 0; i-- { - select { - case failed := <-done: - for _, s := range failed { - if !broken[s] { - broken[s] = true - log.Debugf("Broken storage %d / %d", len(broken), stLen) - if len(broken) >= stLen { - cancel() - err := errors.New("All storages are broken") - log.Errorf(Tr("error.sync.failed"), err) - return err - } - } - } - case <-ctx.Done(): - log.Warn(Tr("warn.sync.interrupted")) - return ctx.Err() - } - } - - use := time.Since(start) - stats.totalBar.Abort(true) - pg.Wait() - - log.Infof(Tr("hint.sync.done"), use, utils.BytesToUnit((float64)(stats.totalSize)/use.Seconds())) - return nil -} - -func (cr *Cluster) Gc() { - for _, s := range cr.storages { - cr.gcFor(s) - } -} - -func (cr *Cluster) gcFor(s storage.Storage) { - log.Infof(Tr("info.gc.start"), s.String()) - err := s.WalkDir(func(hash string, _ int64) error { - if cr.issync.Load() { - return context.Canceled - } - if _, ok := cr.CachedFileSize(hash); !ok { - log.Infof(Tr("info.gc.found"), s.String()+"/"+hash) - s.Remove(hash) - } - return nil - }) - if err != nil { - if err == context.Canceled { - log.Warnf(Tr("warn.gc.interrupted"), s.String()) - } else { - log.Errorf(Tr("error.gc.error"), err) - } - return - } - log.Infof(Tr("info.gc.done"), s.String()) -} - -func (cr *Cluster) fetchFile(ctx context.Context, stats *syncStats, f FileInfo) (<-chan string, error) { - const ( - maxRetryCount = 5 - maxTryWithOpen = 3 - ) - - slotId, buf, free := stats.slots.Alloc(ctx) - if buf == nil { - return nil, ctx.Err() - } - - pathRes := make(chan string, 1) - go func() { - defer log.RecordPanic() - defer free() - defer close(pathRes) - - var barUnit decor.SizeB1024 - var tried atomic.Int32 - tried.Store(1) - bar := stats.pg.AddBar(f.Size, - mpb.BarRemoveOnComplete(), - mpb.BarPriority(slotId), - mpb.PrependDecorators( - decor.Name(Tr("hint.sync.downloading")), - decor.Any(func(decor.Statistics) string { - tc := tried.Load() - if tc <= 1 { - return "" - } - return fmt.Sprintf("(%d/%d) ", tc, maxRetryCount) - }), - decor.Name(f.Path, decor.WCSyncSpaceR), - ), - mpb.AppendDecorators( - decor.NewPercentage("%d", decor.WCSyncSpace), - decor.Counters(barUnit, "[%.1f / %.1f]", decor.WCSyncSpace), - decor.EwmaSpeed(barUnit, "%.1f", 30, decor.WCSyncSpace), - decor.OnComplete( - decor.EwmaETA(decor.ET_STYLE_GO, 30, decor.WCSyncSpace), "done", - ), - ), - ) - defer bar.Abort(true) - - noOpen := stats.noOpen - badOpen := false - interval := time.Second - for { - bar.SetCurrent(0) - hashMethod, err := getHashMethod(len(f.Hash)) - if err == nil { - var path string - if path, err = cr.fetchFileWithBuf(ctx, f, hashMethod, buf, noOpen, badOpen, func(r io.Reader) io.Reader { - return ProxyReader(r, bar, stats.totalBar, &stats.lastInc) - }); err == nil { - pathRes <- path - stats.okCount.Add(1) - log.Infof(Tr("info.sync.downloaded"), f.Path, - utils.BytesToUnit((float64)(f.Size)), - (float64)(stats.totalBar.Current())/(float64)(stats.totalSize)*100) - return - } - } - bar.SetRefill(bar.Current()) - - c := tried.Add(1) - if c > maxRetryCount { - log.Errorf(Tr("error.sync.download.failed"), f.Path, err) - break - } - if c > maxTryWithOpen { - badOpen = true - } - log.Errorf(Tr("error.sync.download.failed.retry"), f.Path, interval, err) - select { - case <-time.After(interval): - interval *= 2 - case <-ctx.Done(): - return - } - } - stats.failCount.Add(1) - }() - return pathRes, nil -} - -func (cr *Cluster) fetchFileWithBuf( - ctx context.Context, f FileInfo, - hashMethod crypto.Hash, buf []byte, - noOpen bool, badOpen bool, - wrapper func(io.Reader) io.Reader, -) (path string, err error) { - var ( - reqPath = f.Path - query url.Values - req *http.Request - res *http.Response - fd *os.File - r io.Reader - ) - if badOpen { - reqPath = "/openbmclapi/download/" + f.Hash - } else if noOpen { - query = url.Values{ - "noopen": {"1"}, - } - } - if req, err = cr.makeReqWithAuth(ctx, http.MethodGet, reqPath, query); err != nil { - return - } - req.Header.Set("Accept-Encoding", "gzip, deflate") - if res, err = cr.client.Do(req); err != nil { - return - } - defer res.Body.Close() - if err = ctx.Err(); err != nil { - return - } - if res.StatusCode != http.StatusOK { - err = ErrorFromRedirect(utils.NewHTTPStatusErrorFromResponse(res), res) - return - } - switch ce := strings.ToLower(res.Header.Get("Content-Encoding")); ce { - case "": - r = res.Body - case "gzip": - if r, err = gzip.NewReader(res.Body); err != nil { - err = ErrorFromRedirect(err, res) - return - } - case "deflate": - if r, err = zlib.NewReader(res.Body); err != nil { - err = ErrorFromRedirect(err, res) - return - } - default: - err = ErrorFromRedirect(fmt.Errorf("Unexpected Content-Encoding %q", ce), res) - return - } - if wrapper != nil { - r = wrapper(r) - } - - hw := hashMethod.New() - - if fd, err = os.CreateTemp("", "*.downloading"); err != nil { - return - } - path = fd.Name() - defer func(path string) { - if err != nil { - os.Remove(path) - } - }(path) - - _, err = io.CopyBuffer(io.MultiWriter(hw, fd), r, buf) - stat, err2 := fd.Stat() - fd.Close() - if err != nil { - err = ErrorFromRedirect(err, res) - return - } - if err2 != nil { - err = err2 - return - } - if t := stat.Size(); f.Size >= 0 && t != f.Size { - err = ErrorFromRedirect(fmt.Errorf("File size wrong, got %d, expect %d", t, f.Size), res) - return - } else if hs := hex.EncodeToString(hw.Sum(buf[:0])); hs != f.Hash { - err = ErrorFromRedirect(fmt.Errorf("File hash not match, got %s, expect %s", hs, f.Hash), res) - return - } - return -} - -type downloadingItem struct { - err error - done chan struct{} -} - -func (cr *Cluster) lockDownloading(target string) (*downloadingItem, bool) { - cr.downloadMux.RLock() - item := cr.downloading[target] - cr.downloadMux.RUnlock() - if item != nil { - return item, true - } - - cr.downloadMux.Lock() - defer cr.downloadMux.Unlock() - - if item = cr.downloading[target]; item != nil { - return item, true - } - item = &downloadingItem{ - done: make(chan struct{}, 0), - } - cr.downloading[target] = item - return item, false -} - -func (cr *Cluster) DownloadFile(ctx context.Context, hash string) (err error) { - hashMethod, err := getHashMethod(len(hash)) - if err != nil { - return - } - - f := FileInfo{ - Path: "/openbmclapi/download/" + hash, - Hash: hash, - Size: -1, - Mtime: 0, - } - item, ok := cr.lockDownloading(hash) - if !ok { - go func() { - defer log.RecoverPanic(nil) - var err error - defer func() { - if err != nil { - log.Errorf(Tr("error.sync.download.failed"), hash, err) - } - item.err = err - close(item.done) - - cr.downloadMux.Lock() - defer cr.downloadMux.Unlock() - delete(cr.downloading, hash) - }() - - log.Infof(Tr("hint.sync.downloading.handler"), hash) - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - if cr.enabled.Load() { - select { - case <-cr.Disabled(): - cancel() - case <-ctx.Done(): - } - } else { - select { - case <-cr.WaitForEnable(): - cancel() - case <-ctx.Done(): - } - } - }() - defer cancel() - - var buf []byte - _, buf, free := cr.allocBuf(ctx) - if buf == nil { - err = ctx.Err() - return - } - defer free() - - path, err := cr.fetchFileWithBuf(ctx, f, hashMethod, buf, true, true, nil) - if err != nil { - return - } - defer os.Remove(path) - var srcFd *os.File - if srcFd, err = os.Open(path); err != nil { - return - } - defer srcFd.Close() - var stat os.FileInfo - if stat, err = srcFd.Stat(); err != nil { - return - } - size := stat.Size() - - for _, target := range cr.storages { - if _, err = srcFd.Seek(0, io.SeekStart); err != nil { - log.Errorf("Cannot seek file %q: %v", path, err) - return - } - if err := target.Create(hash, srcFd); err != nil { - log.Errorf(Tr("error.sync.create.failed"), target.String(), hash, err) - continue - } - } - - cr.filesetMux.Lock() - cr.fileset[hash] = -size // negative means that the file was not stored into the database yet - cr.filesetMux.Unlock() - }() - } - select { - case <-item.done: - err = item.err - case <-ctx.Done(): - err = ctx.Err() - case <-cr.Disabled(): - err = context.Canceled - } - return -} - -func (cr *Cluster) checkUpdate() (err error) { - if update.CurrentBuildTag == nil { - return - } - log.Info(Tr("info.update.checking")) - release, err := update.Check(cr.cachedCli, config.GithubAPI.Authorization) - if err != nil || release == nil { - return - } - // TODO: print all middle change logs - log.Infof(Tr("info.update.detected"), release.Tag, update.CurrentBuildTag) - log.Infof(Tr("info.update.changelog"), update.CurrentBuildTag, release.Tag, release.Body) - cr.notifyManager.OnUpdateAvaliable(release) - return -} diff --git a/util.go b/util.go index 3bd2ba9..2810507 100644 --- a/util.go +++ b/util.go @@ -25,11 +25,7 @@ import ( "crypto/x509" "fmt" "io" - "net/http" - "net/url" "os" - "slices" - "strings" "time" "github.com/LiterMC/go-openbmclapi/log" @@ -95,45 +91,3 @@ func copyFile(src, dst string, mode os.FileMode) (err error) { _, err = io.Copy(dstFd, srcFd) return } - -type RedirectError struct { - Redirects []*url.URL - Err error -} - -func ErrorFromRedirect(err error, resp *http.Response) *RedirectError { - redirects := make([]*url.URL, 0, 4) - for resp != nil && resp.Request != nil { - redirects = append(redirects, resp.Request.URL) - resp = resp.Request.Response - } - if len(redirects) > 1 { - slices.Reverse(redirects) - } else { - redirects = nil - } - return &RedirectError{ - Redirects: redirects, - Err: err, - } -} - -func (e *RedirectError) Error() string { - if len(e.Redirects) == 0 { - return e.Err.Error() - } - - var b strings.Builder - b.WriteString("Redirect from:\n\t") - for _, r := range e.Redirects { - b.WriteString("- ") - b.WriteString(r.String()) - b.WriteString("\n\t") - } - b.WriteString(e.Err.Error()) - return b.String() -} - -func (e *RedirectError) Unwrap() error { - return e.Err -} diff --git a/utils/http.go b/utils/http.go index 0412bf2..4c6e5e6 100644 --- a/utils/http.go +++ b/utils/http.go @@ -30,6 +30,7 @@ import ( "net/url" "path" "runtime" + "slices" "strings" "sync" "sync/atomic" @@ -504,3 +505,45 @@ func (c *connHeadReader) Read(buf []byte) (n int, err error) { } return c.Conn.Read(buf) } + +type RedirectError struct { + Redirects []*url.URL + Err error +} + +func ErrorFromRedirect(err error, resp *http.Response) *RedirectError { + redirects := make([]*url.URL, 0, 4) + for resp != nil && resp.Request != nil { + redirects = append(redirects, resp.Request.URL) + resp = resp.Request.Response + } + if len(redirects) > 1 { + slices.Reverse(redirects) + } else { + redirects = nil + } + return &RedirectError{ + Redirects: redirects, + Err: err, + } +} + +func (e *RedirectError) Error() string { + if len(e.Redirects) == 0 { + return e.Err.Error() + } + + var b strings.Builder + b.WriteString("Redirect from:\n\t") + for _, r := range e.Redirects { + b.WriteString("- ") + b.WriteString(r.String()) + b.WriteString("\n\t") + } + b.WriteString(e.Err.Error()) + return b.String() +} + +func (e *RedirectError) Unwrap() error { + return e.Err +} diff --git a/utils/util.go b/utils/util.go index 78955cb..a5a6572 100644 --- a/utils/util.go +++ b/utils/util.go @@ -21,8 +21,10 @@ package utils import ( "errors" + "fmt" "os" "path/filepath" + "strings" "sync" ) @@ -47,6 +49,12 @@ func (m *SyncMap[K, V]) RawMap() map[K]V { return m.m } +func (m *SyncMap[K, V]) Clear() { + m.l.Lock() + defer m.l.Unlock() + clear(m.m) +} + func (m *SyncMap[K, V]) Set(k K, v V) { m.l.Lock() defer m.l.Unlock() @@ -59,7 +67,7 @@ func (m *SyncMap[K, V]) Get(k K) V { return m.m[k] } -func (m *SyncMap[K, V]) Has(k K) bool { +func (m *SyncMap[K, V]) Contains(k K) bool { m.l.RLock() defer m.l.RUnlock() _, ok := m.m[k] @@ -83,6 +91,55 @@ func (m *SyncMap[K, V]) GetOrSet(k K, setter func() V) (v V, had bool) { return } +type Set[T comparable] map[T]struct{} + +func NewSet[T comparable]() Set[T] { + return make(Set[T]) +} + +func (s Set[T]) Clear() { + clear(s) +} + +func (s Set[T]) Put(v T) { + s[v] = struct{}{} +} + +func (s Set[T]) Contains(v T) bool { + _, ok := s[v] + return ok +} + +func (s Set[T]) Remove(v T) bool { + _, ok := s[v] + if ok { + delete(s, v) + } + return ok +} + +func (s Set[T]) ToSlice(arr []T) []T { + for v, _ := range s { + arr = append(arr, v) + } + return arr +} + +func (s Set[T]) String() string { + var b strings.Builder + b.WriteString("Set{") + first := true + for v := range s { + if first { + first = false + b.WriteByte(' ') + } + fmt.Fprintf(&b, "%v", v) + } + b.WriteByte('}') + return b.String() +} + func WalkCacheDir(cacheDir string, walker func(hash string, size int64) (err error)) (err error) { for _, dir := range Hex256 { files, err := os.ReadDir(filepath.Join(cacheDir, dir))