From 12831b79e1327acff8cf96cf8e9e516272f24101 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Bires?= Date: Thu, 17 Mar 2022 18:20:29 -0300 Subject: [PATCH] Passing the channel instance into the established/finished server handlers --- examples/ws-chat/main.go | 74 ---------------------------- examples/ws-chat/server/main.go | 85 +++++++++++++++++++++++++++++++++ server.go | 18 +++++-- 3 files changed, 99 insertions(+), 78 deletions(-) delete mode 100644 examples/ws-chat/main.go create mode 100644 examples/ws-chat/server/main.go diff --git a/examples/ws-chat/main.go b/examples/ws-chat/main.go deleted file mode 100644 index 26b84c5..0000000 --- a/examples/ws-chat/main.go +++ /dev/null @@ -1,74 +0,0 @@ -package main - -import ( - "context" - "github.com/takenet/lime-go" - "log" - "net" - "net/http" - "os" - "os/signal" -) - -func main() { - wsConfig := &lime.WebsocketConfig{ - CheckOrigin: func(r *http.Request) bool { - return true - }, - } - server := lime.NewServerBuilder(). - MessagesHandlerFunc( - func(ctx context.Context, msg *lime.Message, s lime.Sender) error { - log.Printf("Message received - ID: %v - From: %v - Type: %v - Content: %v\n", msg.ID, msg.From, msg.Type, msg.Content) - return s.SendMessage(ctx, &lime.Message{ - EnvelopeBase: lime.EnvelopeBase{ - To: msg.From, - }, - Type: msg.Type, - Content: msg.Content, - }) - return nil - }). - NotificationsHandlerFunc( - func(ctx context.Context, not *lime.Notification) error { - log.Printf("Notification received - ID: %v - From: %v - Event: %v - Reason: %v\n", not.ID, not.From, not.Event, not.Reason) - return nil - }). - CommandHandlerFunc( - func(cmd *lime.Command) bool { - if cmd.Status != "" || cmd.URI == nil { - return false - } - - url := cmd.URI.ToURL() - return cmd.Status == "" && - url.String() == "/presence" - }, - func(ctx context.Context, cmd *lime.Command, s lime.Sender) error { - return s.SendCommand( - ctx, - cmd.SuccessResponse()) - }). - CommandsHandlerFunc( - func(ctx context.Context, cmd *lime.Command, s lime.Sender) error { - log.Printf("Command received - ID: %v - Status: %v\n", cmd.ID, cmd.Status) - return nil - }). - ListenWebsocket(&net.TCPAddr{Port: 8080}, wsConfig). - Build() - - go func() { - if err := server.ListenAndServe(); err != lime.ErrServerClosed { - log.Printf("server: listen: %v\n", err) - } - }() - - sig := make(chan os.Signal, 1) - signal.Notify(sig) - log.Println("Listening at ws:8080. Press Ctrl+C to stop.") - <-sig - - if err := server.Close(); err != nil { - log.Printf("server: close: %v\n", err) - } -} diff --git a/examples/ws-chat/server/main.go b/examples/ws-chat/server/main.go new file mode 100644 index 0000000..d0c0cd5 --- /dev/null +++ b/examples/ws-chat/server/main.go @@ -0,0 +1,85 @@ +package main + +import ( + "context" + "github.com/takenet/lime-go" + "go.uber.org/multierr" + "log" + "net" + "net/http" + "os" + "os/signal" + "sync" +) + +var channels = make(map[string]*lime.ServerChannel) +var nodesToID = make(map[lime.Node]string) +var mu sync.RWMutex + +func main() { + wsConfig := &lime.WebsocketConfig{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + server := lime.NewServerBuilder(). + Established(func(sessionID string, c *lime.ServerChannel) { + mu.Lock() + defer mu.Unlock() + // Register a new established user session + channels[sessionID] = c + nodesToID[c.RemoteNode()] = sessionID + }). + Finished(func(sessionID string) { + mu.Lock() + defer mu.Unlock() + // Remove a finished session + if channel, ok := channels[sessionID]; ok { + delete(nodesToID, channel.RemoteNode()) + delete(channels, sessionID) + } + }). + MessagesHandlerFunc(HandleMessage). + ListenWebsocket(&net.TCPAddr{Port: 8080}, wsConfig). + Build() + + go func() { + if err := server.ListenAndServe(); err != lime.ErrServerClosed { + log.Printf("server: listen: %v\n", err) + } + }() + + sig := make(chan os.Signal, 1) + signal.Notify(sig) + log.Println("Listening at ws:8080. Press Ctrl+C to stop.") + <-sig + + if err := server.Close(); err != nil { + log.Printf("server: close: %v\n", err) + } +} + +func HandleMessage(ctx context.Context, msg *lime.Message, s lime.Sender) error { + mu.RLock() + defer mu.RUnlock() + + var err error + // Check if it is a direct message to another user + if msg.To != (lime.Node{}) { + if sessionID, ok := nodesToID[msg.To]; ok { + if c, ok := channels[sessionID]; ok { + err = c.SendMessage(ctx, msg) + } + } + } else { + // Broadcast the message to all others sessions + senderSessionID, _ := lime.ContextSessionID(ctx) + for id, c := range channels { + if id != senderSessionID { + err = multierr.Append(err, c.SendMessage(ctx, msg)) + } + } + } + return err +} diff --git a/server.go b/server.go index 7f34512..24557bd 100644 --- a/server.go +++ b/server.go @@ -127,7 +127,7 @@ func (srv *Server) handleChannel(ctx context.Context, c *ServerChannel) { established := srv.config.Established if established != nil { - established(ctx, c.remoteNode, c.sessionID) + established(c.sessionID, c) } defer func() { @@ -140,7 +140,7 @@ func (srv *Server) handleChannel(ctx context.Context, c *ServerChannel) { finished := srv.config.Finished if finished != nil { - finished(context.Background(), c.remoteNode) + finished(c.sessionID) } }() @@ -187,9 +187,9 @@ type ServerConfig struct { // Register is called for the client Node address registration. Register func(context.Context, Node, *ServerChannel) (Node, error) // Established is called when a session with a node is established. - Established func(ctx context.Context, n Node, sessionID string) + Established func(sessionID string, c *ServerChannel) // Finished is called when an established session with a node is finished. - Finished func(context.Context, Node) + Finished func(sessionID string) } var defaultServerConfig = NewServerConfig() @@ -379,6 +379,16 @@ func (b *ServerBuilder) ChannelBufferSize(bufferSize int) *ServerBuilder { return b } +func (b *ServerBuilder) Established(established func(sessionID string, c *ServerChannel)) *ServerBuilder { + b.config.Established = established + return b +} + +func (b *ServerBuilder) Finished(finished func(sessionID string)) *ServerBuilder { + b.config.Finished = finished + return b +} + func (b *ServerBuilder) Build() *Server { b.config.Authenticate = buildAuthenticate(b.plainAuth, b.keyAuth, b.externalAuth) return NewServer(b.config, b.mux, b.listeners...)