Skip to content

Commit

Permalink
fix: goroutine exit by panic
Browse files Browse the repository at this point in the history
  • Loading branch information
若尘(樱の泪) committed Feb 23, 2019
1 parent 7d62c84 commit 04af0eb
Show file tree
Hide file tree
Showing 10 changed files with 341 additions and 91 deletions.
95 changes: 77 additions & 18 deletions cmd/test/test.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,85 @@
package main

import (
"fmt"
"bufio"
"bytes"
"io"
"net"
"net/http"
"time"

"github.com/rc452860/vnet/proxy/client"

"github.com/rc452860/vnet/common/config"
"github.com/rc452860/vnet/common/log"
"github.com/rc452860/vnet/common/pool"
"github.com/rc452860/vnet/network/conn"
"github.com/rc452860/vnet/proxy/server"
thttp "github.com/rc452860/vnet/testing/servers/http"
"github.com/rc452860/vnet/utils/datasize"
)

func main() {
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("error %v", e)
}
}()
go func() {
defer func() {
if e := recover(); e != nil {
fmt.Printf("error %v \n", e)
}
}()
panic("this is error")
}()
}()
time.Sleep(1 * time.Second)
fmt.Println("aaa")
config.LoadConfig("config.json")
log.GetLogger("root").Level = log.INFO
proxy, err := server.NewShadowsocks("0.0.0.0", "aes-128-gcm", "killer", 1090, server.ShadowsocksArgs{
Limit: 4 * 1024 * 1024,
ConnectTimeout: 0,
})
proxy.Start()
if err != nil {
log.Err(err)
return
}
time.Sleep(time.Second)
thttp.StartFakeFileServer()
s, c := net.Pipe()
client := client.NewShadowsocksClient("127.0.0.1", "aes-128-gcm", "killer", 1090)
cs, err := conn.NewDefaultConn(c, "pipe")
go client.TcpProxy(cs, "localhost", 8080)
var httpRequest bytes.Buffer
httpRequest.WriteString("GET /download?size=4MB HTTP/1.1\n")
httpRequest.WriteString("Host: localhost:8080\n")
httpRequest.WriteString("Connection: keep-alive\n")
// httpRequest.WriteString("Connection: close\n")
httpRequest.WriteString("Pragma: no-cache\n")
httpRequest.WriteString("Cache-Control: no-cache\n")
httpRequest.WriteString("Upgrade-Insecure-Requests: 1\n")
httpRequest.WriteString("User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36\n")
httpRequest.WriteString("DNT: 1\n\n")
data := httpRequest.Bytes()
s.Write(data)
var count int64 = 0
buf := pool.GetBuf()
defer pool.PutBuf(buf)
size, _ := datasize.Parse("4MB")
// var buff bytes.Buffer
log.Info("%v", size)
request, err := http.ReadRequest(bufio.NewReader(bufio.NewReader(&httpRequest)))
response, err := http.ReadResponse(bufio.NewReader(s), request)
log.Info("content lenght:%v", response.ContentLength)
if err != nil {
log.Err(err)
return
}
for {
n, err := response.Body.Read(buf)
// buff.Write(buf)
count = count + int64(n)
if err != nil && err != io.EOF {
log.Err(err)
break
}
if count == response.ContentLength {
break
}
}
log.Info("count %v", count)
log.Info("upload %v", proxy.UpBytes)
// log.Info("%s", buff.Bytes()[:255])
upspeed, _ := datasize.HumanSize(uint64(proxy.UpSpeed))
downspeed, _ := datasize.HumanSize(uint64(proxy.DownSpeed))
up, _ := datasize.HumanSize(proxy.UpBytes)
down, _ := datasize.HumanSize(proxy.DownBytes)
log.Info("upspeed:%s - downspeed:%s | up:%s - down:%s", upspeed, downspeed, up, down)
}
32 changes: 9 additions & 23 deletions cmd/testClient/client.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
package main

import (
"crypto/rand"
"io"
"net"
_ "net/http/pprof"
"os"
"os/signal"

thttp "github.com/rc452860/vnet/testing/servers/http"
)

func main() {
listen, err := net.ListenPacket("udp", "0.0.0.0:8082")
if err != nil {
panic(err)
}
dstAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:8081")
if err != nil {
panic(err)
}
tmp := make([]byte, 4096)
if _, err = io.ReadFull(rand.Reader, tmp); err != nil {
panic(err)
}

for i := 0; i < 100000; i++ {
_, err := listen.WriteTo(tmp, dstAddr)
if err != nil {
panic(err)
}
}

thttp.StartFakeFileServer()
ch := make(chan os.Signal)
signal.Notify(ch, os.Kill, os.Interrupt)
<-ch
}
132 changes: 115 additions & 17 deletions cmd/testServer/sever.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,131 @@
package main

import (
"fmt"
"bufio"
"bytes"
"io"
"net"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"syscall"
"sync"
"time"

"github.com/rc452860/vnet/common/config"
"github.com/rc452860/vnet/common/log"
"github.com/rc452860/vnet/common/pool"
"github.com/rc452860/vnet/network/conn"
"github.com/rc452860/vnet/proxy/client"
"github.com/rc452860/vnet/proxy/server"
"github.com/rc452860/vnet/service"
thttp "github.com/rc452860/vnet/testing/servers/http"
)

const number = 10

