From cdeb7e958de480012c820bd9bcf5440eadb73a24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=A5=E5=B0=98=EF=BC=88=E6=A8=B1=E3=81=AE=E6=B3=AA?= =?UTF-8?q?=EF=BC=89?= Date: Wed, 6 Feb 2019 11:58:11 +0800 Subject: [PATCH] update: project struct change --- cmd/server/server.go | 15 ++-- cmd/test/test.go | 44 +++------- db/db.go | 3 +- proxy/proxy.go | 160 ------------------------------------ proxy/proxy_test.go | 42 ---------- proxy/server/shadowsocks.go | 13 ++- 6 files changed, 29 insertions(+), 248 deletions(-) delete mode 100644 proxy/proxy.go delete mode 100644 proxy/proxy_test.go diff --git a/cmd/server/server.go b/cmd/server/server.go index f98fc64..53f79ac 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -6,7 +6,6 @@ import ( "os" "os/signal" "runtime" - "syscall" "time" "github.com/rc452860/vnet/common/config" @@ -33,10 +32,16 @@ func main() { BareStarted() } - c := make(chan os.Signal) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - <-c - cancle() + c := make(chan os.Signal, 1) + signal.Notify(c) + for { + data := <-c + log.Error("recive signal: %s", data.String()) + if data == os.Interrupt { + cancle() + return + } + } } func BareStarted() { diff --git a/cmd/test/test.go b/cmd/test/test.go index 29dbc48..bfa72b1 100644 --- a/cmd/test/test.go +++ b/cmd/test/test.go @@ -2,40 +2,20 @@ package main import ( "fmt" - - "github.com/miekg/dns" + "os" + "os/signal" + "syscall" ) func main() { - c := new(dns.Client) - // c.DialTimeout = time.Millisecond * 5000 - // c.ReadTimeout = time.Millisecond * 5000 - // c.WriteTimeout = time.Millisecond * 5000 - m := new(dns.Msg) - m.SetQuestion(dns.Fqdn("baidu.com"), dns.TypeAAAA) - r, _, err := c.Exchange(m, "[2001:4860:4860::8844]:53") - if err != nil { - fmt.Print(err) + c := make(chan os.Signal) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + for { + data := <-c + fmt.Println(data.String()) + if data == os.Interrupt { + return + } } - fmt.Println(r.Answer) - // r, _, err := c.Exchange(m, "114.114.114.114:53") - // if err != nil { - // fmt.Print("a" + err.Error()) - // } - // fmt.Println(r.Answer) - // var l sync.Mutex - // c := sync.NewCond(&l) - // f := func() { - // c.L.Lock() - // defer c.L.Unlock() - // c.Wait() - // fmt.Println("aaa") - // } - // go f() - // go f() - // time.Sleep(3 * time.Second) - // c.L.Lock() - // c.Broadcast() - // c.L.Unlock() - // time.Sleep(3 * time.Second) + } diff --git a/db/db.go b/db/db.go index c18561e..ffb4249 100644 --- a/db/db.go +++ b/db/db.go @@ -11,7 +11,6 @@ import ( "github.com/rc452860/vnet/utils/addr" - "github.com/rc452860/vnet/proxy" "github.com/rc452860/vnet/record" "github.com/AlecAivazis/survey" @@ -365,7 +364,7 @@ func formatLoad() string { // DBTrafficMonitor is should using goroutine start func DBTrafficMonitor(ctx context.Context) { traffic := make(chan record.Traffic, 32) - proxy.RegisterTrafficHandle(traffic) + server.RegisterTrafficHandle(traffic) var data record.Traffic // count traffic for { diff --git a/proxy/proxy.go b/proxy/proxy.go deleted file mode 100644 index ae78f14..0000000 --- a/proxy/proxy.go +++ /dev/null @@ -1,160 +0,0 @@ -package proxy - -import ( - "context" - "fmt" - "net" - "strings" - "time" - - "github.com/rc452860/vnet/utils/addr" - - "github.com/rc452860/vnet/common/cache" - "github.com/rc452860/vnet/common/eventbus" - - "github.com/rc452860/vnet/record" - - "github.com/rc452860/vnet/common/log" -) - -type IProxyService interface { - Start() error - Stop() error -} - -type ProxyService struct { - context.Context `json:"-"` - TCP net.Listener `json:"tcp"` - UDP net.PacketConn `json:"udp"` - LastOneMinuteConnections *cache.Cache `json:"-"` - UpSpeed uint64 `json:"upspeed"` - DownSpeed uint64 `json:"downspeed"` - UpBytes uint64 `json:"upbytes"` - DownBytes uint64 `json:"downbytes"` - MessageRoute chan interface{} `json:"-"` - Status string `json:"status"` - Cancel context.CancelFunc `json:"-"` - Tick time.Duration `json:"-"` -} - -func NewProxyService() *ProxyService { - return NewProxyServiceWithTick(time.Second) -} - -func NewProxyServiceWithTick(duration time.Duration) *ProxyService { - ctx, cancel := context.WithCancel(context.Background()) - return &ProxyService{ - MessageRoute: make(chan interface{}, 32), - Status: "stop", - Context: ctx, - Cancel: cancel, - Tick: duration, - LastOneMinuteConnections: cache.New(duration), - } -} - -var trafficMonitorQueue []chan record.Traffic - -// RegisterTrafficHandle TrafficMessage channel -func RegisterTrafficHandle(trafficMonitor chan record.Traffic) { - trafficMonitorQueue = append(trafficMonitorQueue, trafficMonitor) -} - -func (this *ProxyService) TrafficMeasure() { - go this.speed() - go this.route() - <-this.Done() - log.Info("close traffic measure") -} - -func (this *ProxyService) route() { - for { - var data interface{} - select { - case <-this.Done(): - log.Info("close countClose") - return - case data = <-this.MessageRoute: - } - switch data.(type) { - case record.Traffic: - this.traffic(data.(record.Traffic)) - case record.ConnectionProxyRequest: - this.proxyRequest(data.(record.ConnectionProxyRequest)) - } - } -} - -// traffic handle traffic message -func (this *ProxyService) traffic(data record.Traffic) { - eventbus.GetEventBus().Publish("record:traffic", data) - this.UpBytes += data.Up - this.DownBytes += data.Down - if trafficMonitorQueue != nil && len(trafficMonitorQueue) > 0 { - for _, item := range trafficMonitorQueue { - item <- data - } - } -} - -// proxyRequest handle proxy request message -func (this *ProxyService) proxyRequest(data record.ConnectionProxyRequest) { - eventbus.GetEventBus().Publish("record:proxyRequest", data) - key := addr.GetIPFromAddr(data.ClientAddr) - if this.LastOneMinuteConnections.Get(key) == nil { - this.LastOneMinuteConnections.Put(key, []record.ConnectionProxyRequest{data}, this.Tick) - } else { - last := this.LastOneMinuteConnections.Get(key).([]record.ConnectionProxyRequest) - this.LastOneMinuteConnections.Put(key, append(last, data), this.Tick) - } - - // just print tcp log - if strings.Contains("tcp", data.ClientAddr.Network()) { - log.Info("%v:%s %s <-------> %s", addr.GetPortFromAddr(data.ProxyAddr), - addr.GetNetworkFromAddr(data.ProxyAddr), - data.ClientAddr.String(), - fmt.Sprintf("%s:%v", data.GetAddress(), data.GetPort())) - } - -} - -// speed is traffic statis -func (this *ProxyService) speed() { - var upTmp, downTmp uint64 = this.UpBytes, this.DownBytes - tick := time.Tick(this.Tick) - for { - this.UpSpeed, upTmp = this.UpBytes-upTmp, this.UpBytes - this.DownSpeed, downTmp = this.DownBytes-downTmp, this.DownBytes - select { - case <-tick: - continue - case <-this.Done(): - return - } - } -} - -func (this *ProxyService) Start() error { - go this.TrafficMeasure() - this.Status = "run" - return nil -} - -func (this *ProxyService) Stop() error { - log.Info("proxy stop") - this.Cancel() - if this.TCP != nil { - err := this.TCP.Close() - if err != nil { - log.Err(err) - } - } - if this.UDP != nil { - err := this.UDP.Close() - if err != nil { - log.Err(err) - } - } - this.Status = "stop" - return nil -} diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go deleted file mode 100644 index d2e8188..0000000 --- a/proxy/proxy_test.go +++ /dev/null @@ -1,42 +0,0 @@ -package proxy - -import ( - "fmt" - "net" - "testing" - "time" - - "github.com/rc452860/vnet/record" -) - -func Test_LastOneMinute(t *testing.T) { - proxy := NewProxyServiceWithTick(50 * time.Millisecond) - proxy.Start() - proxyAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:8000") - targetAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:8081") - - for i := 0; i < 10; i++ { - client, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("1.1.1.1:%v", i+1000)) - proxy.MessageRoute <- record.ConnectionProxyRequest{ - ClientAddr: client, - ProxyAddr: proxyAddr, - TargetAddr: targetAddr, - TargetDomain: "baidu.com", - } - time.Sleep(10 * time.Millisecond) - } - - var r int - proxy.LastOneMinuteConnections.Range(func(key, value interface{}) { - if result, ok := value.([]record.ConnectionProxyRequest); ok { - // t.Log(key) - for _, item := range result { - t.Log(item.ClientAddr.String()) - } - r++ - } - }) - if r > 10 { - t.FailNow() - } -} diff --git a/proxy/server/shadowsocks.go b/proxy/server/shadowsocks.go index d0b41fc..9ff8315 100644 --- a/proxy/server/shadowsocks.go +++ b/proxy/server/shadowsocks.go @@ -17,7 +17,6 @@ import ( "github.com/rc452860/vnet/common/pool" "github.com/rc452860/vnet/network/ciphers" "github.com/rc452860/vnet/network/conn" - "github.com/rc452860/vnet/proxy" "github.com/rc452860/vnet/record" "github.com/rc452860/vnet/socks" "golang.org/x/time/rate" @@ -44,11 +43,11 @@ func init() { // ShadowsocksProxy is respect shadowsocks proxy server // it have Start and Stop method to control proxy type ShadowsocksProxy struct { - *proxy.ProxyService `json:"-,omitempty"` - Host string `json:"host,omitempty"` - Port int `json:"port,omitempty"` - Method string `json:"method,omitempty"` - Password string `json:"password,omitempty"` + *ProxyService `json:"-,omitempty"` + Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` + Method string `json:"method,omitempty"` + Password string `json:"password,omitempty"` ShadowsocksArgs ReadLimiter *rate.Limiter `json:"read_limit,omitempty"` WriteLimiter *rate.Limiter `json:"write_limit,omitempty"` @@ -65,7 +64,7 @@ type ShadowsocksArgs struct { // NewShadowsocks is new ShadowsocksProxy object func NewShadowsocks(host string, method string, password string, port int, ssarg ShadowsocksArgs) (*ShadowsocksProxy, error) { ss := &ShadowsocksProxy{ - ProxyService: proxy.NewProxyService(), + ProxyService: NewProxyService(), Host: host, Method: method, Password: password,