From 05a019f3d54317b1b8c7b5d0a2a5913f25f29f6b Mon Sep 17 00:00:00 2001 From: Mark Pashmfouroush Date: Sat, 3 Aug 2024 09:34:08 +0100 Subject: [PATCH 1/5] wiresocks: reduce ipscanner timeout to 1min Signed-off-by: Mark Pashmfouroush --- wiresocks/scanner.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/wiresocks/scanner.go b/wiresocks/scanner.go index 193bcf055..d112f2666 100644 --- a/wiresocks/scanner.go +++ b/wiresocks/scanner.go @@ -19,7 +19,9 @@ type ScanOptions struct { } func RunScan(ctx context.Context, l *slog.Logger, opts ScanOptions) (result []ipscanner.IPInfo, err error) { - // new scanner + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + scanner := ipscanner.NewScanner( ipscanner.WithLogger(l.With(slog.String("subsystem", "scanner"))), ipscanner.WithWarpPing(), @@ -31,9 +33,6 @@ func RunScan(ctx context.Context, l *slog.Logger, opts ScanOptions) (result []ip ipscanner.WithCidrList(warp.WarpPrefixes()), ) - ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) - defer cancel() - scanner.Run(ctx) t := time.NewTicker(1 * time.Second) From c02b99a371c9443635b2d6cdfb1c0be2f3f561f1 Mon Sep 17 00:00:00 2001 From: Mark Pashmfouroush Date: Sat, 3 Aug 2024 09:35:21 +0100 Subject: [PATCH 2/5] wiresocks: ignore client-side conn resets Signed-off-by: Mark Pashmfouroush --- wiresocks/proxy.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/wiresocks/proxy.go b/wiresocks/proxy.go index 2459e7c0f..90afd3c4f 100644 --- a/wiresocks/proxy.go +++ b/wiresocks/proxy.go @@ -7,6 +7,7 @@ import ( "log/slog" "net" "net/netip" + "syscall" "time" "github.com/bepass-org/warp-plus/proxy/pkg/mixed" @@ -82,6 +83,10 @@ func (vt *VirtualTun) generalHandler(req *statute.ProxyRequest) error { buf1 := vt.pool.Get() defer vt.pool.Put(buf1) _, err := copyConnTimeout(conn, req.Conn, buf1[:cap(buf1)], timeout) + if errors.Is(err, syscall.ECONNRESET) { + done <- nil + return + } done <- err }() // Copy data from conn to req.Conn From 6c98f7a655999459c340752aa8a83f2c3577b382 Mon Sep 17 00:00:00 2001 From: Mark Pashmfouroush Date: Sat, 3 Aug 2024 09:39:29 +0100 Subject: [PATCH 3/5] ipscanner: slim down scanner and use ctx everywhere Signed-off-by: Mark Pashmfouroush --- ipscanner/internal/engine/engine.go | 54 +++++++++++++---------------- ipscanner/internal/engine/queue.go | 10 ++++-- ipscanner/internal/ping/ping.go | 36 ++++++++++--------- ipscanner/internal/ping/warp.go | 46 +++++++++++++++--------- 4 files changed, 80 insertions(+), 66 deletions(-) diff --git a/ipscanner/internal/engine/engine.go b/ipscanner/internal/engine/engine.go index 3af6ab600..e75911290 100644 --- a/ipscanner/internal/engine/engine.go +++ b/ipscanner/internal/engine/engine.go @@ -2,9 +2,9 @@ package engine import ( "context" + "errors" "log/slog" "net/netip" - "time" "github.com/bepass-org/warp-plus/ipscanner/internal/iterator" "github.com/bepass-org/warp-plus/ipscanner/internal/ping" @@ -14,7 +14,7 @@ import ( type Engine struct { generator *iterator.IpGenerator ipQueue *IPQueue - ping func(netip.Addr) (statute.IPInfo, error) + ping func(context.Context, netip.Addr) (statute.IPInfo, error) log *slog.Logger } @@ -28,7 +28,7 @@ func NewScannerEngine(opts *statute.ScannerOptions) *Engine { ipQueue: queue, ping: p.DoPing, generator: iterator.NewIterator(opts), - log: opts.Logger.With(slog.String("subsystem", "scanner/engine")), + log: opts.Logger, } } @@ -40,37 +40,33 @@ func (e *Engine) GetAvailableIPs(desc bool) []statute.IPInfo { } func (e *Engine) Run(ctx context.Context) { - for { - select { - case <-ctx.Done(): + e.ipQueue.Init() + + select { + case <-ctx.Done(): + return + case <-e.ipQueue.available: + e.log.Debug("Started new scanning round") + batch, err := e.generator.NextBatch() + if err != nil { + e.log.Error("Error while generating IP: %v", err) return - case <-e.ipQueue.available: - e.log.Debug("Started new scanning round") - batch, err := e.generator.NextBatch() - if err != nil { - e.log.Error("Error while generating IP: %v", err) - // in case of disastrous error, to prevent resource draining wait for 2 seconds and try again - time.Sleep(2 * time.Second) - continue - } - for _, ip := range batch { - select { - case <-ctx.Done(): - return - default: - e.log.Debug("pinging IP", "addr", ip) - if ipInfo, err := e.ping(ip); err == nil { - e.log.Debug("ping success", "addr", ipInfo.AddrPort, "rtt", ipInfo.RTT) - e.ipQueue.Enqueue(ipInfo) - } else { + } + for _, ip := range batch { + select { + case <-ctx.Done(): + return + default: + ipInfo, err := e.ping(ctx, ip) + if err != nil { + if !errors.Is(err, context.Canceled) { e.log.Error("ping error", "addr", ip, "error", err) } + continue } + e.log.Debug("ping success", "addr", ipInfo.AddrPort, "rtt", ipInfo.RTT) + e.ipQueue.Enqueue(ipInfo) } - default: - e.log.Debug("calling expire") - e.ipQueue.Expire() - time.Sleep(200 * time.Millisecond) } } } diff --git a/ipscanner/internal/engine/queue.go b/ipscanner/internal/engine/queue.go index ec3a66186..88c918d2d 100644 --- a/ipscanner/internal/engine/queue.go +++ b/ipscanner/internal/engine/queue.go @@ -29,7 +29,7 @@ func NewIPQueue(opts *statute.ScannerOptions) *IPQueue { maxTTL: opts.IPQueueTTL, rttThreshold: opts.MaxDesirableRTT, available: make(chan struct{}, opts.IPQueueSize), - log: opts.Logger.With(slog.String("subsystem", "engine/queue")), + log: opts.Logger, reserved: reserved, } } @@ -122,15 +122,19 @@ func (q *IPQueue) Dequeue() (statute.IPInfo, bool) { return info, true } -func (q *IPQueue) Expire() { +func (q *IPQueue) Init() { q.mu.Lock() defer q.mu.Unlock() if !q.inIdealMode { - q.log.Debug("Expire: Not in ideal mode") q.available <- struct{}{} return } +} + +func (q *IPQueue) Expire() { + q.mu.Lock() + defer q.mu.Unlock() q.log.Debug("Expire: In ideal mode") defer func() { diff --git a/ipscanner/internal/ping/ping.go b/ipscanner/internal/ping/ping.go index 32865d0f0..65fde8475 100644 --- a/ipscanner/internal/ping/ping.go +++ b/ipscanner/internal/ping/ping.go @@ -1,6 +1,7 @@ package ping import ( + "context" "errors" "fmt" "net/netip" @@ -13,9 +14,9 @@ type Ping struct { } // DoPing performs a ping on the given IP address. -func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { +func (p *Ping) DoPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { if p.Options.SelectedOps&statute.HTTPPing > 0 { - res, err := p.httpPing(ip) + res, err := p.httpPing(ctx, ip) if err != nil { return statute.IPInfo{}, err } @@ -23,7 +24,7 @@ func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { return res, nil } if p.Options.SelectedOps&statute.TLSPing > 0 { - res, err := p.tlsPing(ip) + res, err := p.tlsPing(ctx, ip) if err != nil { return statute.IPInfo{}, err } @@ -31,7 +32,7 @@ func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { return res, nil } if p.Options.SelectedOps&statute.TCPPing > 0 { - res, err := p.tcpPing(ip) + res, err := p.tcpPing(ctx, ip) if err != nil { return statute.IPInfo{}, err } @@ -39,7 +40,7 @@ func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { return res, nil } if p.Options.SelectedOps&statute.QUICPing > 0 { - res, err := p.quicPing(ip) + res, err := p.quicPing(ctx, ip) if err != nil { return statute.IPInfo{}, err } @@ -47,7 +48,7 @@ func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { return res, nil } if p.Options.SelectedOps&statute.WARPPing > 0 { - res, err := p.warpPing(ip) + res, err := p.warpPing(ctx, ip) if err != nil { return statute.IPInfo{}, err } @@ -58,8 +59,9 @@ func (p *Ping) DoPing(ip netip.Addr) (statute.IPInfo, error) { return statute.IPInfo{}, errors.New("no ping operation selected") } -func (p *Ping) httpPing(ip netip.Addr) (statute.IPInfo, error) { +func (p *Ping) httpPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { return p.calc( + ctx, NewHttpPing( ip, "GET", @@ -74,30 +76,30 @@ func (p *Ping) httpPing(ip netip.Addr) (statute.IPInfo, error) { ) } -func (p *Ping) warpPing(ip netip.Addr) (statute.IPInfo, error) { - return p.calc(NewWarpPing(ip, p.Options)) +func (p *Ping) warpPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { + return p.calc(ctx, NewWarpPing(ip, p.Options)) } -func (p *Ping) tlsPing(ip netip.Addr) (statute.IPInfo, error) { - return p.calc( +func (p *Ping) tlsPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { + return p.calc(ctx, NewTlsPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) tcpPing(ip netip.Addr) (statute.IPInfo, error) { - return p.calc( +func (p *Ping) tcpPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { + return p.calc(ctx, NewTcpPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) quicPing(ip netip.Addr) (statute.IPInfo, error) { - return p.calc( +func (p *Ping) quicPing(ctx context.Context, ip netip.Addr) (statute.IPInfo, error) { + return p.calc(ctx, NewQuicPing(ip, p.Options.Hostname, p.Options.Port, p.Options), ) } -func (p *Ping) calc(tp statute.IPing) (statute.IPInfo, error) { - pr := tp.Ping() +func (p *Ping) calc(ctx context.Context, tp statute.IPing) (statute.IPInfo, error) { + pr := tp.PingContext(ctx) err := pr.Error() if err != nil { return statute.IPInfo{}, err diff --git a/ipscanner/internal/ping/warp.go b/ipscanner/internal/ping/warp.go index a513497de..dc135f3d0 100644 --- a/ipscanner/internal/ping/warp.go +++ b/ipscanner/internal/ping/warp.go @@ -55,9 +55,10 @@ func (h *WarpPing) Ping() statute.IPingResult { return h.PingContext(context.Background()) } -func (h *WarpPing) PingContext(_ context.Context) statute.IPingResult { +func (h *WarpPing) PingContext(ctx context.Context) statute.IPingResult { addr := netip.AddrPortFrom(h.IP, warp.RandomWarpPort()) rtt, err := initiateHandshake( + ctx, addr, h.PrivateKey, h.PeerPublicKey, @@ -117,15 +118,21 @@ func ephemeralKeypair() (noise.DHKey, error) { }, nil } -func randomInt(min, max int) int { - nBig, err := rand.Int(rand.Reader, big.NewInt(int64(max-min+1))) +func randomInt(min, max uint64) uint64 { + rangee := max - min + if rangee < 1 { + return 0 + } + + n, err := rand.Int(rand.Reader, big.NewInt(int64(rangee))) if err != nil { panic(err) } - return int(nBig.Int64()) + min + + return min + n.Uint64() } -func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKeyBase64, presharedKeyBase64 string) (time.Duration, error) { +func initiateHandshake(ctx context.Context, serverAddr netip.AddrPort, privateKeyBase64, peerPublicKeyBase64, presharedKeyBase64 string) (time.Duration, error) { staticKeyPair, err := staticKeypair(privateKeyBase64) if err != nil { return 0, err @@ -209,19 +216,24 @@ func initiateHandshake(serverAddr netip.AddrPort, privateKeyBase64, peerPublicKe numPackets := randomInt(8, 15) randomPacket := make([]byte, 100) - for i := 0; i < numPackets; i++ { - packetSize := randomInt(40, 100) - _, err := rand.Read(randomPacket[:packetSize]) - if err != nil { - return 0, fmt.Errorf("error generating random packet: %w", err) + for i := uint64(0); i < numPackets; i++ { + select { + case <-ctx.Done(): + return 0, ctx.Err() + default: + packetSize := randomInt(40, 100) + _, err := rand.Read(randomPacket[:packetSize]) + if err != nil { + return 0, fmt.Errorf("error generating random packet: %w", err) + } + + _, err = conn.Write(randomPacket[:packetSize]) + if err != nil { + return 0, fmt.Errorf("error sending random packet: %w", err) + } + + time.Sleep(time.Duration(randomInt(20, 250)) * time.Millisecond) } - - _, err = conn.Write(randomPacket[:packetSize]) - if err != nil { - return 0, fmt.Errorf("error sending random packet: %w", err) - } - - time.Sleep(time.Duration(randomInt(20, 250)) * time.Millisecond) } _, err = initiationPacket.WriteTo(conn) From 56c443c483b1ff396acb844c708c88d932850cca Mon Sep 17 00:00:00 2001 From: Mark Pashmfouroush Date: Sat, 3 Aug 2024 09:47:49 +0100 Subject: [PATCH 4/5] wireguard: some refactors Signed-off-by: Mark Pashmfouroush --- app/app.go | 231 ++++++++++++++++++++++++------------- app/wg.go | 80 +++++++++---- wireguard/device/device.go | 4 +- wireguard/device/peer.go | 13 ++- wireguard/device/send.go | 63 +++++++--- wireguard/device/uapi.go | 14 +-- 6 files changed, 270 insertions(+), 135 deletions(-) diff --git a/app/app.go b/app/app.go index dea951783..4fe5b1c5b 100644 --- a/app/app.go +++ b/app/app.go @@ -12,6 +12,8 @@ import ( "github.com/bepass-org/warp-plus/iputils" "github.com/bepass-org/warp-plus/psiphon" "github.com/bepass-org/warp-plus/warp" + "github.com/bepass-org/warp-plus/wireguard/tun" + "github.com/bepass-org/warp-plus/wireguard/tun/netstack" "github.com/bepass-org/warp-plus/wiresocks" ) @@ -80,7 +82,7 @@ func RunWarp(ctx context.Context, l *slog.Logger, opts WarpOptions) error { return err } - l.Info("scan results", "endpoints", res) + l.Debug("scan results", "endpoints", res) endpoints = make([]string, len(res)) for i := 0; i < len(res); i++ { @@ -122,7 +124,7 @@ func runWireguard(ctx context.Context, l *slog.Logger, opts WarpOptions) error { // Enable trick and keepalive on all peers in config for i, peer := range conf.Peers { peer.Trick = true - peer.KeepAlive = 3 + peer.KeepAlive = 5 // Try resolving if the endpoint is a domain addr, err := iputils.ParseResolveAddressPort(peer.Endpoint, false, opts.DnsAddr.String()) @@ -134,35 +136,56 @@ func runWireguard(ctx context.Context, l *slog.Logger, opts WarpOptions) error { } if opts.Tun { - // Create a new tun interface - tunDev, err := newNormalTun([]netip.Addr{opts.DnsAddr}) - if err != nil { - return err - } - // Establish wireguard tunnel on tun interface - if err := establishWireguard(l, conf, tunDev, true, opts.FwMark); err != nil { - return err + var werr error + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + // Create a new tun interface + tunDev, werr = newNormalTun([]netip.Addr{opts.DnsAddr}) + if werr != nil { + continue + } + + werr = establishWireguard(l, conf, tunDev, true, opts.FwMark, t) + if werr != nil { + continue + } + break } + if werr != nil { + return werr + } + l.Info("serving tun", "interface", "warp0") return nil } - // Create userspace tun network stack - tunDev, tnet, err := newUsermodeTun(conf) - if err != nil { - return err - } - // Establish wireguard on userspace stack - if err := establishWireguard(l, conf, tunDev, false, opts.FwMark); err != nil { - return err - } + var werr error + var tnet *netstack.Net + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + // Create userspace tun network stack + tunDev, tnet, werr = netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) + if err != nil { + continue + } + + werr = establishWireguard(l, conf, tunDev, false, opts.FwMark, t) + if werr != nil { + continue + } - // // Test wireguard connectivity - // if err := usermodeTunTest(ctx, l, tnet); err != nil { - // return err - // } + // Test wireguard connectivity + werr = usermodeTunTest(ctx, l, tnet) + if werr != nil { + continue + } + break + } + if werr != nil { + return werr + } // Run a proxy on the userspace stack _, err = wiresocks.StartProxy(ctx, l, tnet, opts.Bind) @@ -194,7 +217,7 @@ func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint str for i, peer := range conf.Peers { peer.Endpoint = endpoint peer.Trick = true - peer.KeepAlive = 3 + peer.KeepAlive = 5 if opts.Reserved != "" { r, err := wiresocks.ParseReserved(opts.Reserved) @@ -208,35 +231,55 @@ func runWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoint str } if opts.Tun { - // Create a new tun interface - tunDev, err := newNormalTun([]netip.Addr{opts.DnsAddr}) - if err != nil { - return err - } - // Establish wireguard tunnel on tun interface - if err := establishWireguard(l, &conf, tunDev, true, opts.FwMark); err != nil { - return err + var werr error + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + // Create a new tun interface + tunDev, werr = newNormalTun([]netip.Addr{opts.DnsAddr}) + if werr != nil { + continue + } + + // Create userspace tun network stack + werr = establishWireguard(l, &conf, tunDev, true, opts.FwMark, t) + if werr != nil { + continue + } + break + } + if werr != nil { + return werr } l.Info("serving tun", "interface", "warp0") return nil } - // Create userspace tun network stack - tunDev, tnet, err := newUsermodeTun(&conf) - if err != nil { - return err - } - // Establish wireguard on userspace stack - if err := establishWireguard(l, &conf, tunDev, false, opts.FwMark); err != nil { - return err - } + var werr error + var tnet *netstack.Net + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + tunDev, tnet, werr = netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) + if werr != nil { + continue + } - // // Test wireguard connectivity - // if err := usermodeTunTest(ctx, l, tnet); err != nil { - // return err - // } + werr = establishWireguard(l, &conf, tunDev, false, opts.FwMark, t) + if werr != nil { + continue + } + + // Test wireguard connectivity + werr = usermodeTunTest(ctx, l, tnet) + if werr != nil { + continue + } + break + } + if werr != nil { + return werr + } // Run a proxy on the userspace stack _, err = wiresocks.StartProxy(ctx, l, tnet, opts.Bind) @@ -267,7 +310,7 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi for i, peer := range conf.Peers { peer.Endpoint = endpoints[0] peer.Trick = true - peer.KeepAlive = 3 + peer.KeepAlive = 5 if opts.Reserved != "" { r, err := wiresocks.ParseReserved(opts.Reserved) @@ -280,24 +323,35 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi conf.Peers[i] = peer } - // Create userspace tun network stack - tunDev, tnet, err := newUsermodeTun(&conf) - if err != nil { - return err - } - // Establish wireguard on userspace stack and bind the wireguard sockets to the default interface and apply - if err := establishWireguard(l.With("gool", "outer"), &conf, tunDev, opts.Tun, opts.FwMark); err != nil { - return err - } + var werr error + var tnet1 *netstack.Net + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + // Create userspace tun network stack + tunDev, tnet1, werr = netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) + if werr != nil { + continue + } - // // Test wireguard connectivity - // if err := usermodeTunTest(ctx, l, tnet); err != nil { - // return err - // } + werr = establishWireguard(l.With("gool", "outer"), &conf, tunDev, opts.Tun, opts.FwMark, t) + if werr != nil { + continue + } + + // Test wireguard connectivity + werr = usermodeTunTest(ctx, l, tnet1) + if werr != nil { + continue + } + break + } + if werr != nil { + return werr + } // Create a UDP port forward between localhost and the remote endpoint - addr, err := wiresocks.NewVtunUDPForwarder(ctx, netip.MustParseAddrPort("127.0.0.1:0"), endpoints[0], tnet, singleMTU) + addr, err := wiresocks.NewVtunUDPForwarder(ctx, netip.MustParseAddrPort("127.0.0.1:0"), endpoints[0], tnet1, singleMTU) if err != nil { return err } @@ -319,7 +373,7 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi // Enable keepalive on all peers in config for i, peer := range conf.Peers { peer.Endpoint = addr.String() - peer.KeepAlive = 10 + peer.KeepAlive = 20 if opts.Reserved != "" { r, err := wiresocks.ParseReserved(opts.Reserved) @@ -341,7 +395,7 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi // Establish wireguard tunnel on tun interface but don't bind // wireguard sockets to default interface and don't apply fwmark. - if err := establishWireguard(l.With("gool", "inner"), &conf, tunDev, false, opts.FwMark); err != nil { + if err := establishWireguard(l.With("gool", "inner"), &conf, tunDev, false, opts.FwMark, "t0"); err != nil { return err } l.Info("serving tun", "interface", "warp0") @@ -349,22 +403,22 @@ func runWarpInWarp(ctx context.Context, l *slog.Logger, opts WarpOptions, endpoi } // Create userspace tun network stack - tunDev, tnet, err = newUsermodeTun(&conf) + tunDev, tnet2, err := netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) if err != nil { return err } // Establish wireguard on userspace stack - if err := establishWireguard(l.With("gool", "inner"), &conf, tunDev, false, opts.FwMark); err != nil { + if err := establishWireguard(l.With("gool", "inner"), &conf, tunDev, false, opts.FwMark, "t0"); err != nil { return err } - // // Test wireguard connectivity - // if err := usermodeTunTest(ctx, l, tnet); err != nil { - // return err - // } + // Test wireguard connectivity + if err := usermodeTunTest(ctx, l, tnet2); err != nil { + return err + } - _, err = wiresocks.StartProxy(ctx, l, tnet, opts.Bind) + _, err = wiresocks.StartProxy(ctx, l, tnet2, opts.Bind) if err != nil { return err } @@ -392,7 +446,7 @@ func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, e for i, peer := range conf.Peers { peer.Endpoint = endpoint peer.Trick = true - peer.KeepAlive = 3 + peer.KeepAlive = 5 if opts.Reserved != "" { r, err := wiresocks.ParseReserved(opts.Reserved) @@ -405,21 +459,32 @@ func runWarpWithPsiphon(ctx context.Context, l *slog.Logger, opts WarpOptions, e conf.Peers[i] = peer } - // Create userspace tun network stack - tunDev, tnet, err := newUsermodeTun(&conf) - if err != nil { - return err - } - // Establish wireguard on userspace stack - if err := establishWireguard(l, &conf, tunDev, false, opts.FwMark); err != nil { - return err - } + var werr error + var tnet *netstack.Net + var tunDev tun.Device + for _, t := range []string{"t1", "t2"} { + // Create userspace tun network stack + tunDev, tnet, werr = netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) + if werr != nil { + continue + } - // // Test wireguard connectivity - // if err := usermodeTunTest(ctx, l, tnet); err != nil { - // return err - // } + werr = establishWireguard(l, &conf, tunDev, false, opts.FwMark, t) + if werr != nil { + continue + } + + // Test wireguard connectivity + werr = usermodeTunTest(ctx, l, tnet) + if werr != nil { + continue + } + break + } + if werr != nil { + return werr + } // Run a proxy on the userspace stack warpBind, err := wiresocks.StartProxy(ctx, l, tnet, netip.MustParseAddrPort("127.0.0.1:0")) diff --git a/app/wg.go b/app/wg.go index ce3794c0f..e6fb88352 100644 --- a/app/wg.go +++ b/app/wg.go @@ -1,12 +1,13 @@ package app import ( + "bufio" "bytes" "context" "fmt" - "io" "log/slog" "net/http" + "strings" "time" "github.com/bepass-org/warp-plus/wireguard/conn" @@ -16,19 +17,10 @@ import ( "github.com/bepass-org/warp-plus/wiresocks" ) -const connTestEndpoint = "https://www.gstatic.com/generate_204" - -func newUsermodeTun(conf *wiresocks.Configuration) (wgtun.Device, *netstack.Net, error) { - tunDev, tnet, err := netstack.CreateNetTUN(conf.Interface.Addresses, conf.Interface.DNS, conf.Interface.MTU) - if err != nil { - return nil, nil, err - } - - return tunDev, tnet, nil -} +const connTestEndpoint = "http://1.1.1.1/cdn-cgi/trace" func usermodeTunTest(ctx context.Context, l *slog.Logger, tnet *netstack.Net) error { - ctx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(5*time.Second)) defer cancel() for { @@ -42,15 +34,13 @@ func usermodeTunTest(ctx context.Context, l *slog.Logger, tnet *netstack.Net) er DialContext: tnet.DialContext, ResponseHeaderTimeout: 5 * time.Second, }} - resp, err := client.Get(connTestEndpoint) + resp, err := client.Head(connTestEndpoint) if err != nil { - l.Error("connection test failed", "error", err.Error()) + l.Error("connection test failed") continue } - _, err = io.ReadAll(resp.Body) - resp.Body.Close() - if err != nil { - l.Error("connection test failed", "error", err.Error()) + if resp.StatusCode != http.StatusOK { + l.Error("connection test failed") continue } @@ -61,7 +51,49 @@ func usermodeTunTest(ctx context.Context, l *slog.Logger, tnet *netstack.Net) er return nil } -func establishWireguard(l *slog.Logger, conf *wiresocks.Configuration, tunDev wgtun.Device, bind bool, fwmark uint32) error { +func waitHandshake(ctx context.Context, l *slog.Logger, dev *device.Device) error { + lastHandshakeSecs := "0" + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + get, err := dev.IpcGet() + if err != nil { + continue + } + scanner := bufio.NewScanner(strings.NewReader(get)) + for scanner.Scan() { + line := scanner.Text() + if line == "" { + break + } + + key, value, ok := strings.Cut(line, "=") + if !ok { + continue + } + + if key == "last_handshake_time_sec" { + lastHandshakeSecs = value + break + } + } + if lastHandshakeSecs != "0" { + l.Debug("handshake complete") + break + } + + l.Debug("waiting on handshake") + time.Sleep(1 * time.Second) + } + + return nil +} + +func establishWireguard(l *slog.Logger, conf *wiresocks.Configuration, tunDev wgtun.Device, bind bool, fwmark uint32, t string) error { // create the IPC message to establish the wireguard conn var request bytes.Buffer @@ -75,7 +107,7 @@ func establishWireguard(l *slog.Logger, conf *wiresocks.Configuration, tunDev wg request.WriteString(fmt.Sprintf("persistent_keepalive_interval=%d\n", peer.KeepAlive)) request.WriteString(fmt.Sprintf("preshared_key=%s\n", peer.PreSharedKey)) request.WriteString(fmt.Sprintf("endpoint=%s\n", peer.Endpoint)) - request.WriteString(fmt.Sprintf("trick=%t\n", peer.Trick)) + request.WriteString(fmt.Sprintf("trick=%s\n", t)) request.WriteString(fmt.Sprintf("reserved=%d,%d,%d\n", peer.Reserved[0], peer.Reserved[1], peer.Reserved[2])) for _, cidr := range peer.AllowedIPs { @@ -103,5 +135,13 @@ func establishWireguard(l *slog.Logger, conf *wiresocks.Configuration, tunDev wg } } + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(15*time.Second)) + defer cancel() + if err := waitHandshake(ctx, l, dev); err != nil { + dev.BindClose() + dev.Close() + return err + } + return nil } diff --git a/wireguard/device/device.go b/wireguard/device/device.go index aec790646..9250af584 100644 --- a/wireguard/device/device.go +++ b/wireguard/device/device.go @@ -182,9 +182,7 @@ func (device *Device) upLocked() error { device.peers.RLock() for _, peer := range device.peers.keyMap { peer.Start() - if peer.persistentKeepaliveInterval.Load() > 0 { - peer.SendKeepalive() - } + peer.SendHandshakeInitiation(false) } device.peers.RUnlock() return nil diff --git a/wireguard/device/peer.go b/wireguard/device/peer.go index c3d528bbd..b86900de1 100644 --- a/wireguard/device/peer.go +++ b/wireguard/device/peer.go @@ -53,7 +53,7 @@ type Peer struct { inbound *autodrainingInboundQueue // sequential ordering of tun writing } - trick bool + trick string reserved [3]byte cookieGenerator CookieGenerator @@ -115,7 +115,7 @@ func (device *Device) NewPeer(pk NoisePublicKey) (*Peer, error) { return peer, nil } -func (peer *Peer) SendBuffers(buffers [][]byte) error { +func (peer *Peer) SendBuffers(buffers [][]byte, trick bool) error { peer.device.net.RLock() defer peer.device.net.RUnlock() @@ -135,9 +135,11 @@ func (peer *Peer) SendBuffers(buffers [][]byte) error { } peer.endpoint.Unlock() - for i := range buffers { - if len(buffers[i]) > 3 && buffers[i][0] > 0 && buffers[i][0] < 5 { - copy(buffers[i][1:4], peer.reserved[:]) + if !trick { + for i := range buffers { + if len(buffers[i]) > 3 && buffers[i][0] > 0 && buffers[i][0] < 5 { + copy(buffers[i][1:4], peer.reserved[:]) + } } } @@ -153,6 +155,7 @@ func (peer *Peer) SendBuffers(buffers [][]byte) error { } func (peer *Peer) String() string { + return "" // The awful goo that follows is identical to: // // base64Key := base64.StdEncoding.EncodeToString(peer.handshake.remoteStatic[:]) diff --git a/wireguard/device/send.go b/wireguard/device/send.go index c8d5c4bed..459adfcfa 100644 --- a/wireguard/device/send.go +++ b/wireguard/device/send.go @@ -79,29 +79,64 @@ func (elem *QueueOutboundElement) clearPointers() { elem.peer = nil } -func randomInt(min, max int) int { - nBig, err := rand.Int(rand.Reader, big.NewInt(int64(max-min+1))) +func randomInt(min, max uint64) uint64 { + rangee := max - min + if rangee < 1 { + return 0 + } + + n, err := rand.Int(rand.Reader, big.NewInt(int64(rangee))) if err != nil { panic(err) } - return int(nBig.Int64()) + min + + return min + n.Uint64() } func (peer *Peer) sendRandomPackets() { - numPackets := randomInt(8, 15) - randomPacket := make([]byte, 100) - for i := 0; i < numPackets; i++ { + var Wheader = []byte{} + switch peer.trick { + case "t1": + case "t2": + clist := []byte{0xDC, 0xDE, 0xD3, 0xD9, 0xD0, 0xEC, 0xEE, 0xE3} + Wheader = []byte{ + clist[randomInt(0, uint64(len(clist)-1))], + 0x00, 0x00, 0x00, 0x01, 0x08, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x44, 0xD0, + } + _, err := rand.Read(Wheader[6:14]) + if err != nil { + panic(err) + } + // default: + // if len(tm)%2 != 0 { + // tm = tm + "0" + // } + // decodedBytes, err := hex.DecodeString(tm) + // if err == nil { + // Wheader = decodedBytes + // } + default: + return + } + + numPackets := randomInt(15, 50) + maxpLen := uint64(len(Wheader) + 120) + randomPacket := make([]byte, maxpLen) + for i := uint64(0); i < numPackets; i++ { if peer.device.isClosed() || !peer.isRunning.Load() { return } - packetSize := randomInt(40, 100) - _, err := rand.Read(randomPacket[:packetSize]) + packetSize := randomInt(uint64(len(Wheader)+10), maxpLen) + _, err := rand.Read(randomPacket[len(Wheader):packetSize]) if err != nil { return } + copy(randomPacket[0:], Wheader) - err = peer.SendBuffers([][]byte{randomPacket[:packetSize]}) + err = peer.SendBuffers([][]byte{randomPacket[:packetSize]}, true) if err != nil { return } @@ -114,7 +149,7 @@ func (peer *Peer) sendRandomPackets() { */ func (peer *Peer) SendKeepalive() { if len(peer.queue.staged) == 0 && peer.isRunning.Load() { - if peer.trick { + if peer.trick != "" && peer.trick != "t0" { peer.device.log.Verbosef("%v - Running tricks! (keepalive)", peer) peer.sendRandomPackets() } @@ -152,7 +187,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { return nil } - if peer.trick { + if peer.trick != "" && peer.trick != "t0" { peer.device.log.Verbosef("%v - Running tricks! (handshake)", peer) peer.sendRandomPackets() } @@ -177,7 +212,7 @@ func (peer *Peer) SendHandshakeInitiation(isRetry bool) error { peer.timersAnyAuthenticatedPacketTraversal() peer.timersAnyAuthenticatedPacketSent() - err = peer.SendBuffers([][]byte{packet}) + err = peer.SendBuffers([][]byte{packet}, false) if err != nil { peer.device.log.Errorf("%v - Failed to send handshake initiation: %v", peer, err) } @@ -216,7 +251,7 @@ func (peer *Peer) SendHandshakeResponse() error { peer.timersAnyAuthenticatedPacketSent() // TODO: allocation could be avoided - err = peer.SendBuffers([][]byte{packet}) + err = peer.SendBuffers([][]byte{packet}, false) if err != nil { peer.device.log.Errorf("%v - Failed to send handshake response: %v", peer, err) } @@ -564,7 +599,7 @@ func (peer *Peer) RoutineSequentialSender(maxBatchSize int) { peer.timersAnyAuthenticatedPacketTraversal() peer.timersAnyAuthenticatedPacketSent() - err := peer.SendBuffers(bufs) + err := peer.SendBuffers(bufs, false) if dataSent { peer.timersDataSent() } diff --git a/wireguard/device/uapi.go b/wireguard/device/uapi.go index f917b6771..1022cd968 100644 --- a/wireguard/device/uapi.go +++ b/wireguard/device/uapi.go @@ -119,7 +119,7 @@ func (device *Device) IpcGetOperation(w io.Writer) error { sendf("tx_bytes=%d", peer.txBytes.Load()) sendf("rx_bytes=%d", peer.rxBytes.Load()) sendf("persistent_keepalive_interval=%d", peer.persistentKeepaliveInterval.Load()) - sendf("trick=%t", peer.trick) + sendf("trick=%s", peer.trick) sendf("reserved=%d,%d,%d", peer.reserved[0], peer.reserved[1], peer.reserved[2]) device.allowedips.EntriesForPeer(peer, func(prefix netip.Prefix) bool { @@ -266,9 +266,7 @@ func (peer *ipcSetPeer) handlePostConfig() { } if peer.device.isUp() { peer.Start() - if peer.pkaOn { - peer.SendKeepalive() - } + peer.SendHandshakeInitiation(false) peer.SendStagedPackets() } } @@ -389,12 +387,8 @@ func (device *Device) handlePeerLine(peer *ipcSetPeer, key, value string) error } case "trick": - device.log.Verbosef("%v - UAPI: Setting trick: %s", peer.Peer, value) - parsedBool, err := strconv.ParseBool(value) - if err != nil { - return ipcErrorf(ipc.IpcErrorInvalid, "invalid trick value: %v", value) - } - peer.trick = parsedBool + device.log.Verbosef("%v - UAPI: Setting trick", peer.Peer) + peer.trick = value case "reserved": device.log.Verbosef("%v - UAPI: Setting reserved: %s", peer.Peer, value) From 742db3ccfc9699f2f7641d867820af38523b1092 Mon Sep 17 00:00:00 2001 From: Mark Pashmfouroush Date: Sat, 3 Aug 2024 10:00:56 +0100 Subject: [PATCH 5/5] proxy: remove some log lines Signed-off-by: Mark Pashmfouroush --- proxy/pkg/http/server.go | 1 - proxy/pkg/mixed/proxy.go | 1 - proxy/pkg/socks4/server.go | 1 - proxy/pkg/socks5/server.go | 1 - 4 files changed, 4 deletions(-) diff --git a/proxy/pkg/http/server.go b/proxy/pkg/http/server.go index 2c66b31d9..ba6b7b4fb 100644 --- a/proxy/pkg/http/server.go +++ b/proxy/pkg/http/server.go @@ -59,7 +59,6 @@ func (s *Server) ListenAndServe() error { } s.Bind = s.Listener.Addr().(*net.TCPAddr).String() - s.Logger.Debug("started proxy", "address", s.Bind) // ensure listener will be closed defer func() { diff --git a/proxy/pkg/mixed/proxy.go b/proxy/pkg/mixed/proxy.go index 28a9b57d3..f6a1b5d66 100644 --- a/proxy/pkg/mixed/proxy.go +++ b/proxy/pkg/mixed/proxy.go @@ -90,7 +90,6 @@ func (p *Proxy) ListenAndServe() error { } p.bind = p.listener.Addr().(*net.TCPAddr).String() - p.logger.Debug("started proxy", "address", p.bind) // ensure listener will be closed defer func() { diff --git a/proxy/pkg/socks4/server.go b/proxy/pkg/socks4/server.go index 74cca18e0..3e9801833 100644 --- a/proxy/pkg/socks4/server.go +++ b/proxy/pkg/socks4/server.go @@ -57,7 +57,6 @@ func (s *Server) ListenAndServe() error { } s.Bind = s.Listener.Addr().(*net.TCPAddr).String() - s.Logger.Debug("started proxy", "address", s.Bind) // ensure listener will be closed defer func() { diff --git a/proxy/pkg/socks5/server.go b/proxy/pkg/socks5/server.go index be63b26cc..757f16c3a 100644 --- a/proxy/pkg/socks5/server.go +++ b/proxy/pkg/socks5/server.go @@ -68,7 +68,6 @@ func (s *Server) ListenAndServe() error { } s.Bind = s.Listener.Addr().(*net.TCPAddr).String() - s.Logger.Debug("started proxy", "address", s.Bind) // ensure listener will be closed defer func() {