Skip to content

Commit

Permalink
Passing the channel instance into the established/finished server han…
Browse files Browse the repository at this point in the history
…dlers
  • Loading branch information
andrebires committed Mar 17, 2022
1 parent 141fffa commit 12831b7
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 78 deletions.
74 changes: 0 additions & 74 deletions examples/ws-chat/main.go

This file was deleted.

85 changes: 85 additions & 0 deletions examples/ws-chat/server/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"context"
"github.com/takenet/lime-go"
"go.uber.org/multierr"
"log"
"net"
"net/http"
"os"
"os/signal"
"sync"
)

var channels = make(map[string]*lime.ServerChannel)
var nodesToID = make(map[lime.Node]string)
var mu sync.RWMutex

func main() {
wsConfig := &lime.WebsocketConfig{
CheckOrigin: func(r *http.Request) bool {
return true
},
}

server := lime.NewServerBuilder().
Established(func(sessionID string, c *lime.ServerChannel) {
mu.Lock()
defer mu.Unlock()
// Register a new established user session
channels[sessionID] = c
nodesToID[c.RemoteNode()] = sessionID
}).
Finished(func(sessionID string) {
mu.Lock()
defer mu.Unlock()
// Remove a finished session
if channel, ok := channels[sessionID]; ok {
delete(nodesToID, channel.RemoteNode())
delete(channels, sessionID)
}
}).
MessagesHandlerFunc(HandleMessage).
ListenWebsocket(&net.TCPAddr{Port: 8080}, wsConfig).
Build()

go func() {
if err := server.ListenAndServe(); err != lime.ErrServerClosed {
log.Printf("server: listen: %v\n", err)
}
}()

sig := make(chan os.Signal, 1)
signal.Notify(sig)
log.Println("Listening at ws:8080. Press Ctrl+C to stop.")
<-sig

if err := server.Close(); err != nil {
log.Printf("server: close: %v\n", err)
}
}

func HandleMessage(ctx context.Context, msg *lime.Message, s lime.Sender) error {
mu.RLock()
defer mu.RUnlock()

var err error
// Check if it is a direct message to another user
if msg.To != (lime.Node{}) {
if sessionID, ok := nodesToID[msg.To]; ok {
if c, ok := channels[sessionID]; ok {
err = c.SendMessage(ctx, msg)
}
}
} else {
// Broadcast the message to all others sessions
senderSessionID, _ := lime.ContextSessionID(ctx)
for id, c := range channels {
if id != senderSessionID {
err = multierr.Append(err, c.SendMessage(ctx, msg))
}
}
}
return err
}
18 changes: 14 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (srv *Server) handleChannel(ctx context.Context, c *ServerChannel) {

established := srv.config.Established
if established != nil {
established(ctx, c.remoteNode, c.sessionID)
established(c.sessionID, c)
}

defer func() {
Expand All @@ -140,7 +140,7 @@ func (srv *Server) handleChannel(ctx context.Context, c *ServerChannel) {

finished := srv.config.Finished
if finished != nil {
finished(context.Background(), c.remoteNode)
finished(c.sessionID)
}
}()

Expand Down Expand Up @@ -187,9 +187,9 @@ type ServerConfig struct {
// Register is called for the client Node address registration.
Register func(context.Context, Node, *ServerChannel) (Node, error)
// Established is called when a session with a node is established.
Established func(ctx context.Context, n Node, sessionID string)
Established func(sessionID string, c *ServerChannel)
// Finished is called when an established session with a node is finished.
Finished func(context.Context, Node)
Finished func(sessionID string)
}

var defaultServerConfig = NewServerConfig()
Expand Down Expand Up @@ -379,6 +379,16 @@ func (b *ServerBuilder) ChannelBufferSize(bufferSize int) *ServerBuilder {
return b
}

func (b *ServerBuilder) Established(established func(sessionID string, c *ServerChannel)) *ServerBuilder {
b.config.Established = established
return b
}

func (b *ServerBuilder) Finished(finished func(sessionID string)) *ServerBuilder {
b.config.Finished = finished
return b
}

func (b *ServerBuilder) Build() *Server {
b.config.Authenticate = buildAuthenticate(b.plainAuth, b.keyAuth, b.externalAuth)
return NewServer(b.config, b.mux, b.listeners...)
Expand Down

0 comments on commit 12831b7

Please sign in to comment.