From 54bc1989f9f8b38101e4af505d73cab002c360d9 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 23 Nov 2023 21:17:39 +0100 Subject: [PATCH] fix: update locking webrtc --- machinery/src/cloud/Cloud.go | 9 +----- machinery/src/routers/mqtt/main.go | 2 +- machinery/src/webrtc/main.go | 51 ++++++++++++++++-------------- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index 34a0fdb..ca4372b 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -601,14 +601,7 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.") for handshake := range communication.HandleLiveHDHandshake { log.Log.Info("HandleLiveStreamHD: setting up a peer connection.") - key := config.Key + "/" + handshake.SessionID - webrtc.CandidatesMutex.Lock() - _, ok := webrtc.CandidateArrays[key] - if !ok { - webrtc.CandidateArrays[key] = make(chan string) - } - webrtc.CandidatesMutex.Unlock() - webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key]) + go webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake) } } diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go index f124f42..60e4e29 100644 --- a/machinery/src/routers/mqtt/main.go +++ b/machinery/src/routers/mqtt/main.go @@ -478,7 +478,7 @@ func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload mo if communication.CameraConnected { // Register candidate channel key := configuration.Config.Key + "/" + receiveHDCandidatesPayload.SessionID - webrtc.RegisterCandidates(key, receiveHDCandidatesPayload) + go webrtc.RegisterCandidates(key, receiveHDCandidatesPayload) } else { log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.") } diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index 59d9ba1..1736c49 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -88,11 +88,8 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription { } func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) { - // Set lock CandidatesMutex.Lock() - defer CandidatesMutex.Unlock() - channel := CandidateArrays[key] if channel == nil { channel = make(chan string) @@ -100,9 +97,10 @@ func RegisterCandidates(key string, candidate models.ReceiveHDCandidatesPayload) } log.Log.Info("HandleReceiveHDCandidates: " + candidate.Candidate) channel <- candidate.Candidate + CandidatesMutex.Unlock() } -func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) { +func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload) { config := configuration.Config deviceKey := config.Key @@ -111,6 +109,15 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati turnServersUsername := config.TURNUsername turnServersCredential := config.TURNPassword + // We create a channel which will hold the candidates for this session. + sessionKey := config.Key + "/" + handshake.SessionID + CandidatesMutex.Lock() + _, ok := CandidateArrays[sessionKey] + if !ok { + CandidateArrays[sessionKey] = make(chan string) + } + CandidatesMutex.Unlock() + // Set variables hubKey := handshake.HubKey sessionDescription := handshake.SessionDescription @@ -147,35 +154,40 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati if err == nil && peerConnection != nil { if _, err = peerConnection.AddTrack(videoTrack); err != nil { - panic(err) + //panic(err) } if _, err = peerConnection.AddTrack(audioTrack); err != nil { - panic(err) + //panic(err) } if err != nil { - panic(err) + //panic(err) } peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) { if connectionState == pionWebRTC.ICEConnectionStateDisconnected { - CandidatesMutex.Lock() - defer CandidatesMutex.Unlock() - atomic.AddInt64(&peerConnectionCount, -1) + + // Set lock + CandidatesMutex.Lock() peerConnections[handshake.SessionID] = nil - close(candidates) + _, ok := CandidateArrays[sessionKey] + if ok { + close(CandidateArrays[sessionKey]) + } + CandidatesMutex.Unlock() + close(w.PacketsCount) if err := peerConnection.Close(); err != nil { - panic(err) + //panic(err) } } else if connectionState == pionWebRTC.ICEConnectionStateConnected { atomic.AddInt64(&peerConnectionCount, 1) } else if connectionState == pionWebRTC.ICEConnectionStateChecking { // Iterate over the candidates and send them to the remote client // Non blocking channel - for candidate := range candidates { + for candidate := range CandidateArrays[sessionKey] { log.Log.Info("InitializeWebRTCConnection: Received candidate.") if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil { log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error()) @@ -188,29 +200,22 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati offer := w.CreateOffer(sd) if err = peerConnection.SetRemoteDescription(offer); err != nil { - panic(err) + //panic(err) } answer, err := peerConnection.CreateAnswer(nil) if err != nil { - panic(err) + //panic(err) } else if err = peerConnection.SetLocalDescription(answer); err != nil { - panic(err) + //panic(err) } - // When an ICE candidate is available send to the other Pion instance - // the other Pion instance will add this candidate by calling AddICECandidate - var candidatesMux sync.Mutex // When an ICE candidate is available send to the other peer using the signaling server (MQTT). // The other peer will add this candidate by calling AddICECandidate peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) { if candidate == nil { return } - - candidatesMux.Lock() - defer candidatesMux.Unlock() - // Create a config map valueMap := make(map[string]interface{}) candateJSON := candidate.ToJSON()