func main() {
listen, err := net.ListenPacket("udp", "0.0.0.0:8081")
if err != nil {
panic(err)
}
config.LoadConfig("config.json")
// start pprof
go func() {
http.ListenAndServe("0.0.0.0:6060", nil)
}()

rage := time.Tick(100 * time.Millisecond)
buf := make([]byte, 4096)
for {
<-rage
_, _, err := listen.ReadFrom(buf)
fmt.Printf("%v\n", buf[0:10])
for i := 10000; i < 10000+number; i++ {
service.CurrentShadowsocksService().Add("0.0.0.0", "aes-128-gcm", "killer", i, server.ShadowsocksArgs{
ConnectTimeout: 3000,
Limit: 0,
TCPSwitch: "",
UDPSwitch: "",
})
err := service.CurrentShadowsocksService().Start(i)
if err != nil {
fmt.Print(err.Error())
continue
log.Err(err)
}
}
log.Info("all service is started")

Testing()
time.Sleep(10 * time.Second)
log.Info("=====================================================================")
ch := make(chan os.Signal, 2)

signal.Notify(ch, os.Interrupt, os.Kill)
<-ch
gw := new(sync.WaitGroup)
for i := 10000; i < 10000+number; i++ {
gw.Add(1)
go func(index int) {
gw.Done()
service.CurrentShadowsocksService().Stop(index)
}(i)
}
gw.Wait()
time.Sleep(3 * time.Second)
log.Info("all service stoped")
<-ch
log.Info("bybe~")
}

func Testing() {
log.GetLogger("root").Level = log.INFO
thttp.StartFakeFileServer()
wg := new(sync.WaitGroup)
for i := 10000; i < 20000; i++ {
wg.Add(1)
go func() {
TestingClient(int(10000 + 1))
wg.Done()
}()
}
wg.Wait()
// runtime.GC()
}

func TestingClient(i int) {
s, c := net.Pipe()
defer s.Close()
defer c.Close()
client := client.NewShadowsocksClient("127.0.0.1", "aes-128-gcm", "killer", i)
cs, err := conn.NewDefaultConn(c, "pipe")
go client.TcpProxy(cs, "127.0.0.1", 8080)
var httpRequest bytes.Buffer
httpRequest.WriteString("GET /download?size=4KB HTTP/1.1\n")
httpRequest.WriteString("Host: localhost:8080\n")
httpRequest.WriteString("Connection: keep-alive\n")
// httpRequest.WriteString("Connection: close\n")
httpRequest.WriteString("Pragma: no-cache\n")
httpRequest.WriteString("Cache-Control: no-cache\n")
httpRequest.WriteString("Upgrade-Insecure-Requests: 1\n")
httpRequest.WriteString("User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.80 Safari/537.36\n")
httpRequest.WriteString("DNT: 1\n\n")
data := httpRequest.Bytes()
s.SetWriteDeadline(time.Now().Add(3 * time.Second))
_, err = s.Write(data)
if err != nil {
return
}
var count int64 = 0
buf := pool.GetBuf()
defer pool.PutBuf(buf)
request, err := http.ReadRequest(bufio.NewReader(bufio.NewReader(&httpRequest)))
response, err := http.ReadResponse(bufio.NewReader(s), request)
log.Info("content lenght:%v", response.ContentLength)
if err != nil {
log.Err(err)
return
}
for {
n, err := response.Body.Read(buf)
// buff.Write(buf)
count = count + int64(n)
if err != nil && err != io.EOF {
log.Err(err)
break
}
if count == response.ContentLength {
break
}
}
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
log.Info("done %v", i)
}
Binary file added mem.profile
Binary file not shown.
13 changes: 9 additions & 4 deletions proxy/server/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"

"github.com/rc452860/vnet/utils/addr"
Expand Down Expand Up @@ -35,6 +36,8 @@ type ProxyService struct {
Status string `json:"status"`
Cancel context.CancelFunc `json:"-"`
Tick time.Duration `json:"-"`
TCPLock *sync.Mutex
UDPLock *sync.Mutex
}

func NewProxyService() *ProxyService {
Expand Down Expand Up @@ -105,11 +108,12 @@ func (this *ProxyService) traffic(data record.Traffic) {
func (this *ProxyService) proxyRequest(data record.ConnectionProxyRequest) {
eventbus.GetEventBus().Publish("record:proxyRequest", data)
key := addr.GetIPFromAddr(data.ClientAddr)
if this.LastOneMinuteConnections.Get(key) == nil {
last := this.LastOneMinuteConnections.Get(key)
if last == nil {
this.LastOneMinuteConnections.Put(key, []record.ConnectionProxyRequest{data}, this.Tick)
} else {
last := this.LastOneMinuteConnections.Get(key).([]record.ConnectionProxyRequest)
this.LastOneMinuteConnections.Put(key, append(last, data), this.Tick)
swap := last.([]record.ConnectionProxyRequest)
this.LastOneMinuteConnections.Put(key, append(swap, data), this.Tick)
}

// just print tcp log
Expand Down Expand Up @@ -146,7 +150,7 @@ func (this *ProxyService) Start() error {
}

func (this *ProxyService) Stop() error {
log.Info("proxy stop")
start := time.Now()
this.Cancel()
if this.TCP != nil {
err := this.TCP.Close()
Expand All @@ -161,5 +165,6 @@ func (this *ProxyService) Stop() error {
}
}
this.Status = "stop"
log.Info("proxy stop consume %v", time.Since(start).Seconds())
return nil
}
Loading

0 comments on commit 04af0eb

Please sign in to comment.