From 001b3bf5eaf237130d0589aac1656b351611e170 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=A5=E5=B0=98=EF=BC=88=E6=A8=B1=E3=81=AE=E6=B3=AA?= =?UTF-8?q?=EF=BC=89?= Date: Mon, 21 Jan 2019 23:55:05 +0800 Subject: [PATCH] fix: database config change sync service status;user traffic log update the user_id is 0 problem;service online problem --- Gopkg.toml | 8 +-- config/config.go | 6 +- db/db.go | 58 ++++++++----------- .../asaskevich/EventBus/event_bus.go | 2 +- 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/Gopkg.toml b/Gopkg.toml index 1c590fd..64803a1 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -24,10 +24,6 @@ # go-tests = true # unused-packages = true -[[constraint]] - branch = "master" - name = "golang.org/x/crypto" - [prune] go-tests = true unused-packages = true @@ -47,3 +43,7 @@ [[constraint]] branch = "master" name = "github.com/asaskevich/EventBus" + +[[constraint]] + name = "github.com/takama/daemon" + version = "0.11.0" diff --git a/config/config.go b/config/config.go index e9a2f11..f88db3f 100644 --- a/config/config.go +++ b/config/config.go @@ -67,8 +67,10 @@ func LoadConfig(file string) (*Config, error) { } configFile = file config = &Config{ - Mode: "db", - DbConfig: DbConfig{}, + Mode: "db", + DbConfig: DbConfig{ + Rate: -1, + }, } data, _ := json.MarshalIndent(config, "", " ") ioutil.WriteFile(configFile, data, 0644) diff --git a/db/db.go b/db/db.go index 5177f5b..0016d8c 100644 --- a/db/db.go +++ b/db/db.go @@ -4,11 +4,11 @@ import ( "bytes" "context" "fmt" - "net" - "strconv" "strings" "time" + "github.com/rc452860/vnet/utils/addr" + "github.com/rc452860/vnet/proxy" "github.com/rc452860/vnet/record" @@ -138,7 +138,7 @@ func DbStarted(ctx context.Context) { }, &conf.DbConfig.NodeId, nil) } - if conf.DbConfig.Rate == 0 { + if conf.DbConfig.Rate == -1 { survey.AskOne(&survey.Input{ Message: "what is your want rate?", Default: "1", @@ -182,6 +182,8 @@ func DBOnlineMonitor(ctx context.Context) { defer connect.Close() for { + ss_node_info := fmt.Sprintf("INSERT INTO `ss_node_info` (`id`,`node_id`,`uptime`,`load`,`log_time`)"+ + "VALUES (NULL,%v,unix_timestamp(),'%s',unix_timestamp())", conf.NodeId, FormatLoad()) ss_node_online_log := fmt.Sprintf("INSERT INTO `ss_node_online_log` (id,node_id,online_user,log_time) "+ " VALUES (NULL,%v,%v,unix_timestamp())", conf.NodeId, grmInstance.GetLastOneMinuteOnlineCount()) ss_node_ip := bytes.NewBufferString("") @@ -197,6 +199,7 @@ func DBOnlineMonitor(ctx context.Context) { ss_node_ip.WriteString(fmt.Sprintf("(NULL,%v,%v,'%s','%s',unix_timestamp()),", conf.NodeId, k, "-", ips.String()[:ips.Len()-1])) } connect.Exec(ss_node_online_log) + connect.Exec(ss_node_info) if ss_node_ip.Len() > headLen { connect.Exec(ss_node_ip.String()[:ss_node_ip.Len()-1]) } @@ -220,41 +223,42 @@ func DBServiceMonitor(ctx context.Context) { users, err := GetEnableUser() if err != nil { log.Err(err) - return + continue } sslist := service.CurrentShadowsocksService().List() userMap := make(map[int]User) - for _, user := range users { - userMap[user.Port] = user + + for _, ss := range sslist { flag := false - for _, ss := range sslist { - if ss.Port == user.Port { + for _, user := range users { + if ss.Port == user.Port && ss.Method == user.Method && ss.Password == user.Password { flag = true break } } if !flag { - log.Info("start port [%d]", user.Port) - StartShadowsocks(user) + log.Info("stop port [%d]", ss.Port) + service.CurrentShadowsocksService().Del(ss.Port) } } - for _, ss := range sslist { + for _, user := range users { + userMap[user.Port] = user flag := false - for _, user := range users { + for _, ss := range sslist { if ss.Port == user.Port { flag = true break } } if !flag { - log.Info("stop port [%d]", ss.Port) - service.CurrentShadowsocksService().Del(ss.Port) + log.Info("start port [%d]", user.Port) + StartShadowsocks(user) } } + UpdateTrafficByUser(userMap) - //TODO update user upload and download <-tick } } @@ -330,12 +334,9 @@ func UpdateTrafficByUser(users map[int]User) { inserStr := insertBuf.String() user_traffic_log := fmt.Sprintf("INSERT INTO `user_traffic_log` (`id`, `user_id`, `u`, `d`, "+ "`node_id`, `rate`, `traffic`, `log_time`) VALUES %s", inserStr[:len(inserStr)-1]) - ss_node_info := fmt.Sprintf("INSERT INTO `ss_node_info` (`id`,`node_id`,`uptime`,`load`,`log_time`)"+ - "VALUES (NULL,%v,unix_timestamp(),'%s',unix_timestamp())", conf.NodeId, FormatLoad()) connect.Exec(user) connect.Exec(user_traffic_log) - connect.Exec(ss_node_info) } func FormatLoad() string { @@ -354,25 +355,16 @@ func DBTrafficMonitor(ctx context.Context) { return case data = <-traffic: } - _, port, err := net.SplitHostPort(data.ClientAddr.String()) - if err != nil { - log.Err(err) - continue - } - porti, err := strconv.Atoi(port) - if err != nil { - log.Err(err) - continue - } - if trafficTable[porti] == nil { - trafficTable[porti] = &DBTraffic{ - Port: porti, + port := addr.GetPortFromAddr(data.ProxyAddr) + if trafficTable[port] == nil { + trafficTable[port] = &DBTraffic{ + Port: port, Up: data.Up, Down: data.Down, } } else { - trafficTable[porti].Up += data.Up - trafficTable[porti].Down += data.Down + trafficTable[port].Up += data.Up + trafficTable[port].Down += data.Down } } } diff --git a/vendor/github.com/asaskevich/EventBus/event_bus.go b/vendor/github.com/asaskevich/EventBus/event_bus.go index 9365e39..d4cf80f 100644 --- a/vendor/github.com/asaskevich/EventBus/event_bus.go +++ b/vendor/github.com/asaskevich/EventBus/event_bus.go @@ -45,7 +45,7 @@ type eventHandler struct { flagOnce bool async bool transactional bool - sync.Mutex // lock for an event handler - useful for running async callbacks serially + sync.Mutex // lock for an event handler - useful for running async callbacks serially } // New returns new EventBus with empty handlers.