Skip to content

Commit

Permalink
feat: webrtc audio support
Browse files Browse the repository at this point in the history
  • Loading branch information
zijiren233 committed Dec 17, 2024
1 parent 6e2585d commit 5632b67
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 177 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ require (
github.com/refraction-networking/utls v1.6.7 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/tetratelabs/wazero v1.8.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
Expand Down
7 changes: 4 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,9 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
Expand All @@ -348,8 +349,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tetratelabs/wazero v1.8.1 h1:NrcgVbWfkWvVc4UtT4LRLDf91PsOzDzefMdwhLfA550=
github.com/tetratelabs/wazero v1.8.1/go.mod h1:yAI0XTsMBhREkM/YDAK/zNou3GoiAce1P6+rp/wQhjs=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down
5 changes: 4 additions & 1 deletion internal/model/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ const (
PermissionSetCurrentMovie
PermissionSetCurrentStatus
PermissionSendChatMessage
PermissionWebRTC

AllPermissions RoomMemberPermission = math.MaxUint32
NoPermission RoomMemberPermission = 0
DefaultPermissions RoomMemberPermission = PermissionGetMovieList | PermissionSendChatMessage
DefaultPermissions RoomMemberPermission = PermissionGetMovieList |
PermissionSendChatMessage |
PermissionWebRTC
)

func (p RoomMemberPermission) Has(permission RoomMemberPermission) bool {
Expand Down
34 changes: 25 additions & 9 deletions internal/op/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,28 @@ import (
"sync/atomic"
"time"

"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/synctv-org/synctv/internal/model"
pb "github.com/synctv-org/synctv/proto/message"
)

type Client struct {
u *User
r *Room
h *Hub
c chan Message
conn *websocket.Conn
wg sync.WaitGroup
timeOut time.Duration
closed uint32
u *User
r *Room
h *Hub
c chan Message
conn *websocket.Conn
connID string
wg sync.WaitGroup
timeOut time.Duration
closed uint32
rtcJoined atomic.Bool
}

func newClient(user *User, room *Room, h *Hub, conn *websocket.Conn) *Client {
return &Client{
connID: uuid.New().String(),
r: room,
u: user,
h: h,
Expand All @@ -33,6 +37,18 @@ func newClient(user *User, room *Room, h *Hub, conn *websocket.Conn) *Client {
}
}

func (c *Client) ConnID() string {
return c.connID
}

func (c *Client) RTCJoined() bool {
return c.rtcJoined.Load()
}

func (c *Client) SetRTCJoined(joined bool) {
c.rtcJoined.Store(joined)
}

func (c *Client) User() *User {
return c.u
}
Expand Down Expand Up @@ -115,5 +131,5 @@ func (c *Client) SetStatus(playing bool, seek float64, rate float64, timeDiff fl
PlaybackRate: status.PlaybackRate,
},
},
}, WithIgnoreClient(c))
}, WithIgnoreConnID(c.ConnID()))
}
63 changes: 45 additions & 18 deletions internal/op/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type clients struct {
m map[*Client]struct{}
m map[string]*Client
lock sync.RWMutex
}

