Skip to content

Commit

Permalink
feat: task separate proxy (#727)
Browse files Browse the repository at this point in the history
  • Loading branch information
monkeyWie authored Sep 6, 2024
1 parent 6538b29 commit 0c17093
Show file tree
Hide file tree
Showing 22 changed files with 447 additions and 67 deletions.
7 changes: 5 additions & 2 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package controller

import (
"github.com/GopeedLab/gopeed/pkg/base"
"net/http"
"net/url"
"os"
"path/filepath"
)

type Controller struct {
GetConfig func(v any)
ProxyConfig *base.DownloaderProxyConfig
GetConfig func(v any)
GetProxy func(requestProxy *base.RequestProxy) func(*http.Request) (*url.URL, error)
FileController
//ContextDialer() (proxy.Dialer, error)
}
Expand All @@ -23,6 +25,7 @@ type DefaultFileController struct {
func NewController() *Controller {
return &Controller{
GetConfig: func(v any) {},
GetProxy: func(requestProxy *base.RequestProxy) func(*http.Request) (*url.URL, error) { return nil },
FileController: &DefaultFileController{},
}
}
Expand Down
10 changes: 5 additions & 5 deletions internal/protocol/bt/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (f *Fetcher) initClient() (err error) {
cfg.Bep20 = fmt.Sprintf("-GP%s-", parseBep20())
cfg.ExtendedHandshakeClientVersion = fmt.Sprintf("Gopeed %s", base.Version)
cfg.ListenPort = f.config.ListenPort
cfg.HTTPProxy = f.ctl.ProxyConfig.ToHandler()
cfg.HTTPProxy = f.ctl.GetProxy(f.meta.Req.Proxy)
cfg.DefaultStorage = newFileOpts(newFileClientOpts{
ClientBaseDir: cfg.DataDir,
HandleFileTorrent: func(infoHash metainfo.Hash, ft *fileTorrentImpl) {
Expand All @@ -98,11 +98,14 @@ func (f *Fetcher) initClient() (err error) {
}

func (f *Fetcher) Resolve(req *base.Request) error {
if err := base.ParseReqExtra[bt.ReqExtra](req); err != nil {
return err
}
f.meta.Req = req
if err := f.addTorrent(req, false); err != nil {
return err
}
f.updateRes()
f.meta.Req = req
return nil
}

Expand Down Expand Up @@ -339,9 +342,6 @@ func (f *Fetcher) WaitUpload() (err error) {
}

func (f *Fetcher) addTorrent(req *base.Request, fromUpload bool) (err error) {
if err = base.ParseReqExtra[bt.ReqExtra](req); err != nil {
return
}
if err = f.initClient(); err != nil {
return
}
Expand Down
6 changes: 5 additions & 1 deletion internal/protocol/bt/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/GopeedLab/gopeed/internal/test"
"github.com/GopeedLab/gopeed/pkg/base"
"github.com/GopeedLab/gopeed/pkg/protocol/bt"
gohttp "net/http"
"net/url"
"os"
"reflect"
"testing"
Expand Down Expand Up @@ -210,7 +212,9 @@ func buildConfigFetcher(proxyConfig *base.DownloaderProxyConfig) fetcher.Fetcher
newController.GetConfig = func(v any) {
json.Unmarshal([]byte(test.ToJson(mockCfg)), v)
}
newController.ProxyConfig = proxyConfig
newController.GetProxy = func(requestProxy *base.RequestProxy) func(*gohttp.Request) (*url.URL, error) {
return proxyConfig.ToHandler()
}
fetcher.Setup(newController)
return fetcher
}
4 changes: 2 additions & 2 deletions internal/protocol/http/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (f *Fetcher) Resolve(req *base.Request) error {
if err := base.ParseReqExtra[fhttp.ReqExtra](req); err != nil {
return err
}
f.meta.Req = req
httpReq, err := f.buildRequest(nil, req)
if err != nil {
return err
Expand Down Expand Up @@ -153,7 +154,6 @@ func (f *Fetcher) Resolve(req *base.Request) error {
file.Name = httpReq.URL.Hostname()
}
res.Files = append(res.Files, file)
f.meta.Req = req
f.meta.Res = res
return nil
}
Expand Down Expand Up @@ -444,7 +444,7 @@ func (f *Fetcher) splitChunk() (chunks []*chunk) {

func (f *Fetcher) buildClient() *http.Client {
transport := &http.Transport{
Proxy: f.ctl.ProxyConfig.ToHandler(),
Proxy: f.ctl.GetProxy(f.meta.Req.Proxy),
}
// Cookie handle
jar, _ := cookiejar.New(nil)
Expand Down
12 changes: 8 additions & 4 deletions internal/protocol/http/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/GopeedLab/gopeed/pkg/base"
"github.com/GopeedLab/gopeed/pkg/protocol/http"
"net"
gohttp "net/http"
"net/url"
"testing"
"time"
)
Expand Down Expand Up @@ -385,10 +387,12 @@ func downloadResume(listener net.Listener, connections int, t *testing.T) {
func downloadWithProxy(httpListener net.Listener, proxyListener net.Listener, t *testing.T) {
fetcher := downloadReady(httpListener, 4, t)
ctl := controller.NewController()
ctl.ProxyConfig = &base.DownloaderProxyConfig{
Enable: true,
Scheme: "socks5",
Host: proxyListener.Addr().String(),
ctl.GetProxy = func(requestProxy *base.RequestProxy) func(*gohttp.Request) (*url.URL, error) {
return (&base.DownloaderProxyConfig{
Enable: true,
Scheme: "socks5",
Host: proxyListener.Addr().String(),
}).ToHandler()
}
fetcher.Setup(ctl)
err := fetcher.Start()
Expand Down
43 changes: 34 additions & 9 deletions pkg/base/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ type Request struct {
Extra any `json:"extra"`
// Labels is used to mark the download task
Labels map[string]string `json:"labels"`
// Proxy is special proxy config for request
Proxy *RequestProxy `json:"proxy"`
}

func (r *Request) Validate() error {
Expand All @@ -25,6 +27,37 @@ func (r *Request) Validate() error {
return nil
}

type RequestProxyMode string

const (
// RequestProxyModeFollow follow setting proxy
RequestProxyModeFollow RequestProxyMode = "follow"
// RequestProxyModeNone not use proxy
RequestProxyModeNone RequestProxyMode = "none"
// RequestProxyModeCustom custom proxy
RequestProxyModeCustom RequestProxyMode = "custom"
)

type RequestProxy struct {
Mode RequestProxyMode `json:"mode"`
Scheme string `json:"scheme"`
Host string `json:"host"`
Usr string `json:"usr"`
Pwd string `json:"pwd"`
}

func (p *RequestProxy) ToHandler() func(r *http.Request) (*url.URL, error) {
if p == nil || p.Mode != RequestProxyModeCustom {
return nil
}

if p.Scheme == "" || p.Host == "" {
return nil
}

return http.ProxyURL(util.BuildProxyUrl(p.Scheme, p.Host, p.Usr, p.Pwd))
}

// Resource download resource
type Resource struct {
// if name is not empty, the resource is a folder and the name is the folder name
Expand Down Expand Up @@ -94,15 +127,7 @@ func (o *Options) InitSelectFiles(fileSize int) {
}

func (o *Options) Clone() *Options {
if o == nil {
return nil
}
return &Options{
Name: o.Name,
Path: o.Path,
SelectFiles: o.SelectFiles,
Extra: o.Extra,
}
return util.DeepClone(o)
}

func ParseReqExtra[E any](req *Request) error {
Expand Down
17 changes: 16 additions & 1 deletion pkg/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"github.com/rs/zerolog"
"github.com/rs/zerolog/pkgerrors"
"github.com/virtuald/go-paniclog"
gohttp "net/http"
"net/url"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -253,7 +255,20 @@ func (d *Downloader) setupFetcher(fm fetcher.FetcherManager, fetcher fetcher.Fet
ctl.GetConfig = func(v any) {
d.getProtocolConfig(fm.Name(), v)
}
ctl.ProxyConfig = d.cfg.Proxy
// Get proxy config, task request proxy config has higher priority, then use global proxy config
ctl.GetProxy = func(requestProxy *base.RequestProxy) func(*gohttp.Request) (*url.URL, error) {
if requestProxy == nil {
return d.cfg.Proxy.ToHandler()
}
switch requestProxy.Mode {
case base.RequestProxyModeNone:
return nil
case base.RequestProxyModeCustom:
return requestProxy.ToHandler()
default:
return d.cfg.Proxy.ToHandler()
}
}
fetcher.Setup(ctl)
}

Expand Down
53 changes: 41 additions & 12 deletions pkg/download/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,21 @@ func TestDownloader_CreateDirectBatch(t *testing.T) {

func TestDownloader_CreateWithProxy(t *testing.T) {
// No proxy
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
return nil
}, nil)
// Disable proxy
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
proxyCfg.Enable = false
return proxyCfg
}, nil)
// Enable system proxy but not set proxy environment variable
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
proxyCfg.System = true
return proxyCfg
}, nil)
// Enable proxy but error proxy environment variable
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
os.Setenv("HTTP_PROXY", "http://127.0.0.1:1234")
os.Setenv("HTTPS_PROXY", "http://127.0.0.1:1234")
proxyCfg.System = true
Expand All @@ -184,33 +184,50 @@ func TestDownloader_CreateWithProxy(t *testing.T) {
}
})
// Enable system proxy and set proxy environment variable
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
os.Setenv("HTTP_PROXY", proxyCfg.ToUrl().String())
os.Setenv("HTTPS_PROXY", proxyCfg.ToUrl().String())
proxyCfg.System = true
return proxyCfg
}, nil)
// Invalid proxy scheme
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
proxyCfg.Scheme = ""
return proxyCfg
}, nil)
// Invalid proxy host
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
proxyCfg.Host = ""
return proxyCfg
}, nil)
// Use proxy without auth
doTestDownloaderCreateWithProxy(t, false, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, false, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
return proxyCfg
}, nil)
// Use proxy with auth
doTestDownloaderCreateWithProxy(t, true, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
doTestDownloaderCreateWithProxy(t, true, nil, func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig {
return proxyCfg
}, nil)

// Request proxy mode follow
doTestDownloaderCreateWithProxy(t, false, func(reqProxy *base.RequestProxy) *base.RequestProxy {
reqProxy.Mode = base.RequestProxyModeFollow
return reqProxy
}, nil, nil)

// Request proxy mode none
doTestDownloaderCreateWithProxy(t, false, func(reqProxy *base.RequestProxy) *base.RequestProxy {
reqProxy.Mode = base.RequestProxyModeNone
return reqProxy
}, nil, nil)

// Request proxy mode custom
doTestDownloaderCreateWithProxy(t, false, func(reqProxy *base.RequestProxy) *base.RequestProxy {
return reqProxy
}, nil, nil)
}

func doTestDownloaderCreateWithProxy(t *testing.T, auth bool, buildProxyConfig func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig, errHandler func(err error)) {
func doTestDownloaderCreateWithProxy(t *testing.T, auth bool, buildReqProxy func(reqProxy *base.RequestProxy) *base.RequestProxy, buildProxyConfig func(proxyCfg *base.DownloaderProxyConfig) *base.DownloaderProxyConfig, errHandler func(err error)) {
usr, pwd := "", ""
if auth {
usr, pwd = "admin", "123"
Expand All @@ -223,17 +240,29 @@ func doTestDownloaderCreateWithProxy(t *testing.T, auth bool, buildProxyConfig f
t.Fatal(err)
}
defer downloader.Clear()
downloader.cfg.DownloaderStoreConfig.Proxy = buildProxyConfig(&base.DownloaderProxyConfig{
globalProxyCfg := &base.DownloaderProxyConfig{
Enable: true,
Scheme: "socks5",
Host: proxyListener.Addr().String(),
Usr: usr,
Pwd: pwd,
})
}
if buildProxyConfig != nil {
globalProxyCfg = buildProxyConfig(globalProxyCfg)
}
downloader.cfg.DownloaderStoreConfig.Proxy = globalProxyCfg

req := &base.Request{
URL: test.ExternalDownloadUrl,
}
if buildReqProxy != nil {
req.Proxy = buildReqProxy(&base.RequestProxy{
Scheme: "socks5",
Host: proxyListener.Addr().String(),
Usr: usr,
Pwd: pwd,
})
}
rr, err := downloader.Resolve(req)
if err != nil {
if errHandler == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/download/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (e *Engine) await(value any) {
}

func (e *Engine) Close() {
e.loop.Stop()
e.loop.StopNoWait()
}

type Config struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/download/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,8 +694,12 @@ type ExtensionTask struct {
}

func NewExtensionTask(download *Downloader, task *Task) *ExtensionTask {
newTask := task.clone()
// Assign the pointer of the properties that the extension supports modification
newTask.Meta = task.Meta
newTask.Status = task.Status
return &ExtensionTask{
Task: task.clone(),
Task: newTask,
download: download,
}
}
Expand Down
11 changes: 1 addition & 10 deletions pkg/download/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,7 @@ func (t *Task) updateStatus(status base.Status) {
}

func (t *Task) clone() *Task {
return &Task{
ID: t.ID,
Protocol: t.Protocol,
Meta: t.Meta,
Status: t.Status,
Uploading: t.Uploading,
Progress: t.Progress,
CreatedAt: t.CreatedAt,
UpdatedAt: t.UpdatedAt,
}
return util.DeepClone(t)
}

func (t *Task) calcSpeed(speedArr []int64, downloaded int64, usedTime float64) int64 {
Expand Down
Loading

0 comments on commit 0c17093

Please sign in to comment.