From a14194523060f0a0426d2077ffdfb1337e7dcb01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8B=A5=E5=B0=98=EF=BC=88=E6=A8=B1=E3=81=AE=E6=B3=AA?= =?UTF-8?q?=EF=BC=89?= Date: Fri, 18 Jan 2019 23:48:16 +0800 Subject: [PATCH] feat: using eventbus for traffic statistics --- Gopkg.lock | 9 + Gopkg.toml | 9 +- cmd/test/test.go | 9 +- comm/eventbus/eventbus.go | 17 ++ conn/common.go | 9 - conn/conn_decorate.go | 3 +- conn/listener_decorate.go | 168 -------------- db/db.go | 8 +- ...00\345\217\221\351\241\273\347\237\245.md" | 5 +- proxy/proxy.go | 64 +++--- proxy/server/shadowsocks.go | 22 +- record/record.go | 115 ++++++++++ service/shadowsocks_test.go | 2 +- .../asaskevich/EventBus/.travis.yml | 10 + vendor/github.com/asaskevich/EventBus/LICENSE | 22 ++ .../github.com/asaskevich/EventBus/README.md | 162 ++++++++++++++ .../github.com/asaskevich/EventBus/client.go | 120 ++++++++++ .../asaskevich/EventBus/event_bus.go | 207 ++++++++++++++++++ .../asaskevich/EventBus/network_bus.go | 76 +++++++ .../github.com/asaskevich/EventBus/server.go | 153 +++++++++++++ 20 files changed, 948 insertions(+), 242 deletions(-) create mode 100644 comm/eventbus/eventbus.go delete mode 100644 conn/common.go delete mode 100644 conn/listener_decorate.go create mode 100644 record/record.go create mode 100644 vendor/github.com/asaskevich/EventBus/.travis.yml create mode 100644 vendor/github.com/asaskevich/EventBus/LICENSE create mode 100644 vendor/github.com/asaskevich/EventBus/README.md create mode 100644 vendor/github.com/asaskevich/EventBus/client.go create mode 100644 vendor/github.com/asaskevich/EventBus/event_bus.go create mode 100644 vendor/github.com/asaskevich/EventBus/network_bus.go create mode 100644 vendor/github.com/asaskevich/EventBus/server.go diff --git a/Gopkg.lock b/Gopkg.lock index 9259a0b..80f573f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -25,6 +25,14 @@ pruneopts = "UT" revision = "e3b1f968fc6397b51d963fee8ec8711a47bc0ce8" +[[projects]] + branch = "master" + digest = "1:e2a1ff1174d564ed4b75a62757f4a9081ed3b8c99ed17e47eb252b048b4ff018" + name = "github.com/asaskevich/EventBus" + packages = ["."] + pruneopts = "UT" + revision = "d46933a94f05c6657d7b923fcf5ac563ee37ec79" + [[projects]] branch = "master" digest = "1:36fe9527deed01d2a317617e59304eb2c4ce9f8a24115bcc5c2e37b3aee5bae4" @@ -311,6 +319,7 @@ input-imports = [ "github.com/AlecAivazis/survey", "github.com/Yawning/chacha20", + "github.com/asaskevich/EventBus", "github.com/gin-gonic/gin", "github.com/go-sql-driver/mysql", "github.com/jinzhu/gorm", diff --git a/Gopkg.toml b/Gopkg.toml index d6450c8..1c590fd 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -24,11 +24,6 @@ # go-tests = true # unused-packages = true - -[[constraint]] - branch = "master" - name = "github.com/aead/chacha20" - [[constraint]] branch = "master" name = "golang.org/x/crypto" @@ -48,3 +43,7 @@ [[constraint]] name = "github.com/AlecAivazis/survey" version = "1.8.0" + +[[constraint]] + branch = "master" + name = "github.com/asaskevich/EventBus" diff --git a/cmd/test/test.go b/cmd/test/test.go index 8e6976c..57636bf 100644 --- a/cmd/test/test.go +++ b/cmd/test/test.go @@ -4,8 +4,11 @@ import ( "fmt" ) -func main() { - s := "a,c,b," - fmt.Printf(s[:len(s)-1]) +type Test struct { + Name string +} +func main() { + a := Test{} + fmt.Print(a.(type)) } diff --git a/comm/eventbus/eventbus.go b/comm/eventbus/eventbus.go new file mode 100644 index 0000000..2187d05 --- /dev/null +++ b/comm/eventbus/eventbus.go @@ -0,0 +1,17 @@ +package eventbus + +import ( + evbus "github.com/asaskevich/EventBus" + "github.com/rc452860/vnet/utils" +) + +var eventBus evbus.Bus + +func GetEventBus() evbus.Bus { + utils.Lock("eventbus:eventBus") + defer utils.UnLock("eventbus:eventBus") + if eventBus != nil { + eventBus = evbus.New() + } + return eventBus +} diff --git a/conn/common.go b/conn/common.go deleted file mode 100644 index 7897751..0000000 --- a/conn/common.go +++ /dev/null @@ -1,9 +0,0 @@ -package conn - -import "github.com/rc452860/vnet/log" - -var logging *log.Logging - -func init() { - logging = log.GetLogger("root") -} diff --git a/conn/conn_decorate.go b/conn/conn_decorate.go index 03ace0e..76349da 100644 --- a/conn/conn_decorate.go +++ b/conn/conn_decorate.go @@ -7,6 +7,7 @@ import ( "net" "time" + "github.com/rc452860/vnet/log" "github.com/rc452860/vnet/pool" "github.com/rc452860/vnet/utils" "golang.org/x/time/rate" @@ -26,7 +27,7 @@ func DialTcp(addr string) (IConn, error) { } err = con.(*net.TCPConn).SetKeepAlivePeriod(30 * time.Second) if err != nil { - logging.Error("set tcp keepalive error: %v", err) + log.Error("set tcp keepalive error: %v", err) } return icon, nil } diff --git a/conn/listener_decorate.go b/conn/listener_decorate.go deleted file mode 100644 index 8b30590..0000000 --- a/conn/listener_decorate.go +++ /dev/null @@ -1,168 +0,0 @@ -package conn - -import ( - "net" - "time" - - "github.com/rc452860/vnet/pool" -) - -type IListener interface { - Accept() (IConn, error) - Close() error -} - -type DefaultListener struct { - net.Listener - closed chan struct{} -} -type Handle func(IConn) - -func ListenTcp(addr string, handle Handle) (IListener, error) { - tcpAddr, err := net.ResolveTCPAddr("tcp", addr) - if err != nil { - logging.Error(err.Error()) - return nil, err - } - server, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - logging.Error(err.Error()) - return nil, err - } - - listener := DefaultListener{ - Listener: server, - closed: make(chan struct{}), - } - - go func() { - for { - select { - case <-listener.closed: - return - default: - } - listener.Listener.(*net.TCPListener).SetDeadline(time.Now().Add(1e9)) - con, err := listener.Accept() - if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { - continue - } - - if err != nil { - logging.Error(err.Error()) - continue - } - - go handle(con) - } - }() - - return listener, nil -} - -func (d DefaultListener) Accept() (IConn, error) { - con, err := d.Listener.Accept() - if err != nil { - return nil, err - } - err = con.(*net.TCPConn).SetKeepAlivePeriod(30 * time.Second) - if err != nil { - logging.Error(err.Error()) - } - c, err := NewDefaultConn(con, TCP) - if err != nil { - return nil, err - } - - return c, nil -} - -func (d DefaultListener) Close() error { - d.closed <- struct{}{} - return d.Listener.Close() -} - -// UDP Listener -type IUDPListener interface { - net.PacketConn -} - -type DefaultUdpListener struct { - net.PacketConn - closed chan struct{} -} - -type UdpPacket struct { - Buf []byte - Addr *net.UDPAddr - net.PacketConn -} - -func ListenUdp(addr string, handle func(*UdpPacket)) (IUDPListener, error) { - l, err := net.ListenPacket("udp", addr) - if err != nil { - return nil, err - } - closed := make(chan struct{}) - listener := &DefaultUdpListener{ - PacketConn: l, - closed: closed, - } - for { - buf := pool.GetUdpBuf() - n, raddr, err := l.ReadFrom(buf) - if err != nil { - logging.Err(err) - continue - } - logging.Debug("recive packet: %v", raddr) - - udpPacket := &UdpPacket{ - Buf: buf[:n], - Addr: raddr.(*net.UDPAddr), - PacketConn: l, - } - go func() { - defer pool.PutUdpBuf(buf) - handle(udpPacket) - }() - } - return listener, nil -} - -// func ListenUdpS(password, method, addr string, handle func(*UdpPacket)) (IUDPListener, error) { -// l, err := net.ListenPacket("udp", addr) -// if err != nil { -// return nil, err -// } -// l, err = ciphers.CipherPacketDecorate(password, method, l) -// if err != nil { -// logging.Error("can't init packet decorate") -// return nil, err -// } -// closed := make(chan struct{}) -// listener := &DefaultUdpListener{ -// PacketConn: l, -// closed: closed, -// } -// for { -// buf := pool.GetUdpBuf() -// n, raddr, err := l.ReadFrom(buf) -// if err != nil { -// logging.Err(err) -// continue -// } -// logging.Debug("recive packet: %v", raddr) - -// udpPacket := &UdpPacket{ -// Buf: buf[:n], -// Addr: raddr.(*net.UDPAddr), -// PacketConn: l, -// } -// go func() { -// defer pool.PutUdpBuf(buf) -// handle(udpPacket) -// }() -// } -// return listener, nil -// } diff --git a/db/db.go b/db/db.go index 5cddf7b..583778c 100644 --- a/db/db.go +++ b/db/db.go @@ -196,9 +196,11 @@ func DbStarted(ctx context.Context) { } type DBTraffic struct { - Port int - Up uint64 - Down uint64 + Port int + Up uint64 + Down uint64 + ConnectCount int + Connects map[string]bool } var trafficTable = make(map[int]*DBTraffic) diff --git "a/doc/\345\274\200\345\217\221\351\241\273\347\237\245.md" "b/doc/\345\274\200\345\217\221\351\241\273\347\237\245.md" index d380aab..4e29b5f 100644 --- "a/doc/\345\274\200\345\217\221\351\241\273\347\237\245.md" +++ "b/doc/\345\274\200\345\217\221\351\241\273\347\237\245.md" @@ -2,4 +2,7 @@ golang 语言学习网址: http://docscn.studygolang.com/ golang 标准库: https://studygolang.com/pkgdoc ## 依赖管理 -dep \ No newline at end of file +dep ensure + + +## EventBus diff --git a/proxy/proxy.go b/proxy/proxy.go index 100d2bd..d8db363 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -1,6 +1,7 @@ package proxy import ( + "context" "net" "sync" "time" @@ -21,32 +22,29 @@ type TrafficMessage struct { DownBytes uint64 } type ProxyService struct { - Tcp net.Listener `json:"tcp"` - Udp net.PacketConn `json:"udp"` - UpSpeed uint64 `json:"upspeed"` - DownSpeed uint64 `json:"downspeed"` - UpBytes uint64 `json:"upbytes"` - DownBytes uint64 `json:"downbytes"` - TcpClose chan struct{} `json:"_"` - UdpClose chan struct{} `json:"_"` - TrafficClose chan struct{} `json:"_"` - TrafficMQ chan TrafficMessage `json:"_"` - TcpLock *sync.Mutex `json:"_"` - UdpLock *sync.Mutex `json:"_"` - Wait *sync.WaitGroup `json:"-"` - Status string `json:"status"` + context.Context `json:"-"` + Tcp net.Listener `json:"tcp"` + Udp net.PacketConn `json:"udp"` + UpSpeed uint64 `json:"upspeed"` + DownSpeed uint64 `json:"downspeed"` + UpBytes uint64 `json:"upbytes"` + DownBytes uint64 `json:"downbytes"` + TrafficMQ chan TrafficMessage `json:"_"` + TcpLock *sync.Mutex `json:"_"` + UdpLock *sync.Mutex `json:"_"` + Status string `json:"status"` + Cancel context.CancelFunc `json:"-` } func NewProxyService() *ProxyService { + ctx, cancel := context.WithCancel(context.Background()) return &ProxyService{ - TcpClose: make(chan struct{}), - UdpClose: make(chan struct{}), - TrafficClose: make(chan struct{}), - TcpLock: &sync.Mutex{}, - UdpLock: &sync.Mutex{}, - TrafficMQ: make(chan TrafficMessage, 128), - Wait: &sync.WaitGroup{}, - Status: "stop", + TcpLock: &sync.Mutex{}, + UdpLock: &sync.Mutex{}, + TrafficMQ: make(chan TrafficMessage, 128), + Status: "stop", + Context: ctx, + Cancel: cancel, } } @@ -58,9 +56,7 @@ func RegisterTrafficHandle(trafficMonitor chan TrafficMessage) { } func (this *ProxyService) TrafficMeasure() { - speedClose := make(chan struct{}) - countClose := make(chan struct{}) - this.Wait.Add(1) + go func() { var upTmp, downTmp uint64 = this.UpBytes, this.DownBytes tick := time.Tick(1 * time.Second) @@ -70,20 +66,17 @@ func (this *ProxyService) TrafficMeasure() { select { case <-tick: continue - case <-speedClose: - this.Wait.Done() + case <-this.Done(): return } } }() - this.Wait.Add(1) go func() { for { var data TrafficMessage select { - case <-countClose: + case <-this.Done(): log.Info("close countClose") - this.Wait.Done() return case data = <-this.TrafficMQ: } @@ -96,39 +89,32 @@ func (this *ProxyService) TrafficMeasure() { } } }() - <-this.TrafficClose + <-this.Done() log.Info("close traffic measure") - speedClose <- struct{}{} - countClose <- struct{}{} } func (this *ProxyService) Start() error { this.Status = "run" - this.Wait.Add(2) return nil } func (this *ProxyService) Stop() error { log.Info("proxy stop") - this.TrafficClose <- struct{}{} + this.Cancel() // this.TcpClose <- struct{}{} if this.Tcp != nil { err := this.Tcp.Close() if err != nil { log.Err(err) } - this.Wait.Done() } if this.Udp != nil { err := this.Udp.Close() if err != nil { log.Err(err) } - this.Wait.Done() } - this.Wait.Wait() this.Status = "stop" - // this.UdpClose <- struct{}{} return nil } diff --git a/proxy/server/shadowsocks.go b/proxy/server/shadowsocks.go index 92d61b5..9a81fda 100644 --- a/proxy/server/shadowsocks.go +++ b/proxy/server/shadowsocks.go @@ -170,13 +170,11 @@ func (s *ShadowsocksProxy) startTCP() error { go func() { defer server.Close() for { - // select { - // case <-s.ProxyService.TcpClose: - // s.Tcp.Close() - // s.Tcp = nil - // return - // default: - // } + select { + case <-s.ProxyService.Done(): + return + default: + } server.SetDeadline(time.Now().Add(s.TCPTimeout)) lcon, err := server.Accept() if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { @@ -208,11 +206,6 @@ func (s *ShadowsocksProxy) startTCP() error { /** 限流装饰器 */ lcd, _ = conn.TrafficLimitDecorate(lcd, s.ReadLimit, s.WriteLimit) - // lcd, err = conn.TimerDecorate(lcd, s.TcpTimeout, s.TcpTimeout) - // if err != nil { - // logging.Err(err) - // return - // } /** 加密装饰器 */ lcd, err = ciphers.CipherDecorate(s.Password, s.Method, lcd) if err != nil { @@ -339,6 +332,11 @@ func (s *ShadowsocksProxy) startUDP() error { go func() { defer server.Close() for { + select { + case <-s.ProxyService.Done(): + return + default: + } server.SetDeadline(time.Now().Add(s.UDPTimeout)) n, raddr, err := server.ReadFrom(buf) if opErr, ok := err.(*net.OpError); ok && opErr.Timeout() { diff --git a/record/record.go b/record/record.go new file mode 100644 index 0000000..8ae544a --- /dev/null +++ b/record/record.go @@ -0,0 +1,115 @@ +package record + +import ( + "context" + "net" + "sync" + + "github.com/rc452860/vnet/comm/eventbus" + + "github.com/rc452860/vnet/utils" +) + +type Record struct { + ProxyAddr net.Addr + ConnectionsPair + Protocol string + Network string + RecordType string + Up uint64 + Down uint64 + Status int +} + +type ConnectionsPair struct { + Client net.Addr + Target net.Addr +} + +type GlobalResourceMonitor struct { + sync.RWMutex + LastMinuteOnline map[string][]ConnectionsPair + TrafficUp map[string]uint64 + TrafficDown map[string]uint64 + TrafficUpSpeed map[string]uint64 + TrafficDownSpeed map[string]uint64 + GlobalUp uint64 + GlobalDown uint64 + GlobalSpeedUp uint64 + GlobalSpeeddown uint64 + PacketCount uint64 + // cancel context.CancelFunc + // context context.Context +} + +var ( + globalResourceMonitor *GlobalResourceMonitor +) + +func GetGRMInstance() *GlobalResourceMonitor { + utils.Lock("GRM") + defer utils.UnLock("GRM") + + if globalResourceMonitor == nil { + globalResourceMonitor = &GlobalResourceMonitor{ + LastMinuteOnline: make(map[string][]ConnectionsPair), + TrafficUp: make(map[string]uint64), + TrafficDown: make(map[string]uint64), + TrafficUpSpeed: make(map[string]uint64), + TrafficDownSpeed: make(map[string]uint64), + GlobalUp: 0, + GlobalDown: 0, + GlobalSpeedUp: 0, + GlobalSpeeddown: 0, + PacketCount: 0, + } + } + + ctx, cancel := context.WithCancel(context.Background()) + // globalResourceMonitor.cancel = cancel + // globalResourceMonitor.context = ctx + eventbus.GetEventBus().SubscribeAsync( + "traffic", + ) + return globalResourceMonitor +} + +func (g *GlobalResourceMonitor) Stop() { + utils.Lock("GRM") + defer utils.UnLock("GRM") + // g.cancel() + globalResourceMonitor = nil + +} +func (g *GlobalResourceMonitor) Subscript(name string, sub <-chan Record) { + g.recordObservers.Store(name, sub) +} + +func (g *GlobalResourceMonitor) UnSubscript(name string) { + g.recordObservers.Delete(name) +} + +func (g *GlobalResourceMonitor) Publish(record Record) { + g.reciver <- record +} + +func (g *GlobalResourceMonitor) process() { + for { + // var data interface{} + // select { + // case <-g.context.Done(): + // return + // default: + // case data = <-g.reciver: + // } + + } +} + +func (g *GlobalResourceMonitor) trafficStatistics(data Record) { + g.TrafficDown[data.Port] += data.Down + g.TrafficUp[data.Port] += data.Up + g.GlobalUp += data.Up + g.GlobalDown += data.Down + +} diff --git a/service/shadowsocks_test.go b/service/shadowsocks_test.go index fe95c29..cc01117 100644 --- a/service/shadowsocks_test.go +++ b/service/shadowsocks_test.go @@ -9,7 +9,7 @@ import ( ) func TestMemProblem(t *testing.T) { - log.GetLogger("root").Level = log.WARN + log.GetLogger("root").Level = log.INFO for i := 10000; i < 20000; i++ { CurrentShadowsocksService().Add("0.0.0.0", "aes-128-cfb", "killer", i, "", 0) CurrentShadowsocksService().Start(i) diff --git a/vendor/github.com/asaskevich/EventBus/.travis.yml b/vendor/github.com/asaskevich/EventBus/.travis.yml new file mode 100644 index 0000000..7fb1397 --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/.travis.yml @@ -0,0 +1,10 @@ +language: go + +go: + - 1.1 + - 1.2 + - 1.3 + +notifications: + email: + - bwatas@gmail.com \ No newline at end of file diff --git a/vendor/github.com/asaskevich/EventBus/LICENSE b/vendor/github.com/asaskevich/EventBus/LICENSE new file mode 100644 index 0000000..9603a6f --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2014 Alex Saskevich + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/asaskevich/EventBus/README.md b/vendor/github.com/asaskevich/EventBus/README.md new file mode 100644 index 0000000..b3bd42c --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/README.md @@ -0,0 +1,162 @@ +EventBus +====== + +[![GoDoc](https://godoc.org/github.com/asaskevich/EventBus?status.svg)](https://godoc.org/github.com/asaskevich/EventBus) [![Coverage Status](https://img.shields.io/coveralls/asaskevich/EventBus.svg)](https://coveralls.io/r/asaskevich/EventBus?branch=master) [![Build Status](https://travis-ci.org/asaskevich/EventBus.svg)](https://travis-ci.org/asaskevich/EventBus) + +Package EventBus is the little and lightweight eventbus with async compatibility for GoLang. + +#### Installation +Make sure that Go is installed on your computer. +Type the following command in your terminal: + + go get github.com/asaskevich/EventBus + +After it the package is ready to use. + +#### Import package in your project +Add following line in your `*.go` file: +```go +import "github.com/asaskevich/EventBus" +``` +If you unhappy to use long `EventBus`, you can do something like this: +```go +import ( + evbus "github.com/asaskevich/EventBus" +) +``` + +#### Example +```go +func calculator(a int, b int) { + fmt.Printf("%d\n", a + b) +} + +func main() { + bus := EventBus.New(); + bus.Subscribe("main:calculator", calculator); + bus.Publish("main:calculator", 20, 40); + bus.Unsubscribe("main:calculator", calculator); +} +``` + +#### Implemented methods +* **New()** +* **Subscribe()** +* **SubscribeOnce()** +* **HasCallback()** +* **Unsubscribe()** +* **Publish()** +* **SubscribeAsync()** +* **SubscribeOnceAsync()** +* **WaitAsync()** + +#### New() +New returns new EventBus with empty handlers. +```go +bus := EventBus.New(); +``` + +#### Subscribe(topic string, fn interface{}) error +Subscribe to a topic. Returns error if `fn` is not a function. +```go +func Handler() { ... } +... +bus.Subscribe("topic:handler", Handler) +``` + +#### SubscribeOnce(topic string, fn interface{}) error +Subscribe to a topic once. Handler will be removed after executing. Returns error if `fn` is not a function. +```go +func HelloWorld() { ... } +... +bus.SubscribeOnce("topic:handler", HelloWorld) +``` + +#### Unsubscribe(topic string, fn interface{}) error +Remove callback defined for a topic. Returns error if there are no callbacks subscribed to the topic. +```go +bus.Unsubscribe("topic:handler", HelloWord); +``` + +#### HasCallback(topic string) bool +Returns true if exists any callback subscribed to the topic. + +#### Publish(topic string, args ...interface{}) +Publish executes callback defined for a topic. Any additional argument will be transferred to the callback. +```go +func Handler(str string) { ... } +... +bus.Subscribe("topic:handler", Handler) +... +bus.Publish("topic:handler", "Hello, World!"); +``` + +#### SubscribeAsync(topic string, fn interface{}, transactional bool) +Subscribe to a topic with an asynchronous callback. Returns error if `fn` is not a function. +```go +func slowCalculator(a, b int) { + time.Sleep(3 * time.Second) + fmt.Printf("%d\n", a + b) +} + +bus := EventBus.New() +bus.SubscribeAsync("main:slow_calculator", slowCalculator, false) + +bus.Publish("main:slow_calculator", 20, 60) + +fmt.Println("start: do some stuff while waiting for a result") +fmt.Println("end: do some stuff while waiting for a result") + +bus.WaitAsync() // wait for all async callbacks to complete + +fmt.Println("do some stuff after waiting for result") +``` +Transactional determines whether subsequent callbacks for a topic are run serially (true) or concurrently(false) + +#### SubscribeOnceAsync(topic string, args ...interface{}) +SubscribeOnceAsync works like SubscribeOnce except the callback to executed asynchronously + +#### WaitAsync() +WaitAsync waits for all async callbacks to complete. + +#### Cross Process Events +Works with two rpc services: +- a client service to listen to remotely published events from a server +- a server service to listen to client subscriptions + +server.go +```go +func main() { + server := NewServer(":2010", "/_server_bus_", New()) + server.Start() + // ... + server.EventBus().Publish("main:calculator", 4, 6) + // ... + server.Stop() +} +``` + +client.go +```go +func main() { + client := NewClient(":2015", "/_client_bus_", New()) + client.Start() + client.Subscribe("main:calculator", calculator, ":2010", "/_server_bus_") + // ... + client.Stop() +} +``` + +#### Notes +Documentation is available here: [godoc.org](https://godoc.org/github.com/asaskevich/EventBus). +Full information about code coverage is also available here: [EventBus on gocover.io](http://gocover.io/github.com/asaskevich/EventBus). + +#### Support +If you do have a contribution for the package feel free to put up a Pull Request or open Issue. + +#### Special thanks to [contributors](https://github.com/asaskevich/EventBus/graphs/contributors) +* [Brian Downs](https://github.com/briandowns) +* [Dominik Schulz](https://github.com/gittex) +* [bennAH](https://github.com/bennAH) +* [John Noble] (https://github.com/gaxunil) +* [Evan Borgstrom] (https://github.com/borgstrom) diff --git a/vendor/github.com/asaskevich/EventBus/client.go b/vendor/github.com/asaskevich/EventBus/client.go new file mode 100644 index 0000000..56ad290 --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/client.go @@ -0,0 +1,120 @@ +package EventBus + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +const ( + // PublishService - Client service method + PublishService = "ClientService.PushEvent" +) + +// ClientArg - object containing event for client to publish locally +type ClientArg struct { + Args []interface{} + Topic string +} + +// Client - object capable of subscribing to a remote event bus +type Client struct { + eventBus Bus + address string + path string + service *ClientService +} + +// NewClient - create a client object with the address and server path +func NewClient(address, path string, eventBus Bus) *Client { + client := new(Client) + client.eventBus = eventBus + client.address = address + client.path = path + client.service = &ClientService{client, &sync.WaitGroup{}, false} + return client +} + +// EventBus - returns the underlying event bus +func (client *Client) EventBus() Bus { + return client.eventBus +} + +func (client *Client) doSubscribe(topic string, fn interface{}, serverAddr, serverPath string, subscribeType SubscribeType) { + defer func() { + if r := recover(); r != nil { + fmt.Println("Server not found -", r) + } + }() + + rpcClient, err := rpc.DialHTTPPath("tcp", serverAddr, serverPath) + defer rpcClient.Close() + if err != nil { + fmt.Errorf("dialing: %v", err) + } + args := &SubscribeArg{client.address, client.path, PublishService, subscribeType, topic} + reply := new(bool) + err = rpcClient.Call(RegisterService, args, reply) + if err != nil { + fmt.Errorf("Register error: %v", err) + } + if *reply { + client.eventBus.Subscribe(topic, fn) + } +} + +//Subscribe subscribes to a topic in a remote event bus +func (client *Client) Subscribe(topic string, fn interface{}, serverAddr, serverPath string) { + client.doSubscribe(topic, fn, serverAddr, serverPath, Subscribe) +} + +//SubscribeOnce subscribes once to a topic in a remote event bus +func (client *Client) SubscribeOnce(topic string, fn interface{}, serverAddr, serverPath string) { + client.doSubscribe(topic, fn, serverAddr, serverPath, SubscribeOnce) +} + +// Start - starts the client service to listen to remote events +func (client *Client) Start() error { + var err error + service := client.service + if !service.started { + server := rpc.NewServer() + server.Register(service) + server.HandleHTTP(client.path, "/debug"+client.path) + l, err := net.Listen("tcp", client.address) + if err == nil { + service.wg.Add(1) + service.started = true + go http.Serve(l, nil) + } + } else { + err = errors.New("Client service already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (client *Client) Stop() { + service := client.service + if service.started { + service.wg.Done() + service.started = false + } +} + +// ClientService - service object listening to events published in a remote event bus +type ClientService struct { + client *Client + wg *sync.WaitGroup + started bool +} + +// PushEvent - exported service to listening to remote events +func (service *ClientService) PushEvent(arg *ClientArg, reply *bool) error { + service.client.eventBus.Publish(arg.Topic, arg.Args...) + *reply = true + return nil +} diff --git a/vendor/github.com/asaskevich/EventBus/event_bus.go b/vendor/github.com/asaskevich/EventBus/event_bus.go new file mode 100644 index 0000000..d4cf80f --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/event_bus.go @@ -0,0 +1,207 @@ +package EventBus + +import ( + "fmt" + "reflect" + "sync" +) + +//BusSubscriber defines subscription-related bus behavior +type BusSubscriber interface { + Subscribe(topic string, fn interface{}) error + SubscribeAsync(topic string, fn interface{}, transactional bool) error + SubscribeOnce(topic string, fn interface{}) error + SubscribeOnceAsync(topic string, fn interface{}) error + Unsubscribe(topic string, handler interface{}) error +} + +//BusPublisher defines publishing-related bus behavior +type BusPublisher interface { + Publish(topic string, args ...interface{}) +} + +//BusController defines bus control behavior (checking handler's presence, synchronization) +type BusController interface { + HasCallback(topic string) bool + WaitAsync() +} + +//Bus englobes global (subscribe, publish, control) bus behavior +type Bus interface { + BusController + BusSubscriber + BusPublisher +} + +// EventBus - box for handlers and callbacks. +type EventBus struct { + handlers map[string][]*eventHandler + lock sync.Mutex // a lock for the map + wg sync.WaitGroup +} + +type eventHandler struct { + callBack reflect.Value + flagOnce bool + async bool + transactional bool + sync.Mutex // lock for an event handler - useful for running async callbacks serially +} + +// New returns new EventBus with empty handlers. +func New() Bus { + b := &EventBus{ + make(map[string][]*eventHandler), + sync.Mutex{}, + sync.WaitGroup{}, + } + return Bus(b) +} + +// doSubscribe handles the subscription logic and is utilized by the public Subscribe functions +func (bus *EventBus) doSubscribe(topic string, fn interface{}, handler *eventHandler) error { + bus.lock.Lock() + defer bus.lock.Unlock() + if !(reflect.TypeOf(fn).Kind() == reflect.Func) { + return fmt.Errorf("%s is not of type reflect.Func", reflect.TypeOf(fn).Kind()) + } + bus.handlers[topic] = append(bus.handlers[topic], handler) + return nil +} + +// Subscribe subscribes to a topic. +// Returns error if `fn` is not a function. +func (bus *EventBus) Subscribe(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), false, false, false, sync.Mutex{}, + }) +} + +// SubscribeAsync subscribes to a topic with an asynchronous callback +// Transactional determines whether subsequent callbacks for a topic are +// run serially (true) or concurrently (false) +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeAsync(topic string, fn interface{}, transactional bool) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), false, true, transactional, sync.Mutex{}, + }) +} + +// SubscribeOnce subscribes to a topic once. Handler will be removed after executing. +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeOnce(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), true, false, false, sync.Mutex{}, + }) +} + +// SubscribeOnceAsync subscribes to a topic once with an asynchronous callback +// Handler will be removed after executing. +// Returns error if `fn` is not a function. +func (bus *EventBus) SubscribeOnceAsync(topic string, fn interface{}) error { + return bus.doSubscribe(topic, fn, &eventHandler{ + reflect.ValueOf(fn), true, true, false, sync.Mutex{}, + }) +} + +// HasCallback returns true if exists any callback subscribed to the topic. +func (bus *EventBus) HasCallback(topic string) bool { + bus.lock.Lock() + defer bus.lock.Unlock() + _, ok := bus.handlers[topic] + if ok { + return len(bus.handlers[topic]) > 0 + } + return false +} + +// Unsubscribe removes callback defined for a topic. +// Returns error if there are no callbacks subscribed to the topic. +func (bus *EventBus) Unsubscribe(topic string, handler interface{}) error { + bus.lock.Lock() + defer bus.lock.Unlock() + if _, ok := bus.handlers[topic]; ok && len(bus.handlers[topic]) > 0 { + bus.removeHandler(topic, bus.findHandlerIdx(topic, reflect.ValueOf(handler))) + return nil + } + return fmt.Errorf("topic %s doesn't exist", topic) +} + +// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback. +func (bus *EventBus) Publish(topic string, args ...interface{}) { + bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish + defer bus.lock.Unlock() + if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) { + // Handlers slice may be changed by removeHandler and Unsubscribe during iteration, + // so make a copy and iterate the copied slice. + copyHandlers := make([]*eventHandler, 0, len(handlers)) + copyHandlers = append(copyHandlers, handlers...) + for i, handler := range copyHandlers { + if handler.flagOnce { + bus.removeHandler(topic, i) + } + if !handler.async { + bus.doPublish(handler, topic, args...) + } else { + bus.wg.Add(1) + if handler.transactional { + handler.Lock() + } + go bus.doPublishAsync(handler, topic, args...) + } + } + } +} + +func (bus *EventBus) doPublish(handler *eventHandler, topic string, args ...interface{}) { + passedArguments := bus.setUpPublish(topic, args...) + handler.callBack.Call(passedArguments) +} + +func (bus *EventBus) doPublishAsync(handler *eventHandler, topic string, args ...interface{}) { + defer bus.wg.Done() + if handler.transactional { + defer handler.Unlock() + } + bus.doPublish(handler, topic, args...) +} + +func (bus *EventBus) removeHandler(topic string, idx int) { + if _, ok := bus.handlers[topic]; !ok { + return + } + l := len(bus.handlers[topic]) + + if !(0 <= idx && idx < l) { + return + } + + copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:]) + bus.handlers[topic][l-1] = nil // or the zero value of T + bus.handlers[topic] = bus.handlers[topic][:l-1] +} + +func (bus *EventBus) findHandlerIdx(topic string, callback reflect.Value) int { + if _, ok := bus.handlers[topic]; ok { + for idx, handler := range bus.handlers[topic] { + if handler.callBack == callback { + return idx + } + } + } + return -1 +} + +func (bus *EventBus) setUpPublish(topic string, args ...interface{}) []reflect.Value { + + passedArguments := make([]reflect.Value, 0) + for _, arg := range args { + passedArguments = append(passedArguments, reflect.ValueOf(arg)) + } + return passedArguments +} + +// WaitAsync waits for all async callbacks to complete +func (bus *EventBus) WaitAsync() { + bus.wg.Wait() +} diff --git a/vendor/github.com/asaskevich/EventBus/network_bus.go b/vendor/github.com/asaskevich/EventBus/network_bus.go new file mode 100644 index 0000000..c6c877d --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/network_bus.go @@ -0,0 +1,76 @@ +package EventBus + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +// NetworkBus - object capable of subscribing to remote event buses in addition to remote event +// busses subscribing to it's local event bus. Compoed of a server and client +type NetworkBus struct { + *Client + *Server + service *NetworkBusService + sharedBus Bus + address string + path string +} + +// NewNetworkBus - returns a new network bus object at the server address and path +func NewNetworkBus(address, path string) *NetworkBus { + bus := new(NetworkBus) + bus.sharedBus = New() + bus.Server = NewServer(address, path, bus.sharedBus) + bus.Client = NewClient(address, path, bus.sharedBus) + bus.service = &NetworkBusService{&sync.WaitGroup{}, false} + bus.address = address + bus.path = path + return bus +} + +// EventBus - returns wrapped event bus +func (networkBus *NetworkBus) EventBus() Bus { + return networkBus.sharedBus +} + +// NetworkBusService - object capable of serving the network bus +type NetworkBusService struct { + wg *sync.WaitGroup + started bool +} + +// Start - helper method to serve a network bus service +func (networkBus *NetworkBus) Start() error { + var err error + service := networkBus.service + clientService := networkBus.Client.service + serverService := networkBus.Server.service + if !service.started { + server := rpc.NewServer() + server.RegisterName("ServerService", serverService) + server.RegisterName("ClientService", clientService) + server.HandleHTTP(networkBus.path, "/debug"+networkBus.path) + l, e := net.Listen("tcp", networkBus.address) + if e != nil { + err = fmt.Errorf("listen error: %v", e) + } + service.wg.Add(1) + go http.Serve(l, nil) + } else { + err = errors.New("Server bus already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (networkBus *NetworkBus) Stop() { + service := networkBus.service + if service.started { + service.wg.Done() + service.started = false + } +} diff --git a/vendor/github.com/asaskevich/EventBus/server.go b/vendor/github.com/asaskevich/EventBus/server.go new file mode 100644 index 0000000..9f899ed --- /dev/null +++ b/vendor/github.com/asaskevich/EventBus/server.go @@ -0,0 +1,153 @@ +package EventBus + +import ( + "errors" + "fmt" + "net" + "net/http" + "net/rpc" + "sync" +) + +// SubscribeType - how the client intends to subscribe +type SubscribeType int + +const ( + // Subscribe - subscribe to all events + Subscribe SubscribeType = iota + // SubscribeOnce - subscribe to only one event + SubscribeOnce +) + +const ( + // RegisterService - Server subscribe service method + RegisterService = "ServerService.Register" +) + +// SubscribeArg - object to hold subscribe arguments from remote event handlers +type SubscribeArg struct { + ClientAddr string + ClientPath string + ServiceMethod string + SubscribeType SubscribeType + Topic string +} + +// Server - object capable of being subscribed to by remote handlers +type Server struct { + eventBus Bus + address string + path string + subscribers map[string][]*SubscribeArg + service *ServerService +} + +// NewServer - create a new Server at the address and path +func NewServer(address, path string, eventBus Bus) *Server { + server := new(Server) + server.eventBus = eventBus + server.address = address + server.path = path + server.subscribers = make(map[string][]*SubscribeArg) + server.service = &ServerService{server, &sync.WaitGroup{}, false} + return server +} + +// EventBus - returns wrapped event bus +func (server *Server) EventBus() Bus { + return server.eventBus +} + +func (server *Server) rpcCallback(subscribeArg *SubscribeArg) func(args ...interface{}) { + return func(args ...interface{}) { + client, connErr := rpc.DialHTTPPath("tcp", subscribeArg.ClientAddr, subscribeArg.ClientPath) + defer client.Close() + if connErr != nil { + fmt.Errorf("dialing: %v", connErr) + } + clientArg := new(ClientArg) + clientArg.Topic = subscribeArg.Topic + clientArg.Args = args + var reply bool + err := client.Call(subscribeArg.ServiceMethod, clientArg, &reply) + if err != nil { + fmt.Errorf("dialing: %v", err) + } + } +} + +// HasClientSubscribed - True if a client subscribed to this server with the same topic +func (server *Server) HasClientSubscribed(arg *SubscribeArg) bool { + if topicSubscribers, ok := server.subscribers[arg.Topic]; ok { + for _, topicSubscriber := range topicSubscribers { + if *topicSubscriber == *arg { + return true + } + } + } + return false +} + +// Start - starts a service for remote clients to subscribe to events +func (server *Server) Start() error { + var err error + service := server.service + if !service.started { + rpcServer := rpc.NewServer() + rpcServer.Register(service) + rpcServer.HandleHTTP(server.path, "/debug"+server.path) + l, e := net.Listen("tcp", server.address) + if e != nil { + err = e + fmt.Errorf("listen error: %v", e) + } + service.started = true + service.wg.Add(1) + go http.Serve(l, nil) + } else { + err = errors.New("Server bus already started") + } + return err +} + +// Stop - signal for the service to stop serving +func (server *Server) Stop() { + service := server.service + if service.started { + service.wg.Done() + service.started = false + } +} + +// ServerService - service object to listen to remote subscriptions +type ServerService struct { + server *Server + wg *sync.WaitGroup + started bool +} + +// Register - Registers a remote handler to this event bus +// for a remote subscribe - a given client address only needs to subscribe once +// event will be republished in local event bus +func (service *ServerService) Register(arg *SubscribeArg, success *bool) error { + subscribers := service.server.subscribers + if !service.server.HasClientSubscribed(arg) { + rpcCallback := service.server.rpcCallback(arg) + switch arg.SubscribeType { + case Subscribe: + service.server.eventBus.Subscribe(arg.Topic, rpcCallback) + case SubscribeOnce: + service.server.eventBus.SubscribeOnce(arg.Topic, rpcCallback) + } + var topicSubscribers []*SubscribeArg + if _, ok := subscribers[arg.Topic]; ok { + topicSubscribers = []*SubscribeArg{arg} + } else { + topicSubscribers = subscribers[arg.Topic] + topicSubscribers = append(topicSubscribers, arg) + } + subscribers[arg.Topic] = topicSubscribers + } + *success = true + return nil +}