Expand All @@ -30,21 +30,28 @@ type Hub struct {

type broadcastMessage struct {
data Message
ignoreClient []*Client
ignoreID []string
ignoreConnID []string
ignoreUserID []string
rtcJoined bool
}

type BroadcastConf func(*broadcastMessage)

func WithIgnoreClient(cli ...*Client) BroadcastConf {
func WithRTCJoined() BroadcastConf {
return func(bm *broadcastMessage) {
bm.ignoreClient = cli
bm.rtcJoined = true
}
}

func WithIgnoreConnID(connID ...string) BroadcastConf {
return func(bm *broadcastMessage) {
bm.ignoreConnID = connID
}
}

func WithIgnoreID(id ...string) BroadcastConf {
return func(bm *broadcastMessage) {
bm.ignoreID = id
bm.ignoreUserID = id
}
}

Expand Down Expand Up @@ -72,9 +79,12 @@ func (h *Hub) serve() {
h.clients.Range(func(id string, clients *clients) bool {
clients.lock.RLock()
defer clients.lock.RUnlock()
for c := range clients.m {
if utils.In(message.ignoreID, c.u.ID) ||
utils.In(message.ignoreClient, c) {
for _, c := range clients.m {
if utils.In(message.ignoreUserID, c.u.ID) ||
utils.In(message.ignoreConnID, c.ConnID()) {
continue
}
if message.rtcJoined && !c.RTCJoined() {
continue
}
if err := c.Send(message.data); err != nil {
Expand Down Expand Up @@ -145,8 +155,8 @@ func (h *Hub) Close() error {
h.clients.CompareAndDelete(id, clients)
clients.lock.Lock()
defer clients.lock.Unlock()
for c := range clients.m {
delete(clients.m, c)
for id, c := range clients.m {
delete(clients.m, id)
c.Close()
}
return true
Expand Down Expand Up @@ -191,11 +201,11 @@ func (h *Hub) RegClient(cli *Client) error {
return h.RegClient(cli)
}
if c.m == nil {
c.m = make(map[*Client]struct{})
} else if _, ok := c.m[cli]; ok {
c.m = make(map[string]*Client)
} else if _, ok := c.m[cli.ConnID()]; ok {
return errors.New("client already exists")
}
c.m[cli] = struct{}{}
c.m[cli.ConnID()] = cli
return nil
}

Expand All @@ -212,10 +222,10 @@ func (h *Hub) UnRegClient(cli *Client) error {
}
c.lock.Lock()
defer c.lock.Unlock()
if _, ok := c.m[cli]; !ok {
if _, ok := c.m[cli.ConnID()]; !ok {
return errors.New("client not found")
}
delete(c.m, cli)
delete(c.m, cli.ConnID())
if len(c.m) == 0 {
h.clients.CompareAndDelete(cli.u.ID, c)
}
Expand All @@ -236,14 +246,31 @@ func (h *Hub) SendToUser(userID string, data Message) (err error) {
}
cli.lock.RLock()
defer cli.lock.RUnlock()
for c := range cli.m {
for _, c := range cli.m {
if err = c.Send(data); err != nil {
c.Close()
}
}
return
}

func (h *Hub) SendToConnID(userID, connID string, data Message) error {
cli, ok := h.GetClientByConnID(userID, connID)
if !ok {
return nil
}
return cli.Send(data)
}

func (h *Hub) GetClientByConnID(userID, connID string) (*Client, bool) {
c, ok := h.clients.Load(userID)
if !ok {
return nil, false
}
client, ok := c.m[connID]
return client, ok
}

func (h *Hub) IsOnline(userID string) bool {
_, ok := h.clients.Load(userID)
return ok
Expand Down Expand Up @@ -272,7 +299,7 @@ func (h *Hub) KickUser(userID string) error {
}
cli.lock.RLock()
defer cli.lock.RUnlock()
for c := range cli.m {
for _, c := range cli.m {
c.Close()
}
return nil
Expand Down
7 changes: 7 additions & 0 deletions internal/op/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (r *Room) SendToUserWithID(userID string, data Message) error {
return r.lazyInitHub().SendToUser(userID, data)
}

func (r *Room) SendToConnID(userID, connID string, data Message) error {
if r.HubIsNotInited() {
return nil
}
return r.lazyInitHub().SendToConnID(userID, connID, data)
}

func (r *Room) GetChannel(channelName string) (*rtmps.Channel, error) {
return r.movies.GetChannel(channelName)
}
Expand Down
4 changes: 4 additions & 0 deletions internal/op/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,10 @@ func (u *User) IsRoomCreator(room *Room) bool {
return room.IsCreator(u.ID)
}

func (u *User) HasRoomWebRTCPermission(room *Room) bool {
return u.HasRoomPermission(room, model.PermissionWebRTC)
}

func (u *User) DeleteRoom(room *RoomEntry) error {
if !u.HasRoomAdminPermission(room.Value(), model.PermissionDeleteRoom) {
return model.ErrNoPermission
Expand Down
Loading

0 comments on commit 5632b67

Please sign in to comment.