diff --git a/.gitignore b/.gitignore index 84de49b..e5dddef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ bin/ server server.exe -config.json \ No newline at end of file +config.json +*.test.exe \ No newline at end of file diff --git a/ciphers/ciphers_test.go b/ciphers/ciphers_test.go index b0058e2..d6abaf2 100644 --- a/ciphers/ciphers_test.go +++ b/ciphers/ciphers_test.go @@ -9,7 +9,7 @@ import ( "testing" "time" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/utils/datasize" ) diff --git a/ciphers/ssaead/cipher_conn.go b/ciphers/ssaead/cipher_conn.go index df1f9a6..85337cd 100644 --- a/ciphers/ssaead/cipher_conn.go +++ b/ciphers/ssaead/cipher_conn.go @@ -11,7 +11,7 @@ import ( "crypto/md5" "crypto/rand" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/pool" "golang.org/x/crypto/hkdf" ) diff --git a/ciphers/ssstream/cipher_conn.go b/ciphers/ssstream/cipher_conn.go index 75f5f96..9de0a47 100644 --- a/ciphers/ssstream/cipher_conn.go +++ b/ciphers/ssstream/cipher_conn.go @@ -7,7 +7,7 @@ import ( "io" connect "github.com/rc452860/vnet/conn" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/pool" ) diff --git a/cmd/server/server.go b/cmd/server/server.go index 1c81da1..4b85f3a 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -11,7 +11,7 @@ import ( "github.com/rc452860/vnet/config" "github.com/rc452860/vnet/db" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/proxy/server" "github.com/rc452860/vnet/utils/datasize" ) diff --git a/cmd/test/test.go b/cmd/test/test.go index 57636bf..16a74c5 100644 --- a/cmd/test/test.go +++ b/cmd/test/test.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "net" ) type Test struct { @@ -9,6 +10,6 @@ type Test struct { } func main() { - a := Test{} - fmt.Print(a.(type)) + listen, _ := net.Listen("tcp", "0.0.0.0:8080") + fmt.Print(listen.Addr().String()) } diff --git a/comm/array/time_array.go b/comm/array/time_array.go new file mode 100644 index 0000000..a2ee489 --- /dev/null +++ b/comm/array/time_array.go @@ -0,0 +1,90 @@ +package array + +import ( + "context" + "reflect" + "sync" + "time" +) + +type TimeArrayElement struct { + Time time.Time + Element interface{} +} + +type TimeArray struct { + array []TimeArrayElement + Size int + expire time.Duration + context context.Context + cancel context.CancelFunc + AutoClear bool + sync.RWMutex +} + +func NewTimeArray(expire time.Duration, autoClear bool) TimeArray { + result := TimeArray{ + expire: expire, + AutoClear: autoClear, + } + if autoClear { + ctx, cancel := context.WithCancel(context.Background()) + result.context = ctx + result.cancel = cancel + go result.autoClear() + } + return result +} + +func (t *TimeArray) Close() { + if t.AutoClear && t.cancel != nil { + t.cancel() + } +} + +func (t *TimeArray) autoClear() { + tick := time.Tick(t.expire) + for { + select { + case <-t.context.Done(): + return + case <-tick: + } + t.Clear() + } +} + +func (t *TimeArray) Add(data interface{}) { + t.Lock() + defer t.Unlock() + + t.array = append(t.array, TimeArrayElement{ + Time: time.Now(), + Element: data, + }) + t.Size++ +} + +func (t *TimeArray) Remove(data interface{}) { + for i, item := range t.array { + if reflect.DeepEqual(data, item) { + t.array = append(t.array[:i], t.array[i+1:]...) + t.Size-- + } + } +} + +func (t *TimeArray) Clear() { + now := time.Now() + for i, item := range t.array { + if now.Sub(item.Time) > t.expire { + t.array = append(t.array[:i], t.array[i+1:]...) + t.Size-- + } + } +} +func (t *TimeArray) Range(callback func(i int, key interface{})) { + for i, item := range t.array { + callback(i, item.Element) + } +} diff --git a/comm/array/time_array_test.go b/comm/array/time_array_test.go new file mode 100644 index 0000000..b0446c6 --- /dev/null +++ b/comm/array/time_array_test.go @@ -0,0 +1,43 @@ +package array + +import ( + "testing" + "time" +) + +func Benchmark_Time_Array(t *testing.B) { + timeArray := NewTimeArray(time.Second*2, true) + for i := 0; i < t.N; i++ { + timeArray.Add("123") + } +} + +func Benchmark_Time_Array_Remove(t *testing.B) { + timeArray := NewTimeArray(time.Second*2, true) + for i := 0; i < t.N; i++ { + timeArray.Add("123") + } + t.ResetTimer() + for i := 0; i < t.N; i++ { + timeArray.Remove("123") + } + t.ReportAllocs() +} + +func Test_Time_Array(t *testing.T) { + timeArray := NewTimeArray(time.Second*2, true) + tick := time.Tick(time.Second) + index := 0 + for { + <-tick + index++ + if index > 10 { + break + } + + timeArray.Add("aaa") + } + timeArray.Range(func(i int, v interface{}) { + t.Log(v.(string)) + }) +} diff --git a/comm/cache/cache.go b/comm/cache/cache.go new file mode 100644 index 0000000..bc41da7 --- /dev/null +++ b/comm/cache/cache.go @@ -0,0 +1,111 @@ +package cache + +import ( + "runtime" + "sync" + "time" +) + +// Cache store element with a expired time +type Cache struct { + *cache +} + +type cache struct { + mapping sync.Map + janitor *janitor +} + +type element struct { + Expired time.Time + Payload interface{} +} + +// Put element in Cache with its ttl +func (c *cache) Put(key interface{}, payload interface{}, ttl time.Duration) { + c.mapping.Store(key, &element{ + Payload: payload, + Expired: time.Now().Add(ttl), + }) +} + +// Get element in Cache, and drop when it expired +func (c *cache) Get(key interface{}) interface{} { + item, exist := c.mapping.Load(key) + if !exist { + return nil + } + elm := item.(*element) + // expired + if time.Since(elm.Expired) > 0 { + c.mapping.Delete(key) + return nil + } + return elm.Payload +} + +func (c *cache) Range(callback func(key, value interface{})) { + c.mapping.Range(func(k, v interface{}) bool { + elm := v.(*element) + if time.Since(elm.Expired) > 0 { + c.mapping.Delete(k) + } else { + callback(k, elm.Payload) + } + return true + }) +} + +func (c *cache) cleanup() { + c.mapping.Range(func(k, v interface{}) bool { + // key := k.(string) + elm := v.(*element) + if time.Since(elm.Expired) > 0 { + c.mapping.Delete(k) + } + return true + }) +} + +func (c *cache) Size() int { + var result int + c.Range(func(k, v interface{}) { + result++ + }) + return result +} + +type janitor struct { + interval time.Duration + stop chan struct{} +} + +func (j *janitor) process(c *cache) { + ticker := time.NewTicker(j.interval) + for { + select { + case <-ticker.C: + c.cleanup() + case <-j.stop: + ticker.Stop() + return + } + } +} + +func stopJanitor(c *Cache) { + c.janitor.stop <- struct{}{} +} + +// New return *Cache +func New(interval time.Duration) *Cache { + j := &janitor{ + interval: interval, + stop: make(chan struct{}), + } + c := &cache{janitor: j} + go j.process(c) + C := &Cache{c} + runtime.SetFinalizer(C, stopJanitor) + return C +} diff --git a/comm/cache/cache_test.go b/comm/cache/cache_test.go new file mode 100644 index 0000000..101ca86 --- /dev/null +++ b/comm/cache/cache_test.go @@ -0,0 +1,70 @@ +package cache + +import ( + "runtime" + "testing" + "time" +) + +func TestCache_Basic(t *testing.T) { + interval := 200 * time.Millisecond + ttl := 20 * time.Millisecond + c := New(interval) + c.Put("int", 1, ttl) + c.Put("string", "a", ttl) + + i := c.Get("int") + if i.(int) != 1 { + t.Error("should recv 1") + } + + s := c.Get("string") + if s.(string) != "a" { + t.Error("should recv 'a'") + } +} + +func TestCache_TTL(t *testing.T) { + interval := 200 * time.Millisecond + ttl := 20 * time.Millisecond + c := New(interval) + c.Put("int", 1, ttl) + + i := c.Get("int") + if i.(int) != 1 { + t.Error("should recv 1") + } + + time.Sleep(ttl * 2) + i = c.Get("int") + if i != nil { + t.Error("should recv nil") + } +} + +func TestCache_AutoCleanup(t *testing.T) { + interval := 10 * time.Millisecond + ttl := 15 * time.Millisecond + c := New(interval) + c.Put("int", 1, ttl) + + time.Sleep(ttl * 2) + i := c.Get("int") + if i != nil { + t.Error("should recv nil") + } +} + +func TestCache_AutoGC(t *testing.T) { + sign := make(chan struct{}) + go func() { + interval := 10 * time.Millisecond + ttl := 15 * time.Millisecond + c := New(interval) + c.Put("int", 1, ttl) + sign <- struct{}{} + }() + + <-sign + runtime.GC() +} diff --git a/comm/eventbus/eventbus.go b/comm/eventbus/eventbus.go index 2187d05..6bc4ef3 100644 --- a/comm/eventbus/eventbus.go +++ b/comm/eventbus/eventbus.go @@ -2,16 +2,13 @@ package eventbus import ( evbus "github.com/asaskevich/EventBus" - "github.com/rc452860/vnet/utils" ) var eventBus evbus.Bus +func init() { + eventBus = evbus.New() +} func GetEventBus() evbus.Bus { - utils.Lock("eventbus:eventBus") - defer utils.UnLock("eventbus:eventBus") - if eventBus != nil { - eventBus = evbus.New() - } return eventBus } diff --git a/comm/eventbus/eventbus_test.go b/comm/eventbus/eventbus_test.go new file mode 100644 index 0000000..3088002 --- /dev/null +++ b/comm/eventbus/eventbus_test.go @@ -0,0 +1,51 @@ +package eventbus + +import ( + "testing" + + "github.com/asaskevich/EventBus" +) + +func Benchmark_eventbus(t *testing.B) { + evbus := EventBus.New() + evbus.SubscribeAsync("abc", func(arg string) { + // t.Log(arg) + }, false) + + go func() { + for i := 0; i < t.N; i++ { + evbus.Publish("abc", "aa") + } + }() + go func() { + for i := 0; i < t.N; i++ { + evbus.Publish("abc", "aa") + } + }() + for i := 0; i < t.N; i++ { + evbus.Publish("abc", "aa") + } + +} + +func Benchmark_Channel(t *testing.B) { + ch := make(chan string, 128) + go func() { + for { + <-ch + } + }() + go func() { + for i := 0; i < t.N; i++ { + ch <- "aa" + } + }() + go func() { + for i := 0; i < t.N; i++ { + ch <- "aa" + } + }() + for i := 0; i < t.N; i++ { + ch <- "aa" + } +} diff --git a/log/file_rotation_write.go.ignore b/comm/log/file_rotation_write.go.ignore similarity index 100% rename from log/file_rotation_write.go.ignore rename to comm/log/file_rotation_write.go.ignore diff --git a/log/file_test.go b/comm/log/file_test.go similarity index 100% rename from log/file_test.go rename to comm/log/file_test.go diff --git a/log/file_writer.go b/comm/log/file_writer.go similarity index 64% rename from log/file_writer.go rename to comm/log/file_writer.go index 50383ea..7301b37 100644 --- a/log/file_writer.go +++ b/comm/log/file_writer.go @@ -2,8 +2,6 @@ package log import ( "os" - - "github.com/rc452860/vnet/utils" ) type LogFileWriter struct { @@ -12,7 +10,7 @@ type LogFileWriter struct { } func LogFileWriterFactory(name string) *LogFileWriter { - file, _ := utils.OpenFile(name) + file, _ := OpenFile(name) log := &LogFileWriter{ FileName: name, File: file, @@ -23,3 +21,8 @@ func LogFileWriterFactory(name string) *LogFileWriter { func (this *LogFileWriter) Write(message string) { this.File.WriteString(message) } + +// remember after used need close file +func OpenFile(file string) (*os.File, error) { + return os.OpenFile(file, os.O_APPEND|os.O_CREATE, 0644) +} diff --git a/log/logging.go b/comm/log/logging.go similarity index 100% rename from log/logging.go rename to comm/log/logging.go diff --git a/log/logging_test.go b/comm/log/logging_test.go similarity index 100% rename from log/logging_test.go rename to comm/log/logging_test.go diff --git a/log/pattern_formatter.go b/comm/log/pattern_formatter.go similarity index 100% rename from log/pattern_formatter.go rename to comm/log/pattern_formatter.go diff --git a/log/runtime.go b/comm/log/runtime.go similarity index 100% rename from log/runtime.go rename to comm/log/runtime.go diff --git a/log/runtime_test.go b/comm/log/runtime_test.go similarity index 100% rename from log/runtime_test.go rename to comm/log/runtime_test.go diff --git a/log/terminal_writer.go b/comm/log/terminal_writer.go similarity index 100% rename from log/terminal_writer.go rename to comm/log/terminal_writer.go diff --git a/config/config.go b/config/config.go index 8df01bb..e9a2f11 100644 --- a/config/config.go +++ b/config/config.go @@ -5,9 +5,8 @@ import ( "fmt" "io/ioutil" "path/filepath" - "time" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/utils" ) @@ -25,19 +24,20 @@ type Config struct { // DbConfig is global database config type DbConfig struct { - Host string `json:"host"` - User string `json:"user"` - Passwd string `json:"passwd"` - Port string `json:"port"` - Database string `json:"database"` - Rate float32 `json:"rate"` - NodeId int `json:"node_id` - SyncTime time.Duration `json:'sync_time'` + Host string `json:"host"` + User string `json:"user"` + Passwd string `json:"passwd"` + Port string `json:"port"` + Database string `json:"database"` + Rate float32 `json:"rate"` + NodeId int `json:"node_id` + SyncTime int `json:'sync_time'` + OnlineSyncTime int `json:'online_sync_time'` } type ShadowsocksOptions struct { - TcpTimeout time.Duration `json:"tcp_timeout"` - UdpTimeout time.Duration `json:"udp_timeout"` + TCPTimeout int `json:"tcp_timeout"` + UDPTimeout int `json:"udp_timeout"` } func CurrentConfig() *Config { @@ -67,10 +67,8 @@ func LoadConfig(file string) (*Config, error) { } configFile = file config = &Config{ - Mode: "db", - DbConfig: DbConfig{ - SyncTime: 3 * time.Second, - }, + Mode: "db", + DbConfig: DbConfig{}, } data, _ := json.MarshalIndent(config, "", " ") ioutil.WriteFile(configFile, data, 0644) diff --git a/config/config_test.go b/config/config_test.go index 1606d55..a7ecee2 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -16,7 +16,7 @@ func Test_config(t *testing.T) { // config.ShadowsocksOptions = &ShadowsocksOptions{ // TcpTimeout: 12, // } - config.ShadowsocksOptions.UdpTimeout = 3 + config.ShadowsocksOptions.UDPTimeout = 3 // fmt.Print(config) err = SaveConfig() if err != nil { diff --git a/conn/conn_decorate.go b/conn/conn_decorate.go index 76349da..fdbabf2 100644 --- a/conn/conn_decorate.go +++ b/conn/conn_decorate.go @@ -7,7 +7,7 @@ import ( "net" "time" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/pool" "github.com/rc452860/vnet/utils" "golang.org/x/time/rate" @@ -192,7 +192,7 @@ func (r *RealTimeFlush) Write(b []byte) (n int, err error) { } //导出装饰器 -type TrafficHandle func(IConn, int) +type TrafficHandle func(IConn, uint64) func TrafficDecorate(c IConn, upload, download TrafficHandle) (IConn, error) { return &Traffic{ @@ -211,7 +211,7 @@ type Traffic struct { func (t *Traffic) Read(b []byte) (n int, err error) { n, err = t.IConn.Read(b) if t.Download != nil { - t.Download(t.IConn, n) + t.Download(t.IConn, uint64(n)) } return } @@ -219,7 +219,7 @@ func (t *Traffic) Read(b []byte) (n int, err error) { func (t *Traffic) Write(b []byte) (n int, err error) { n, err = t.IConn.Write(b) if t.Upload != nil { - t.Upload(t.IConn, n) + t.Upload(t.IConn, uint64(n)) } return } @@ -269,7 +269,7 @@ func (c *TrafficLimit) Write(b []byte) (n int, err error) { // for blow code is about udp communicateion // is decorate packet con to provide traffic record and traffic limit etc ... -type PacketTrafficHandle func(laddr string, raddr string, n int) +type PacketTrafficHandle func(laddr net.Addr, raddr net.Addr, n uint64) type PacketTrafficConn struct { net.PacketConn @@ -291,7 +291,7 @@ func (this *PacketTrafficConn) ReadFrom(p []byte) (n int, addr net.Addr, err err return n, addr, err } if this.download != nil { - this.download(this.PacketConn.LocalAddr().String(), addr.String(), n) + this.download(this.PacketConn.LocalAddr(), addr, uint64(n)) } return n, addr, err } @@ -299,7 +299,7 @@ func (this *PacketTrafficConn) ReadFrom(p []byte) (n int, addr net.Addr, err err func (this *PacketTrafficConn) WriteTo(p []byte, addr net.Addr) (n int, err error) { n, err = this.PacketConn.WriteTo(p, addr) if this.upload != nil { - this.upload(this.PacketConn.LocalAddr().String(), addr.String(), n) + this.upload(this.PacketConn.LocalAddr(), addr, uint64(n)) } return n, err } diff --git a/db/db.go b/db/db.go index 583778c..5177f5b 100644 --- a/db/db.go +++ b/db/db.go @@ -3,17 +3,18 @@ package db import ( "bytes" "context" - "errors" "fmt" "net" "strconv" "strings" "time" + "github.com/rc452860/vnet/proxy" + "github.com/rc452860/vnet/record" + "github.com/AlecAivazis/survey" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/config" - "github.com/rc452860/vnet/log" - "github.com/rc452860/vnet/proxy" "github.com/rc452860/vnet/service" "github.com/rc452860/vnet/utils/datasize" @@ -33,6 +34,16 @@ type User struct { Limit uint64 `gorm:"column:speed_limit_per_user"` } +type DBTraffic struct { + Port int + Up uint64 + Down uint64 + ConnectCount int + Connects map[string]bool +} + +var trafficTable = make(map[int]*DBTraffic) + func (this User) TableName() string { return "user" } @@ -136,103 +147,152 @@ func DbStarted(ctx context.Context) { if conf.DbConfig.SyncTime == 0 { survey.AskOne(&survey.Input{ - Message: "what is your want database sync time?", - Default: "3", - }, &conf.DbConfig.Rate, nil) + Message: "what is your want database sync time (unit: Millisencod)?", + Default: "3000", + }, &conf.DbConfig.SyncTime, nil) + } + + if conf.DbConfig.OnlineSyncTime == 0 { + survey.AskOne(&survey.Input{ + Message: "what is the value your want set online user sync time (unit: Millisencod)?", + Default: "60000", + }, &conf.DbConfig.OnlineSyncTime, nil) } + // save config + config.SaveConfig() + // start database traffic monitor go DBTrafficMonitor(ctx) + // start database service monitor + go DBServiceMonitor(ctx) + // online monitor + go DBOnlineMonitor(ctx) +} - go func(ctx context.Context) { - tick := time.Tick(3 * time.Second) - for { - select { - case <-ctx.Done(): - return - default: - } - config.SaveConfig() - users, err := GetEnableUser() - if err != nil { - log.Err(err) - return - } +// DBOnlineMonitor auto upload last one minute ips +func DBOnlineMonitor(ctx context.Context) { + conf := config.CurrentConfig().DbConfig + tick := time.Tick(time.Duration(conf.OnlineSyncTime) * time.Millisecond) + grmInstance := record.GetGRMInstanceWithTick(time.Duration(conf.OnlineSyncTime) * time.Millisecond) + connect, err := Connect() + if err != nil { + log.Err(err) + return + } + defer connect.Close() - sslist := service.CurrentShadowsocksService().List() - userMap := make(map[int]User) - for _, user := range users { - userMap[user.Port] = user - flag := false - for _, ss := range sslist { - if ss.Port == user.Port { - flag = true - break - } - } - if !flag { - log.Info("start port [%d]", user.Port) - StartShadowsocks(user) - } + for { + ss_node_online_log := fmt.Sprintf("INSERT INTO `ss_node_online_log` (id,node_id,online_user,log_time) "+ + " VALUES (NULL,%v,%v,unix_timestamp())", conf.NodeId, grmInstance.GetLastOneMinuteOnlineCount()) + ss_node_ip := bytes.NewBufferString("") + ss_node_ip.WriteString("INSERT INTO `ss_node_ip`(id,node_id,port,type,ip,created_at) VALUES") + headLen := ss_node_ip.Len() + userOnline := grmInstance.GetLastOneMinuteOnlineByPort() + for k, v := range userOnline { + ips := bytes.NewBufferString("") + for _, ip := range v { + ips.WriteString(ip) + ips.WriteString(",") } + ss_node_ip.WriteString(fmt.Sprintf("(NULL,%v,%v,'%s','%s',unix_timestamp()),", conf.NodeId, k, "-", ips.String()[:ips.Len()-1])) + } + connect.Exec(ss_node_online_log) + if ss_node_ip.Len() > headLen { + connect.Exec(ss_node_ip.String()[:ss_node_ip.Len()-1]) + } + + <-tick + } +} +// DBServiceMonitor is shadowsocks service monitor +// it will keep watch the database config change,and period +// apply the config to our program +func DBServiceMonitor(ctx context.Context) { + conf := config.CurrentConfig().DbConfig + tick := time.Tick(time.Duration(conf.SyncTime) * time.Millisecond) + for { + select { + case <-ctx.Done(): + return + default: + } + users, err := GetEnableUser() + if err != nil { + log.Err(err) + return + } + + sslist := service.CurrentShadowsocksService().List() + userMap := make(map[int]User) + for _, user := range users { + userMap[user.Port] = user + flag := false for _, ss := range sslist { - flag := false - for _, user := range users { - if ss.Port == user.Port { - flag = true - break - } - } - if !flag { - log.Info("stop port [%d]", ss.Port) - service.CurrentShadowsocksService().Del(ss.Port) + if ss.Port == user.Port { + flag = true + break } } - UpdateTrafficByUser(userMap) - //TODO update user upload and download - <-tick + if !flag { + log.Info("start port [%d]", user.Port) + StartShadowsocks(user) + } } - }(ctx) -} -type DBTraffic struct { - Port int - Up uint64 - Down uint64 - ConnectCount int - Connects map[string]bool + for _, ss := range sslist { + flag := false + for _, user := range users { + if ss.Port == user.Port { + flag = true + break + } + } + if !flag { + log.Info("stop port [%d]", ss.Port) + service.CurrentShadowsocksService().Del(ss.Port) + } + } + UpdateTrafficByUser(userMap) + //TODO update user upload and download + <-tick + } } -var trafficTable = make(map[int]*DBTraffic) - func UpdateTrafficByUser(users map[int]User) { // TODO move to log - defer func() { - var err error - if r := recover(); r != nil { - // find out exactly what the error was and set err - switch x := r.(type) { - case string: - err = errors.New(x) - case error: - err = x - default: - // Fallback err (per specs, error strings should be lowercase w/o punctuation - err = errors.New("unknown panic") - } - // invalidate rep - log.Err(err) - } - }() + // defer func() { + // var err error + // if r := recover(); r != nil { + // // find out exactly what the error was and set err + // switch x := r.(type) { + // case string: + // err = errors.New(x) + // case error: + // err = x + // default: + // // Fallback err (per specs, error strings should be lowercase w/o punctuation + // err = errors.New("unknown panic") + // } + // // invalidate rep + // log.Err(err) + // } + // }() whenUp := new(bytes.Buffer) whenDown := new(bytes.Buffer) whenPort := new(bytes.Buffer) insertBuf := new(bytes.Buffer) conf := config.CurrentConfig().DbConfig + connect, err := Connect() + if err != nil { + log.Err(err) + return + } + defer connect.Close() for _, v := range trafficTable { // if the traffic less then 64 kb it will no necessary to update - if v.Up+v.Down < 512*1024 { + if v.Up+v.Down < 64*1024 { continue } whenUp.WriteString(fmt.Sprintf(" WHEN %v THEN u+%v", v.Port, float32(v.Up)*conf.Rate)) @@ -261,58 +321,60 @@ func UpdateTrafficByUser(users map[int]User) { } // assembly query portStr := whenPort.String() - query := fmt.Sprintf("UPDATE user SET u = CASE port%s END,"+ + user := fmt.Sprintf("UPDATE user SET u = CASE port%s END,"+ "d = CASE port%s END,t = unix_timestamp() WHERE port IN (%s)", whenUp.String(), whenDown.String(), portStr[:len(portStr)-1]) inserStr := insertBuf.String() - insert := fmt.Sprintf("INSERT INTO `user_traffic_log` (`id`, `user_id`, `u`, `d`, "+ + user_traffic_log := fmt.Sprintf("INSERT INTO `user_traffic_log` (`id`, `user_id`, `u`, `d`, "+ "`node_id`, `rate`, `traffic`, `log_time`) VALUES %s", inserStr[:len(inserStr)-1]) - connect, err := Connect() - if err != nil { - log.Err(err) - return - } - connect.Exec(query) - connect.Exec(insert) + ss_node_info := fmt.Sprintf("INSERT INTO `ss_node_info` (`id`,`node_id`,`uptime`,`load`,`log_time`)"+ + "VALUES (NULL,%v,unix_timestamp(),'%s',unix_timestamp())", conf.NodeId, FormatLoad()) + + connect.Exec(user) + connect.Exec(user_traffic_log) + connect.Exec(ss_node_info) } +func FormatLoad() string { + return fmt.Sprintf("cpu:%v%% mem:%v%% disk:%v%%", service.GetCPUUsage(), service.GetMemUsage(), service.GetDiskUsage()) +} + +// DBTrafficMonitor is should using goroutine start func DBTrafficMonitor(ctx context.Context) { - traffic := make(chan proxy.TrafficMessage, 128) + traffic := make(chan record.Traffic, 32) proxy.RegisterTrafficHandle(traffic) - var data proxy.TrafficMessage + var data record.Traffic // count traffic - go func(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case data = <-traffic: - } - _, port, err := net.SplitHostPort(data.LAddr) - if err != nil { - log.Err(err) - continue - } - porti, err := strconv.Atoi(port) - if err != nil { - log.Err(err) - continue - } - if trafficTable[porti] == nil { - trafficTable[porti] = &DBTraffic{ - Port: porti, - Up: data.UpBytes, - Down: data.DownBytes, - } - } else { - trafficTable[porti].Up += data.UpBytes - trafficTable[porti].Down += data.DownBytes + for { + select { + case <-ctx.Done(): + return + case data = <-traffic: + } + _, port, err := net.SplitHostPort(data.ClientAddr.String()) + if err != nil { + log.Err(err) + continue + } + porti, err := strconv.Atoi(port) + if err != nil { + log.Err(err) + continue + } + if trafficTable[porti] == nil { + trafficTable[porti] = &DBTraffic{ + Port: porti, + Up: data.Up, + Down: data.Down, } + } else { + trafficTable[porti].Up += data.Up + trafficTable[porti].Down += data.Down } - }(ctx) + } } func StartShadowsocks(user User) { diff --git a/db/db_test.go b/db/db_test.go index 40214c8..de30ed9 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -3,7 +3,8 @@ package db import ( "testing" - "github.com/rc452860/vnet/log" + _ "github.com/go-sql-driver/mysql" + "github.com/rc452860/vnet/comm/log" ) func Test_GetEnableUser(t *testing.T) { @@ -17,3 +18,7 @@ func Test_GetEnableUser(t *testing.T) { logging.Info("item: %v", item) } } + +func TestFormatLoad(t *testing.T) { + t.Log(FormatLoad()) +} diff --git a/proxy/client/shadowsocks.go b/proxy/client/shadowsocks.go index 4e25adb..8376344 100644 --- a/proxy/client/shadowsocks.go +++ b/proxy/client/shadowsocks.go @@ -4,7 +4,7 @@ import ( "fmt" "net" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/proxy/common" "github.com/rc452860/vnet/socks" diff --git a/proxy/client/shadowsocks_test.go b/proxy/client/shadowsocks_test.go index e69e03b..06073c1 100644 --- a/proxy/client/shadowsocks_test.go +++ b/proxy/client/shadowsocks_test.go @@ -14,7 +14,7 @@ import ( "github.com/gin-gonic/gin" "github.com/rc452860/vnet/config" "github.com/rc452860/vnet/conn" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/pool" "github.com/rc452860/vnet/proxy/server" diff --git a/proxy/common/transport_channel.go b/proxy/common/transport_channel.go index 1e03cee..99a4078 100644 --- a/proxy/common/transport_channel.go +++ b/proxy/common/transport_channel.go @@ -7,7 +7,7 @@ import ( "github.com/rc452860/vnet/pool" "github.com/rc452860/vnet/conn" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" ) type TcpChannel struct{} diff --git a/proxy/proxy.go b/proxy/proxy.go index d8db363..a782ffd 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -3,10 +3,16 @@ package proxy import ( "context" "net" - "sync" "time" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/utils/addr" + + "github.com/rc452860/vnet/comm/cache" + "github.com/rc452860/vnet/comm/eventbus" + + "github.com/rc452860/vnet/record" + + "github.com/rc452860/vnet/comm/log" ) type IProxyService interface { @@ -14,87 +20,114 @@ type IProxyService interface { Stop() error } -type TrafficMessage struct { - Network string - LAddr string - RAddr string - UpBytes uint64 - DownBytes uint64 -} type ProxyService struct { - context.Context `json:"-"` - Tcp net.Listener `json:"tcp"` - Udp net.PacketConn `json:"udp"` - UpSpeed uint64 `json:"upspeed"` - DownSpeed uint64 `json:"downspeed"` - UpBytes uint64 `json:"upbytes"` - DownBytes uint64 `json:"downbytes"` - TrafficMQ chan TrafficMessage `json:"_"` - TcpLock *sync.Mutex `json:"_"` - UdpLock *sync.Mutex `json:"_"` - Status string `json:"status"` - Cancel context.CancelFunc `json:"-` + context.Context `json:"-"` + TCP net.Listener `json:"tcp"` + UDP net.PacketConn `json:"udp"` + LastOneMinuteConnections *cache.Cache `json:"-"` + UpSpeed uint64 `json:"upspeed"` + DownSpeed uint64 `json:"downspeed"` + UpBytes uint64 `json:"upbytes"` + DownBytes uint64 `json:"downbytes"` + MessageRoute chan interface{} `json:"-"` + Status string `json:"status"` + Cancel context.CancelFunc `json:"-"` + Tick time.Duration `json:"-"` } func NewProxyService() *ProxyService { + return NewProxyServiceWithTick(time.Second) +} + +func NewProxyServiceWithTick(duration time.Duration) *ProxyService { ctx, cancel := context.WithCancel(context.Background()) return &ProxyService{ - TcpLock: &sync.Mutex{}, - UdpLock: &sync.Mutex{}, - TrafficMQ: make(chan TrafficMessage, 128), - Status: "stop", - Context: ctx, - Cancel: cancel, + MessageRoute: make(chan interface{}, 32), + Status: "stop", + Context: ctx, + Cancel: cancel, + Tick: duration, + LastOneMinuteConnections: cache.New(duration), } } -var trafficMonitorQueue []chan TrafficMessage +var trafficMonitorQueue []chan record.Traffic // RegisterTrafficHandle TrafficMessage channel -func RegisterTrafficHandle(trafficMonitor chan TrafficMessage) { +func RegisterTrafficHandle(trafficMonitor chan record.Traffic) { trafficMonitorQueue = append(trafficMonitorQueue, trafficMonitor) } func (this *ProxyService) TrafficMeasure() { + go this.speed() + go this.route() + <-this.Done() + log.Info("close traffic measure") +} - go func() { - var upTmp, downTmp uint64 = this.UpBytes, this.DownBytes - tick := time.Tick(1 * time.Second) - for { - this.UpSpeed, upTmp = this.UpBytes-upTmp, this.UpBytes - this.DownSpeed, downTmp = this.DownBytes-downTmp, this.DownBytes - select { - case <-tick: - continue - case <-this.Done(): - return - } +func (this *ProxyService) route() { + for { + var data interface{} + select { + case <-this.Done(): + log.Info("close countClose") + return + case data = <-this.MessageRoute: } - }() - go func() { - for { - var data TrafficMessage - select { - case <-this.Done(): - log.Info("close countClose") - return - case data = <-this.TrafficMQ: - } - this.UpBytes += data.UpBytes - this.DownBytes += data.DownBytes - if trafficMonitorQueue != nil && len(trafficMonitorQueue) > 0 { - for _, item := range trafficMonitorQueue { - item <- data - } - } + switch data.(type) { + case record.Traffic: + this.traffic(data.(record.Traffic)) + case record.ConnectionProxyRequest: + this.proxyRequest(data.(record.ConnectionProxyRequest)) } - }() - <-this.Done() - log.Info("close traffic measure") + } +} + +// traffic handle traffic message +func (this *ProxyService) traffic(data record.Traffic) { + eventbus.GetEventBus().Publish("record:traffic", data) + this.UpBytes += data.Up + this.DownBytes += data.Down + if trafficMonitorQueue != nil && len(trafficMonitorQueue) > 0 { + for _, item := range trafficMonitorQueue { + item <- data + } + } +} +// proxyRequest handle proxy request message +func (this *ProxyService) proxyRequest(data record.ConnectionProxyRequest) { + eventbus.GetEventBus().Publish("record:proxyRequest", data) + key := addr.GetIPFromAddr(data.ClientAddr) + if this.LastOneMinuteConnections.Get(key) == nil { + this.LastOneMinuteConnections.Put(key, []record.ConnectionProxyRequest{data}, this.Tick) + } else { + last := this.LastOneMinuteConnections.Get(key).([]record.ConnectionProxyRequest) + this.LastOneMinuteConnections.Put(key, append(last, data), this.Tick) + } + log.Info("%v:%s %s <-------> %s", addr.GetPortFromAddr(data.ProxyAddr), + addr.GetNetworkFromAddr(data.ProxyAddr), + data.ClientAddr.String(), + data.TargetAddr.String()) } +// speed is traffic statis +func (this *ProxyService) speed() { + var upTmp, downTmp uint64 = this.UpBytes, this.DownBytes + tick := time.Tick(this.Tick) + for { + this.UpSpeed, upTmp = this.UpBytes-upTmp, this.UpBytes + this.DownSpeed, downTmp = this.DownBytes-downTmp, this.DownBytes + select { + case <-tick: + continue + case <-this.Done(): + return + } + } +} func (this *ProxyService) Start() error { + go this.TrafficMeasure() this.Status = "run" return nil } @@ -102,15 +135,14 @@ func (this *ProxyService) Start() error { func (this *ProxyService) Stop() error { log.Info("proxy stop") this.Cancel() - // this.TcpClose <- struct{}{} - if this.Tcp != nil { - err := this.Tcp.Close() + if this.TCP != nil { + err := this.TCP.Close() if err != nil { log.Err(err) } } - if this.Udp != nil { - err := this.Udp.Close() + if this.UDP != nil { + err := this.UDP.Close() if err != nil { log.Err(err) } diff --git a/proxy/proxy_test.go b/proxy/proxy_test.go new file mode 100644 index 0000000..4e58b96 --- /dev/null +++ b/proxy/proxy_test.go @@ -0,0 +1,43 @@ +package proxy + +import ( + "fmt" + "net" + "testing" + "time" + + "github.com/rc452860/vnet/record" +) + +func Test_LastOneMinute(t *testing.T) { + proxy := NewProxyServiceWithTick(50 * time.Millisecond) + proxy.Start() + proxyAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:8000") + targetAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:8081") + + for i := 0; i < 10; i++ { + client, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("1.1.1.1:%v", i+1000)) + proxy.MessageRoute <- record.ConnectionProxyRequest{ + ConnectionPair: record.ConnectionPair{ + ClientAddr: client, + ProxyAddr: proxyAddr, + TargetAddr: targetAddr, + }, + } + time.Sleep(10 * time.Millisecond) + } + + var r int + proxy.LastOneMinuteConnections.Range(func(key, value interface{}) { + if result, ok := value.([]record.ConnectionProxyRequest); ok { + // t.Log(key) + for _, item := range result { + t.Log(item.ClientAddr.String()) + } + r++ + } + }) + if r > 10 { + t.FailNow() + } +} diff --git a/proxy/server/shadowsocks.go b/proxy/server/shadowsocks.go index 9a81fda..9dbc5c3 100644 --- a/proxy/server/shadowsocks.go +++ b/proxy/server/shadowsocks.go @@ -1,6 +1,7 @@ package server import ( + "encoding/json" "fmt" "io" "net" @@ -8,13 +9,16 @@ import ( "sync" "time" + "github.com/pkg/errors" + "github.com/rc452860/vnet/utils/datasize" "github.com/rc452860/vnet/ciphers" + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/conn" - "github.com/rc452860/vnet/log" "github.com/rc452860/vnet/pool" "github.com/rc452860/vnet/proxy" + "github.com/rc452860/vnet/record" "github.com/rc452860/vnet/socks" "golang.org/x/time/rate" ) @@ -42,9 +46,9 @@ func init() { type ShadowsocksProxy struct { *proxy.ProxyService `json:"-,omitempty"` Host string `json:"host,omitempty"` + Port int `json:"port,omitempty"` Method string `json:"method,omitempty"` Password string `json:"password,omitempty"` - Port int `json:"port,omitempty"` TCPTimeout time.Duration `json:"tcp_timeout,omitempty"` UDPTimeout time.Duration `json:"udp_timeout,omitempty"` ReadLimit *rate.Limiter `json:"read_limit,omitempty"` @@ -71,6 +75,7 @@ func NewShadowsocks(host string, method string, password string, port int, limit return ss, err } +// ConfigLimit config shadowsocks traffic limit func (s *ShadowsocksProxy) ConfigLimit(limit uint64) { if limit == 0 { return @@ -91,7 +96,7 @@ func (s *ShadowsocksProxy) ConfigLimitHuman(limit string) error { if trafficLimit < 5*1024 { return nil } - logging.Info("server port: %v limit is: %v", s.Port, trafficLimit) + // logging.Info("server port: %v limit is: %v", s.Port, trafficLimit) s.ReadLimit = rate.NewLimiter(rate.Limit(trafficLimit), int(trafficLimit)) s.WriteLimit = rate.NewLimiter(rate.Limit(trafficLimit), int(trafficLimit)) } @@ -99,6 +104,7 @@ func (s *ShadowsocksProxy) ConfigLimitHuman(limit string) error { } +// ConfigTimeout is config shadowsocks timeout func (s *ShadowsocksProxy) ConfigTimeout(timeout time.Duration) error { if timeout == 0 { s.TCPTimeout = 3e9 @@ -107,7 +113,7 @@ func (s *ShadowsocksProxy) ConfigTimeout(timeout time.Duration) error { s.TCPTimeout = timeout s.UDPTimeout = timeout } - log.Info("%s:%v timeout:%v", s.Host, s.Port, s.TCPTimeout) + // log.Info("%s:%v timeout:%v", s.Host, s.Port, s.TCPTimeout) return nil } @@ -122,7 +128,6 @@ func (s *ShadowsocksProxy) Start() error { log.Err(err) return err } - go s.ProxyService.TrafficMeasure() return nil } @@ -131,24 +136,32 @@ func (s *ShadowsocksProxy) Stop() error { return s.ProxyService.Stop() } -// statistics upload traffic -func (s *ShadowsocksProxy) upload(con conn.IConn, up int) { - s.ProxyService.TrafficMQ <- proxy.TrafficMessage{ - Network: "tcp", - LAddr: con.LocalAddr().String(), - RAddr: con.RemoteAddr().String(), - UpBytes: uint64(up), +// statistics tcpUpload traffic +func (s *ShadowsocksProxy) tcpUpload(con conn.IConn, up uint64) { + message := record.Traffic{ + ConnectionPair: record.ConnectionPair{ + ProxyAddr: con.LocalAddr(), + ClientAddr: con.RemoteAddr(), + }, + Network: con.GetNetwork(), + Up: up, } + + s.ProxyService.MessageRoute <- message } -// statics download traffic -func (s *ShadowsocksProxy) download(con conn.IConn, down int) { - s.ProxyService.TrafficMQ <- proxy.TrafficMessage{ - Network: "tcp", - LAddr: con.LocalAddr().String(), - RAddr: con.RemoteAddr().String(), - DownBytes: uint64(down), +// statics tcpDownload traffic +func (s *ShadowsocksProxy) tcpDownload(con conn.IConn, down uint64) { + message := record.Traffic{ + ConnectionPair: record.ConnectionPair{ + ProxyAddr: con.LocalAddr(), + ClientAddr: con.RemoteAddr(), + }, + Network: con.GetNetwork(), + Down: down, } + + s.ProxyService.MessageRoute <- message } // start shadowsocks tcp proxy service @@ -160,12 +173,12 @@ func (s *ShadowsocksProxy) startTCP() error { return err } server, err := net.ListenTCP("tcp", tcpAddr) - logging.Info("listening TCP on %s", addr) + // logging.Info("listening TCP on %s", addr) if err != nil { logging.Error(err.Error()) - return err + return errors.Cause(err) } - s.Tcp = server + s.TCP = server go func() { defer server.Close() @@ -198,7 +211,7 @@ func (s *ShadowsocksProxy) startTCP() error { return } /** 流量记录装饰器 */ - lcd, err = conn.TrafficDecorate(lcd, s.upload, s.download) + lcd, err = conn.TrafficDecorate(lcd, s.tcpUpload, s.tcpDownload) if err != nil { logging.Err(err) return @@ -216,7 +229,7 @@ func (s *ShadowsocksProxy) startTCP() error { /** 读取目标地址 */ targetAddr, err := socks.ReadAddr(lcd) if err != nil { - log.Error("read target address error %s", err.Error()) + log.Error("read target address error %s. (maybe the crypto method 2rong configuration)", err.Error()) return } @@ -227,9 +240,10 @@ func (s *ShadowsocksProxy) startTCP() error { return } defer rc.Close() + s.ConnectionStage(s.TCP.Addr(), lcd.RemoteAddr(), rc.RemoteAddr()) rc.(*net.TCPConn).SetKeepAlive(true) - logging.Info("tcp %s <----> %s", lcd.RemoteAddr(), targetAddr) + // logging.Info("tcp %s <----> %s", lcd.RemoteAddr(), targetAddr) /** 默认装饰器 */ rcd, err := conn.DefaultDecorate(rc, conn.TCP) @@ -238,15 +252,6 @@ func (s *ShadowsocksProxy) startTCP() error { return } - // rcd, _ = conn.TrafficLimitDecorate(rcd, s.ReadLimit, s.WriteLimit) - - /** 流量统计装饰器 */ - // rcd, err = conn.TrafficDecorate(rcd) - // if err != nil { - // logging.Err(err) - // return - // } - _, _, err = relayTCP(lcd, rcd) if err != nil { if err, ok := err.(net.Error); ok && err.Timeout() { @@ -284,27 +289,34 @@ func relayTCP(left, right net.Conn) (int64, int64, error) { if err == nil { err = rs.Err } - return n, rs.N, err + return n, rs.N, errors.Cause(err) } // udp upload traffic count -func (s *ShadowsocksProxy) udpUpload(laddr, raddr string, n int) { - s.ProxyService.TrafficMQ <- proxy.TrafficMessage{ - Network: "tcp", - LAddr: laddr, - RAddr: raddr, - UpBytes: uint64(n), +func (s *ShadowsocksProxy) udpUpload(laddr, raddr net.Addr, n uint64) { + message := record.Traffic{ + ConnectionPair: record.ConnectionPair{ + ProxyAddr: laddr, + ClientAddr: raddr, + }, + Network: laddr.Network(), + Up: n, } + + s.ProxyService.MessageRoute <- message } // udp download traffic count -func (s *ShadowsocksProxy) udpDownload(laddr, raddr string, n int) { - s.ProxyService.TrafficMQ <- proxy.TrafficMessage{ - Network: "tcp", - LAddr: laddr, - RAddr: raddr, - DownBytes: uint64(n), +func (s *ShadowsocksProxy) udpDownload(laddr, raddr net.Addr, n uint64) { + message := record.Traffic{ + ConnectionPair: record.ConnectionPair{ + ProxyAddr: laddr, + ClientAddr: raddr, + }, + Network: laddr.Network(), + Down: n, } + s.ProxyService.MessageRoute <- message } // Listen on addr for encrypted packets and basically do UDP NAT. @@ -313,21 +325,21 @@ func (s *ShadowsocksProxy) startUDP() error { server, err := net.ListenPacket("udp", addr) if err != nil { logging.Error("UDP remote listen error: %v", err) - return err + return errors.Cause(err) } - s.Udp = server + s.UDP = server server, err = ciphers.CipherPacketDecorate(s.Password, s.Method, server) server = conn.PacketTrafficConnDecorate(server, s.udpUpload, s.udpDownload) if err != nil { logging.Error("UDP CipherPacketDecorate init error: %v", err) - return err + return errors.Cause(err) } nm := newNATmap(s.UDPTimeout) buf := pool.GetUdpBuf() defer pool.PutUdpBuf(buf) - logging.Info("listening UDP on %s", addr) + // logging.Info("listening UDP on %s", addr) go func() { defer server.Close() @@ -354,14 +366,14 @@ func (s *ShadowsocksProxy) startUDP() error { logging.Error("failed to split target address from packet: %q", buf[:n]) continue } - logging.Info("udp %s <----> %s", raddr, tgtAddr) - + // logging.Info("udp %s <----> %s", raddr, tgtAddr) tgtUDPAddr, err := net.ResolveUDPAddr("udp", tgtAddr.String()) if err != nil { logging.Error("failed to resolve target UDP address: %v", err) continue } + s.ConnectionStage(s.UDP.LocalAddr(), raddr, tgtUDPAddr) payload := buf[len(tgtAddr):n] pc := nm.Get(raddr.String()) @@ -385,6 +397,26 @@ func (s *ShadowsocksProxy) startUDP() error { return nil } +// ConnectionStage event +func (s *ShadowsocksProxy) ConnectionStage(proxy, client, target net.Addr) { + s.MessageRoute <- record.ConnectionProxyRequest{ + ConnectionPair: record.ConnectionPair{ + ClientAddr: client, + ProxyAddr: proxy, + TargetAddr: target, + }, + } +} + +func (s *ShadowsocksProxy) String() string { + result, err := json.Marshal(s) + if err != nil { + log.Err(err) + return "" + } + return string(result) +} + // Packet NAT table type natmap struct { sync.RWMutex @@ -444,7 +476,7 @@ func timedCopy(dst net.PacketConn, target net.Addr, src net.PacketConn, timeout src.SetReadDeadline(time.Now().Add(timeout)) n, raddr, err := src.ReadFrom(buf) if err != nil { - return err + return errors.Cause(err) } switch role { @@ -461,7 +493,7 @@ func timedCopy(dst net.PacketConn, target net.Addr, src net.PacketConn, timeout } if err != nil { - return err + return errors.Cause(err) } } } diff --git a/proxy/server/shadowsocks_test.go b/proxy/server/shadowsocks_test.go index 19d75a1..ffdc5d1 100644 --- a/proxy/server/shadowsocks_test.go +++ b/proxy/server/shadowsocks_test.go @@ -14,7 +14,8 @@ import ( "github.com/rc452860/vnet/socks" ) -func mockUdpServer(t *testing.T) { +// mockUDPServer is mock udp server for shadowsocks udp test +func mockUDPServer(t *testing.T) { conn, err := net.ListenPacket("udp", "0.0.0.0:8081") if err != nil { t.Error(err) @@ -39,7 +40,7 @@ func Test_NewServer(t *testing.T) { } func testShadowsocksProxy(t *testing.T, host, method, password string, port int) { - ss, _ := NewShadowsocks(host, method, password, port, "4MB", 0) + ss, _ := NewShadowsocks(host, method, password, port, "512k", 0) go ss.Start() time.Sleep(1 * time.Second) transport := &http.Transport{ @@ -65,8 +66,8 @@ func testShadowsocksProxy(t *testing.T, host, method, password string, port int) t.Fatal("http status error") } - t.Logf("tcp success") - go mockUdpServer(t) + t.Logf("tcp success: %s", ss.String()) + go mockUDPServer(t) addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%v", "127.0.0.1", port)) if err != nil { t.Error(err) diff --git a/record/record.go b/record/record.go index 8ae544a..77ddde4 100644 --- a/record/record.go +++ b/record/record.go @@ -1,115 +1,156 @@ package record import ( - "context" + "encoding/json" + "fmt" "net" + "strings" "sync" + "time" + "github.com/rc452860/vnet/comm/cache" "github.com/rc452860/vnet/comm/eventbus" - + "github.com/rc452860/vnet/comm/log" "github.com/rc452860/vnet/utils" + "github.com/rc452860/vnet/utils/addr" ) -type Record struct { - ProxyAddr net.Addr - ConnectionsPair - Protocol string - Network string - RecordType string - Up uint64 - Down uint64 - Status int +// Traffic is repersent traffic record +type Traffic struct { + ConnectionPair `json:"connections_pair,omitempty"` + Network string `json:"network,omitempty"` + Up uint64 `json:"up,omitempty"` + Down uint64 `json:"down,omitempty"` +} + +type ConnectionProxyRequest struct { + ConnectionPair } -type ConnectionsPair struct { - Client net.Addr - Target net.Addr +// ConnectionPair is struct represent a proxy connections +type ConnectionPair struct { + ProxyAddr net.Addr `json:"proxy_addr,omitempty"` + ClientAddr net.Addr `json:"client_addr,omitempty"` + TargetAddr net.Addr `json:"target_addr,omitempty"` } +// GlobalResourceMonitor global resource monitor type GlobalResourceMonitor struct { sync.RWMutex - LastMinuteOnline map[string][]ConnectionsPair - TrafficUp map[string]uint64 - TrafficDown map[string]uint64 - TrafficUpSpeed map[string]uint64 - TrafficDownSpeed map[string]uint64 - GlobalUp uint64 - GlobalDown uint64 - GlobalSpeedUp uint64 - GlobalSpeeddown uint64 - PacketCount uint64 - // cancel context.CancelFunc - // context context.Context + LastOneMinuteConnections *cache.Cache `json:"-"` + GlobalUp uint64 `json:"global_up,omitempty"` + GlobalDown uint64 `json:"global_down,omitempty"` + GlobalSpeedUp uint64 `json:"global_speed_up,omitempty"` + GlobalSpeeddown uint64 `json:"global_speeddown,omitempty"` + PacketCount uint64 `json:"packet_count,omitempty"` + tick time.Duration } var ( globalResourceMonitor *GlobalResourceMonitor ) +// GetGRMInstance GlobalResourceMonitor singal instance func GetGRMInstance() *GlobalResourceMonitor { - utils.Lock("GRM") - defer utils.UnLock("GRM") + return GetGRMInstanceWithTick(time.Minute) +} - if globalResourceMonitor == nil { - globalResourceMonitor = &GlobalResourceMonitor{ - LastMinuteOnline: make(map[string][]ConnectionsPair), - TrafficUp: make(map[string]uint64), - TrafficDown: make(map[string]uint64), - TrafficUpSpeed: make(map[string]uint64), - TrafficDownSpeed: make(map[string]uint64), - GlobalUp: 0, - GlobalDown: 0, - GlobalSpeedUp: 0, - GlobalSpeeddown: 0, - PacketCount: 0, - } +func GetGRMInstanceWithTick(duration time.Duration) *GlobalResourceMonitor { + if globalResourceMonitor != nil { + return globalResourceMonitor } - - ctx, cancel := context.WithCancel(context.Background()) - // globalResourceMonitor.cancel = cancel - // globalResourceMonitor.context = ctx - eventbus.GetEventBus().SubscribeAsync( - "traffic", - ) + utils.Lock("GetGRMInstance") + globalResourceMonitor = &GlobalResourceMonitor{ + LastOneMinuteConnections: cache.New(duration), + GlobalUp: 0, + GlobalDown: 0, + GlobalSpeedUp: 0, + GlobalSpeeddown: 0, + PacketCount: 0, + tick: duration, + } + eventbus.GetEventBus().SubscribeAsync("record:traffic", globalResourceMonitor.trafficStatistics, false) + eventbus.GetEventBus().SubscribeAsync("record:proxyRequest", globalResourceMonitor.lastOneMinuteConnections, false) + go globalResourceMonitor.speed() + utils.UnLock("GetGRMInstance") return globalResourceMonitor } -func (g *GlobalResourceMonitor) Stop() { - utils.Lock("GRM") - defer utils.UnLock("GRM") - // g.cancel() - globalResourceMonitor = nil - +func (g *GlobalResourceMonitor) trafficStatistics(data Traffic) { + utils.Lock("record:globalResourceMonitor") + defer utils.UnLock("record:globalResourceMonitor") + g.GlobalUp += data.Up + g.GlobalDown += data.Down + if strings.Contains(data.Network, "udp") { + g.PacketCount++ + } } -func (g *GlobalResourceMonitor) Subscript(name string, sub <-chan Record) { - g.recordObservers.Store(name, sub) +func (g *GlobalResourceMonitor) lastOneMinuteConnections(data ConnectionProxyRequest) { + key := addr.GetIPFromAddr(data.ClientAddr) + if g.LastOneMinuteConnections.Get(key) == nil { + g.LastOneMinuteConnections.Put(key, []ConnectionProxyRequest{data}, g.tick) + } else { + last := g.LastOneMinuteConnections.Get(key).([]ConnectionProxyRequest) + g.LastOneMinuteConnections.Put(key, append(last, data), g.tick) + } } -func (g *GlobalResourceMonitor) UnSubscript(name string) { - g.recordObservers.Delete(name) +// speed is initialize by GlobalResourceMonitor init,and it should be in other goroutine +// it's a schedule for calculate up and down speed for global and every single service +func (g *GlobalResourceMonitor) speed() { + var globalUpTmp, globalDownTmp uint64 + tick := time.Tick(g.tick) + for { + <-tick + utils.Lock("record:globalResourceMonitor") + g.GlobalSpeedUp = g.GlobalUp - globalUpTmp + globalUpTmp = g.GlobalUp + g.GlobalSpeeddown = g.GlobalDown - globalDownTmp + globalDownTmp = g.GlobalDown + utils.UnLock("record:globalResourceMonitor") + } } -func (g *GlobalResourceMonitor) Publish(record Record) { - g.reciver <- record +// GetLastOneMinuteOnlineCount return online service count +func (g *GlobalResourceMonitor) GetLastOneMinuteOnlineCount() int { + return g.LastOneMinuteConnections.Size() } -func (g *GlobalResourceMonitor) process() { - for { - // var data interface{} - // select { - // case <-g.context.Done(): - // return - // default: - // case data = <-g.reciver: - // } - - } +// GetLastOneMinuteOnlineByPort convert LastOneMinuteOnlineByPort key from client to proxy +// the original map is map[client.ip] +// the convert map is map[proxy.port] +func (g *GlobalResourceMonitor) GetLastOneMinuteOnlineByPort() map[int][]string { + result := make(map[int][]string) + filter := make(map[string]bool) + g.LastOneMinuteConnections.Range(func(k, v interface{}) { + if value, ok := v.([]ConnectionProxyRequest); ok { + for _, item := range value { + proxyPort := addr.GetPortFromAddr(item.ProxyAddr) + clientIp := addr.GetIPFromAddr(item.ClientAddr) + filterKey := fmt.Sprintf("%v-%s", proxyPort, clientIp) + if filter[filterKey] { + continue + } else { + filter[filterKey] = true + } + if result[proxyPort] != nil { + result[proxyPort] = append(result[proxyPort], + clientIp) + + } else { + result[proxyPort] = []string{clientIp} + } + } + } + }) + return result } -func (g *GlobalResourceMonitor) trafficStatistics(data Record) { - g.TrafficDown[data.Port] += data.Down - g.TrafficUp[data.Port] += data.Up - g.GlobalUp += data.Up - g.GlobalDown += data.Down - +func (g *GlobalResourceMonitor) String() string { + result, e := json.Marshal(g) + if e != nil { + log.Err(e) + return "" + } + return string(result) } diff --git a/record/record_test.go b/record/record_test.go new file mode 100644 index 0000000..441e376 --- /dev/null +++ b/record/record_test.go @@ -0,0 +1,29 @@ +package record + +import ( + "fmt" + "testing" + "time" + + "github.com/rc452860/vnet/comm/eventbus" + "github.com/rc452860/vnet/utils/addr" +) + +func Test_GetLastOneMinuteOnlineByPort(t *testing.T) { + instance := GetGRMInstanceWithTick(100 * time.Millisecond) + for i := 0; i < 10; i++ { + + eventbus.GetEventBus().Publish("record:proxyRequest", ConnectionProxyRequest{ + ConnectionPair: ConnectionPair{ + ProxyAddr: addr.ParseAddrFromString("tcp", "0.0.0.0:8080"), + ClientAddr: addr.ParseAddrFromString("tcp", fmt.Sprintf("192.168.1.1:%v", i)), + TargetAddr: addr.ParseAddrFromString("tcp", fmt.Sprintf("192.168.1.2:%v", i)), + }, + }) + time.Sleep(5 * time.Millisecond) + } + + if instance.GetLastOneMinuteOnlineByPort()[8080][0] != "192.168.1.1" { + t.FailNow() + } +} diff --git a/service/monitor.go b/service/monitor.go index 63c7f3b..c2a766f 100644 --- a/service/monitor.go +++ b/service/monitor.go @@ -1,7 +1,7 @@ package service import ( - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" "github.com/shirou/gopsutil/cpu" "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/mem" diff --git a/service/shadowsocks_test.go b/service/shadowsocks_test.go index cc01117..138fd2b 100644 --- a/service/shadowsocks_test.go +++ b/service/shadowsocks_test.go @@ -5,7 +5,7 @@ import ( "testing" "time" - "github.com/rc452860/vnet/log" + "github.com/rc452860/vnet/comm/log" ) func TestMemProblem(t *testing.T) { diff --git a/utils/addr/addr.go b/utils/addr/addr.go new file mode 100644 index 0000000..199f118 --- /dev/null +++ b/utils/addr/addr.go @@ -0,0 +1,57 @@ +package addr + +import ( + "net" + + "github.com/rc452860/vnet/comm/log" +) + +func GetIPFromAddr(addr net.Addr) string { + switch addr.(type) { + case *net.TCPAddr: + tcpAddr := addr.(*net.TCPAddr) + return tcpAddr.IP.String() + case *net.UDPAddr: + udpAddr := addr.(*net.UDPAddr) + return udpAddr.IP.String() + case nil: + return "" + default: + return "" + } +} + +func GetPortFromAddr(addr net.Addr) int { + switch addr.(type) { + case *net.TCPAddr: + tcpAddr := addr.(*net.TCPAddr) + return tcpAddr.Port + case *net.UDPAddr: + udpAddr := addr.(*net.UDPAddr) + return udpAddr.Port + case nil: + return 0 + default: + return 0 + } +} + +func GetNetworkFromAddr(addr net.Addr) string { + return addr.Network() +} + +func ParseAddrFromString(network, addr string) net.Addr { + var addrConvert net.Addr + var err error + switch network { + case "tcp", "tcp4", "tcp6": + addrConvert, err = net.ResolveTCPAddr(network, addr) + case "udp", "udp4", "udp6": + addrConvert, err = net.ResolveUDPAddr(network, addr) + } + if err != nil { + log.Err(err) + return nil + } + return addrConvert +} diff --git a/utils/addr/addr_test.go b/utils/addr/addr_test.go new file mode 100644 index 0000000..6cfbf35 --- /dev/null +++ b/utils/addr/addr_test.go @@ -0,0 +1,33 @@ +package addr + +import ( + "net" + "testing" +) + +func Benchmark_GetIPFromAddr(t *testing.B) { + client, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:808") + for i := 0; i < t.N; i++ { + GetIPFromAddr(client) + } + t.Log(GetIPFromAddr(client)) + t.ReportAllocs() +} + +func Benchmark_GetPortFromAddr(t *testing.B) { + client, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:808") + for i := 0; i < t.N; i++ { + GetPortFromAddr(client) + } + t.Log(GetPortFromAddr(client)) + t.ReportAllocs() +} + +func Benchmark_GetNetworkFromAddr(t *testing.B) { + client, _ := net.ResolveTCPAddr("tcp", "127.0.0.1:808") + for i := 0; i < t.N; i++ { + GetNetworkFromAddr(client) + } + t.Log(GetNetworkFromAddr(client)) + t.ReportAllocs() +} diff --git a/utils/addr/debug.test b/utils/addr/debug.test new file mode 100644 index 0000000..e18fad0 Binary files /dev/null and b/utils/addr/debug.test differ diff --git a/vendor/github.com/asaskevich/EventBus/event_bus.go b/vendor/github.com/asaskevich/EventBus/event_bus.go index d4cf80f..9365e39 100644 --- a/vendor/github.com/asaskevich/EventBus/event_bus.go +++ b/vendor/github.com/asaskevich/EventBus/event_bus.go @@ -45,7 +45,7 @@ type eventHandler struct { flagOnce bool async bool transactional bool - sync.Mutex // lock for an event handler - useful for running async callbacks serially + sync.Mutex // lock for an event handler - useful for running async callbacks serially } // New returns new EventBus with empty handlers.