From 5885a09c93fa36079a5dc3666bd425752900b5d1 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Sat, 21 Dec 2024 12:02:35 +0100 Subject: [PATCH 1/8] add PTS2 + add mpegts_muxer --- machinery/go.mod | 2 + machinery/go.sum | 5 ++ machinery/src/capture/gortsplib.go | 10 ++-- machinery/src/capture/main.go | 10 ++-- machinery/src/capture/mpegts_muxer.go | 84 +++++++++++++++++++++++++++ machinery/src/cloud/Cloud.go | 57 ++++++++++++++++++ machinery/src/components/Kerberos.go | 18 ++++-- machinery/src/models/Communication.go | 1 + machinery/src/models/MQTT.go | 5 ++ machinery/src/packets/packet.go | 20 +++---- machinery/src/packets/queue.go | 3 +- machinery/src/routers/mqtt/main.go | 22 +++++++ machinery/src/webrtc/main.go | 13 +++-- 13 files changed, 221 insertions(+), 29 deletions(-) create mode 100644 machinery/src/capture/mpegts_muxer.go diff --git a/machinery/go.mod b/machinery/go.mod index 39b7476..997a062 100644 --- a/machinery/go.mod +++ b/machinery/go.mod @@ -51,6 +51,8 @@ require ( github.com/Microsoft/go-winio v0.6.1 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect + github.com/asticode/go-astikit v0.30.0 // indirect + github.com/asticode/go-astits v1.13.0 // indirect github.com/beevik/etree v1.2.0 // indirect github.com/bytedance/sonic v1.12.2 // indirect github.com/bytedance/sonic/loader v0.2.0 // indirect diff --git a/machinery/go.sum b/machinery/go.sum index f6f4820..e5505c5 100644 --- a/machinery/go.sum +++ b/machinery/go.sum @@ -65,6 +65,10 @@ github.com/appleboy/gin-jwt/v2 v2.10.0/go.mod h1:DvCh3V1Ma32/7kAsAHYQVyjsQMwG+wM github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= +github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA= +github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= +github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c= +github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI= github.com/beevik/etree v1.2.0 h1:l7WETslUG/T+xOPs47dtd6jov2Ii/8/OjCldk5fYfQw= github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= @@ -377,6 +381,7 @@ github.com/pion/webrtc/v4 v4.0.1/go.mod h1:SfNn8CcFxR6OUVjLXVslAQ3a3994JhyE3Hw1j github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= diff --git a/machinery/src/capture/gortsplib.go b/machinery/src/capture/gortsplib.go index ad7be5d..6d2b642 100644 --- a/machinery/src/capture/gortsplib.go +++ b/machinery/src/capture/gortsplib.go @@ -410,7 +410,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if g.AudioG711Media != nil && g.AudioG711Forma != nil { g.Client.OnPacketRTP(g.AudioG711Media, g.AudioG711Forma, func(rtppkt *rtp.Packet) { // decode timestamp - pts, ok := g.Client.PacketPTS(g.AudioG711Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.AudioG711Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -442,7 +442,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if g.AudioMPEG4Media != nil && g.AudioMPEG4Forma != nil { g.Client.OnPacketRTP(g.AudioMPEG4Media, g.AudioMPEG4Forma, func(rtppkt *rtp.Packet) { // decode timestamp - pts, ok := g.Client.PacketPTS(g.AudioMPEG4Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.AudioMPEG4Media, rtppkt) if !ok { log.Log.Error("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -493,7 +493,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp - pts, ok := g.Client.PacketPTS(g.VideoH264Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.VideoH264Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -578,6 +578,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets Time: pts, CompositionTime: pts, Idx: g.VideoH264Index, + AU: filteredAU, IsVideo: true, IsAudio: false, Codec: "H264", @@ -638,7 +639,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets if len(rtppkt.Payload) > 0 { // decode timestamp - pts, ok := g.Client.PacketPTS(g.VideoH265Media, rtppkt) + pts, ok := g.Client.PacketPTS2(g.VideoH265Media, rtppkt) if !ok { log.Log.Debug("capture.golibrtsp.Start(): " + "unable to get PTS") return @@ -705,6 +706,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets Time: pts, CompositionTime: pts, Idx: g.VideoH265Index, + AU: au, IsVideo: true, IsAudio: false, Codec: "H265", diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index dfd3914..ee6252e 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) { // Write the last packet - ttime := convertPTS(pkt.Time) + ttime := uint64(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -242,7 +242,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } - ttime := convertPTS(pkt.Time) + ttime := uint64(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -261,7 +261,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat recordingStatus = "started" } else if start { - ttime := convertPTS(pkt.Time) + ttime := uint64(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -337,7 +337,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Info("capture.main.HandleRecordStream(motiondetection): Start motion based recording ") - var lastDuration time.Duration + var lastDuration int64 var lastRecordingTime int64 //var cws *cacheWriterSeeker @@ -445,7 +445,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat } if start { - ttime := convertPTS(pkt.Time) + ttime := uint64(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error()) diff --git a/machinery/src/capture/mpegts_muxer.go b/machinery/src/capture/mpegts_muxer.go new file mode 100644 index 0000000..3b9d4f6 --- /dev/null +++ b/machinery/src/capture/mpegts_muxer.go @@ -0,0 +1,84 @@ +package capture + +import ( + "bufio" + "os" + "sync" + + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/pkg/codecs/h264" + "github.com/bluenviron/mediacommon/pkg/formats/mpegts" + "github.com/kerberos-io/agent/machinery/src/packets" +) + +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +// mpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file. +type MpegtsMuxer struct { + FileName string + H264Format *format.H264 + Mpeg4AudioFormat *format.MPEG4Audio + + f *os.File + b *bufio.Writer + w *mpegts.Writer + H264Track *mpegts.Track + Mpeg4AudioTrack *mpegts.Track + dtsExtractor *h264.DTSExtractor2 + mutex sync.Mutex +} + +// initialize initializes a mpegtsMuxer. +func (e *MpegtsMuxer) Initialize() error { + var err error + e.f, err = os.Create(e.FileName) + if err != nil { + return err + } + e.b = bufio.NewWriter(e.f) + + e.H264Track = &mpegts.Track{ + Codec: &mpegts.CodecH264{}, + } + + /*e.Mpeg4AudioTrack = &mpegts.Track{ + Codec: &mpegts.CodecMPEG4Audio{ + Config: *e.Mpeg4AudioFormat.Config, + }, + }*/ + + e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.H264Track}) //, e.Mpeg4AudioTrack}) + + return nil +} + +// close closes all the mpegtsMuxer resources. +func (e *MpegtsMuxer) Close() { + e.b.Flush() + e.f.Close() +} + +// writeH264 writes a H264 access unit into MPEG-TS. +func (e *MpegtsMuxer) WriteH264(pkt packets.Packet, pts int64) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + dts, err := e.dtsExtractor.Extract(pkt.AU, pkt.Time) + if err != nil { + return err + } + + return e.w.WriteH264(e.H264Track, pkt.Time, dts, pkt.IsKeyFrame, pkt.AU) +} + +// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. +func (e *MpegtsMuxer) WriteMPEG4Audio(aus [][]byte, pts int64) error { + e.mutex.Lock() + defer e.mutex.Unlock() + + return e.w.WriteMPEG4Audio(e.Mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.Mpeg4AudioFormat.ClockRate())), aus) +} diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index 4f86cd2..a029312 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -734,6 +734,63 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo log.Log.Debug("cloud.HandleLiveStreamSD(): finished") } +func HandleLiveStreamHLS(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) { + + log.Log.Debug("cloud.HandleLiveStreamHLS(): started") + + config := configuration.Config + + // If offline made is enabled, we will stop the thread. + if config.Offline == "true" { + log.Log.Debug("cloud.HandleLiveStreamHLS(): stopping as Offline is enabled.") + } else { + + // Check if we need to enable the live stream + if config.Capture.Liveview != "false" { + + hubKey := "" + if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" { + hubKey = config.S3.Publickey + } else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" { + hubKey = config.KStorage.CloudKey + } + // This is the new way ;) + if config.HubKey != "" { + hubKey = config.HubKey + } + fmt.Println("hubKey: ", hubKey) + + lastLivestreamRequest := int64(0) + + var cursorError error + var pkt packets.Packet + + for cursorError == nil { + pkt, cursorError = livestreamCursor.ReadPacket() + if len(pkt.Data) == 0 || !pkt.IsKeyFrame { + continue + } + now := time.Now().Unix() + select { + case <-communication.HandleLiveHLS: + lastLivestreamRequest = now + default: + } + if now-lastLivestreamRequest > 3 { + continue + } + log.Log.Info("cloud.HandleLiveStreamHLS(): Creating .ts recording for HLS live stream.") + // .. + } + + } else { + log.Log.Debug("cloud.HandleLiveStreamHLS(): stopping as Liveview is disabled.") + } + } + + log.Log.Debug("cloud.HandleLiveStreamHLS(): finished") +} + func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) { config := configuration.Config diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index f6eec60..df01e6e 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -61,6 +61,7 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm communication.HandleUpload = make(chan string, 1) communication.HandleHeartBeat = make(chan string, 1) communication.HandleLiveSD = make(chan int64, 1) + communication.HandleLiveHLS = make(chan int64, 1) communication.HandleLiveHDKeepalive = make(chan string, 1) communication.HandleLiveHDPeers = make(chan string, 1) communication.IsConfiguring = abool.New() @@ -253,14 +254,23 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu go cloud.HandleLiveStreamSD(livestreamCursor, configuration, communication, mqttClient, rtspClient) } + // Handle livestream HLS + if subStreamEnabled { + livestreamCursor := subQueue.Latest() + go cloud.HandleLiveStreamHLS(livestreamCursor, configuration, communication, mqttClient, rtspSubClient) + } else { + livestreamCursor := queue.Latest() + go cloud.HandleLiveStreamHLS(livestreamCursor, configuration, communication, mqttClient, rtspClient) + } + // Handle livestream HD (high resolution over WEBRTC) communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 1) if subStreamEnabled { - livestreamHDCursor := subQueue.Latest() - go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient) + livestreamCursor := subQueue.Latest() + go cloud.HandleLiveStreamHD(livestreamCursor, configuration, communication, mqttClient, rtspSubClient) } else { - livestreamHDCursor := queue.Latest() - go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspClient) + livestreamCursor := queue.Latest() + go cloud.HandleLiveStreamHD(livestreamCursor, configuration, communication, mqttClient, rtspClient) } // Handle recording, will write an mp4 to disk. diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index 262a86f..33ea92e 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -26,6 +26,7 @@ type Communication struct { HandleUpload chan string HandleHeartBeat chan string HandleLiveSD chan int64 + HandleLiveHLS chan int64 HandleLiveHDKeepalive chan string HandleLiveHDHandshake chan RequestHDStreamPayload HandleLiveHDPeers chan string diff --git a/machinery/src/models/MQTT.go b/machinery/src/models/MQTT.go index d211910..0a12dc5 100644 --- a/machinery/src/models/MQTT.go +++ b/machinery/src/models/MQTT.go @@ -172,6 +172,11 @@ type RequestSDStreamPayload struct { Timestamp int64 `json:"timestamp"` // timestamp } +// We received a request HLS stream request +type RequestHLSStreamPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp +} + // We received a request HD stream request type RequestHDStreamPayload struct { Timestamp int64 `json:"timestamp"` // timestamp diff --git a/machinery/src/packets/packet.go b/machinery/src/packets/packet.go index d385bbc..5e8b701 100644 --- a/machinery/src/packets/packet.go +++ b/machinery/src/packets/packet.go @@ -1,20 +1,20 @@ package packets import ( - "time" - "github.com/pion/rtp" ) // Packet represents an RTP Packet type Packet struct { Packet *rtp.Packet - IsAudio bool // packet is audio - IsVideo bool // packet is video - IsKeyFrame bool // video packet is key frame - Idx int8 // stream index in container format - Codec string // codec name - CompositionTime time.Duration // packet presentation time minus decode time for H264 B-Frame - Time time.Duration // packet decode time - Data []byte // packet data + IsAudio bool // packet is audio + IsVideo bool // packet is video + IsKeyFrame bool // video packet is key frame + Idx int8 // stream index in container format + Codec string // codec name + CompositionTime int64 // packet presentation time minus decode time for H264 B-Frame + Time int64 // packet decode time + DTS int64 // packet presentation time + AU [][]byte // access unit + Data []byte // packet data } diff --git a/machinery/src/packets/queue.go b/machinery/src/packets/queue.go index a0de723..a250295 100644 --- a/machinery/src/packets/queue.go +++ b/machinery/src/packets/queue.go @@ -4,7 +4,6 @@ package packets import ( "io" "sync" - "time" ) // time @@ -145,7 +144,7 @@ func (self *Queue) Oldest() *QueueCursor { } // Create cursor position at specific time in buffered packets. -func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { +func (self *Queue) DelayedTime(dur int64) *QueueCursor { cursor := self.newCursor() cursor.init = func(buf *Buf, videoidx int) BufPos { i := buf.Tail - 1 diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go index db35b38..f3a2631 100644 --- a/machinery/src/routers/mqtt/main.go +++ b/machinery/src/routers/mqtt/main.go @@ -267,6 +267,8 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory go HandleUpdateConfig(mqttClient, hubKey, payload, configDirectory, configuration, communication) case "request-sd-stream": go HandleRequestSDStream(mqttClient, hubKey, payload, configuration, communication) + case "request-hls-stream": + go HandleRequestHLSStream(mqttClient, hubKey, payload, configuration, communication) case "request-hd-stream": go HandleRequestHDStream(mqttClient, hubKey, payload, configuration, communication) case "receive-hd-candidates": @@ -481,6 +483,26 @@ func HandleRequestSDStream(mqttClient mqtt.Client, hubKey string, payload models } } +func HandleRequestHLSStream(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { + value := payload.Value + // Convert map[string]interface{} to RequestSDStreamPayload + jsonData, _ := json.Marshal(value) + var requestHLSStreamPayload models.RequestHLSStreamPayload + json.Unmarshal(jsonData, &requestHLSStreamPayload) + + if requestHLSStreamPayload.Timestamp != 0 { + if communication.CameraConnected { + select { + case communication.HandleLiveHLS <- time.Now().Unix(): + default: + } + log.Log.Info("routers.mqtt.main.HandleRequestHLSStream(): received request to livestream.") + } else { + log.Log.Info("routers.mqtt.main.HandleRequestHLSStream(): received request to livestream, but camera is not connected.") + } + } +} + func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to RequestHDStreamPayload diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index bc12c41..a7fc51d 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -341,8 +341,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C var cursorError error var pkt packets.Packet - var previousTimeVideo time.Duration - var previousTimeAudio time.Duration + var previousTimeVideo int64 + var previousTimeAudio int64 start := false receivedKeyFrame := false @@ -353,6 +353,9 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C pkt, cursorError = livestreamCursor.ReadPacket() + // We had to disable this because webrtc the ice connection state is not getting into connected state. + // WORKAROUND TO BE FIXED! + //if config.Capture.ForwardWebRTC != "true" && peerConnectionCount == 0 { // start = false // receivedKeyFrame = false @@ -409,7 +412,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C start = true } if start { - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration} + bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDurationCasted} if config.Capture.ForwardWebRTC == "true" { // We will send the video to a remote peer // TODO.. @@ -436,7 +440,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C previousTimeAudio = pkt.Time // We will send the audio - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration} + bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDurationCasted} if err := audioTrack.WriteSample(sample); err != nil && err != io.ErrClosedPipe { log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error()) } From 383ec3c693b7e2235c3c72d25c45822bb3b6f896 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Mon, 23 Dec 2024 13:27:05 +0100 Subject: [PATCH 2/8] add HLS support --- machinery/src/capture/gortsplib.go | 12 +++++ machinery/src/capture/mpegts_muxer.go | 64 ++++++++++++++++++++++++--- machinery/src/cloud/Cloud.go | 44 ++++++++++-------- machinery/src/packets/packet.go | 1 + 4 files changed, 98 insertions(+), 23 deletions(-) diff --git a/machinery/src/capture/gortsplib.go b/machinery/src/capture/gortsplib.go index 6d2b642..acdfc80 100644 --- a/machinery/src/capture/gortsplib.go +++ b/machinery/src/capture/gortsplib.go @@ -480,6 +480,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets // called when a video RTP packet arrives for H264 var filteredAU [][]byte if g.VideoH264Media != nil && g.VideoH264Forma != nil { + g.Client.OnPacketRTP(g.VideoH264Media, g.VideoH264Forma, func(rtppkt *rtp.Packet) { // This will check if we need to stop the thread, @@ -504,6 +505,14 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets // access unit. Once we have a full access unit, we can // decode it, and know if it's a keyframe or not. au, errDecode := g.VideoH264Decoder.Decode(rtppkt) + //originalAU := au + // Deep copy the AU, so we can use it later on. + originalAU := make([][]byte, len(au)) + for i, v := range au { + originalAU[i] = make([]byte, len(v)) + copy(originalAU[i], v) + } + if errDecode != nil { if errDecode != rtph264.ErrNonStartingPacketAndNoPrevious && errDecode != rtph264.ErrMorePacketsNeeded { log.Log.Error("capture.golibrtsp.Start(): " + errDecode.Error()) @@ -579,6 +588,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets CompositionTime: pts, Idx: g.VideoH264Index, AU: filteredAU, + OrginialAU: originalAU, IsVideo: true, IsAudio: false, Codec: "H264", @@ -650,6 +660,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets // access unit. Once we have a full access unit, we can // decode it, and know if it's a keyframe or not. au, errDecode := g.VideoH265Decoder.Decode(rtppkt) + originalAU := au if errDecode != nil { if errDecode != rtph265.ErrNonStartingPacketAndNoPrevious && errDecode != rtph265.ErrMorePacketsNeeded { log.Log.Error("capture.golibrtsp.Start(): " + errDecode.Error()) @@ -707,6 +718,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets CompositionTime: pts, Idx: g.VideoH265Index, AU: au, + OrginialAU: originalAU, IsVideo: true, IsAudio: false, Codec: "H265", diff --git a/machinery/src/capture/mpegts_muxer.go b/machinery/src/capture/mpegts_muxer.go index 3b9d4f6..8cc93f8 100644 --- a/machinery/src/capture/mpegts_muxer.go +++ b/machinery/src/capture/mpegts_muxer.go @@ -2,6 +2,7 @@ package capture import ( "bufio" + "fmt" "os" "sync" @@ -20,6 +21,7 @@ func multiplyAndDivide(v, m, d int64) int64 { // mpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file. type MpegtsMuxer struct { FileName string + IsOpen bool H264Format *format.H264 Mpeg4AudioFormat *format.MPEG4Audio @@ -39,12 +41,15 @@ func (e *MpegtsMuxer) Initialize() error { if err != nil { return err } + e.IsOpen = true e.b = bufio.NewWriter(e.f) e.H264Track = &mpegts.Track{ Codec: &mpegts.CodecH264{}, } + e.dtsExtractor = h264.NewDTSExtractor2() + /*e.Mpeg4AudioTrack = &mpegts.Track{ Codec: &mpegts.CodecMPEG4Audio{ Config: *e.Mpeg4AudioFormat.Config, @@ -58,21 +63,70 @@ func (e *MpegtsMuxer) Initialize() error { // close closes all the mpegtsMuxer resources. func (e *MpegtsMuxer) Close() { + e.IsOpen = false e.b.Flush() e.f.Close() } // writeH264 writes a H264 access unit into MPEG-TS. -func (e *MpegtsMuxer) WriteH264(pkt packets.Packet, pts int64) error { +func (e *MpegtsMuxer) WriteH264(pkt packets.Packet) error { e.mutex.Lock() defer e.mutex.Unlock() - dts, err := e.dtsExtractor.Extract(pkt.AU, pkt.Time) - if err != nil { - return err + au := pkt.OrginialAU + + if au == nil || pkt.IsAudio { + return nil + } + + var filteredAU [][]byte + + nonIDRPresent := false + idrPresent := false + + var SPS []byte + var PPS []byte + for _, nalu := range au { + typ := h264.NALUType(nalu[0] & 0x1F) + switch typ { + case h264.NALUTypeSPS: + SPS = nalu + continue + + case h264.NALUTypePPS: + PPS = nalu + continue + + case h264.NALUTypeAccessUnitDelimiter: + continue + + case h264.NALUTypeIDR: + idrPresent = true + + case h264.NALUTypeNonIDR: + nonIDRPresent = true + } + + filteredAU = append(filteredAU, nalu) + } + + au = filteredAU + + if au == nil || (!nonIDRPresent && !idrPresent) { + return nil + } + + // add SPS and PPS before access unit that contains an IDR + if idrPresent { + au = append([][]byte{SPS, PPS}, au...) } - return e.w.WriteH264(e.H264Track, pkt.Time, dts, pkt.IsKeyFrame, pkt.AU) + dts, err := e.dtsExtractor.Extract(au, pkt.Time) + if err != nil { + fmt.Println("Error extracting DTS: ", err) + //return err + } + return e.w.WriteH264(e.H264Track, pkt.Time, dts, pkt.IsKeyFrame, au) } // writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS. diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index a029312..2acb0ea 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -748,26 +748,16 @@ func HandleLiveStreamHLS(livestreamCursor *packets.QueueCursor, configuration *m // Check if we need to enable the live stream if config.Capture.Liveview != "false" { - hubKey := "" - if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" { - hubKey = config.S3.Publickey - } else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" { - hubKey = config.KStorage.CloudKey - } - // This is the new way ;) - if config.HubKey != "" { - hubKey = config.HubKey - } - fmt.Println("hubKey: ", hubKey) - lastLivestreamRequest := int64(0) - var cursorError error var pkt packets.Packet + segmentDuration := 10 // 4 seconds + var muxer capture.MpegtsMuxer + var firstPTS = time.Now().Unix() for cursorError == nil { pkt, cursorError = livestreamCursor.ReadPacket() - if len(pkt.Data) == 0 || !pkt.IsKeyFrame { + if len(pkt.Data) == 0 { continue } now := time.Now().Unix() @@ -776,13 +766,31 @@ func HandleLiveStreamHLS(livestreamCursor *packets.QueueCursor, configuration *m lastLivestreamRequest = now default: } - if now-lastLivestreamRequest > 3 { + + if now-lastLivestreamRequest > 10 { + if muxer.IsOpen { + muxer.Close() + } continue } - log.Log.Info("cloud.HandleLiveStreamHLS(): Creating .ts recording for HLS live stream.") - // .. - } + if pkt.IsKeyFrame { + if muxer.IsOpen && time.Now().Unix()-firstPTS > int64(segmentDuration) { + muxer.Close() + } + if !muxer.IsOpen { + muxer = capture.MpegtsMuxer{ + FileName: fmt.Sprintf("data/live/live-%d.ts", time.Now().Unix()), + } + muxer.Initialize() + firstPTS = time.Now().Unix() + } + } + + if muxer.IsOpen { + muxer.WriteH264(pkt) + } + } } else { log.Log.Debug("cloud.HandleLiveStreamHLS(): stopping as Liveview is disabled.") } diff --git a/machinery/src/packets/packet.go b/machinery/src/packets/packet.go index 5e8b701..ba460f9 100644 --- a/machinery/src/packets/packet.go +++ b/machinery/src/packets/packet.go @@ -16,5 +16,6 @@ type Packet struct { Time int64 // packet decode time DTS int64 // packet presentation time AU [][]byte // access unit + OrginialAU [][]byte // original access unit Data []byte // packet data } From 38a0f85eebfb0f7ce9a531d2e4e21bbfc72c98a1 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Mon, 23 Dec 2024 13:36:48 +0100 Subject: [PATCH 3/8] increase gop size substream --- machinery/src/capture/mpegts_muxer.go | 2 +- machinery/src/components/Kerberos.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/machinery/src/capture/mpegts_muxer.go b/machinery/src/capture/mpegts_muxer.go index 8cc93f8..341c030 100644 --- a/machinery/src/capture/mpegts_muxer.go +++ b/machinery/src/capture/mpegts_muxer.go @@ -124,7 +124,7 @@ func (e *MpegtsMuxer) WriteH264(pkt packets.Packet) error { dts, err := e.dtsExtractor.Extract(au, pkt.Time) if err != nil { fmt.Println("Error extracting DTS: ", err) - //return err + return err } return e.w.WriteH264(e.H264Track, pkt.Time, dts, pkt.IsKeyFrame, au) } diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index df01e6e..b1b8979 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -237,7 +237,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu if subStreamEnabled && rtspSubClient != nil { subQueue = packets.NewQueue() communication.SubQueue = subQueue - subQueue.SetMaxGopCount(1) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). + subQueue.SetMaxGopCount(2) // GOP time frame is set to prerecording (we'll add 2 gops to leave some room). subQueue.WriteHeader(videoSubStreams) go rtspSubClient.Start(context.Background(), "sub", subQueue, configuration, communication) From ea8c9f4b041f55d50352d63a9974dc0195f699ee Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Mon, 23 Dec 2024 13:37:42 +0100 Subject: [PATCH 4/8] Update .gitignore --- .gitignore | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 1e4aa56..2a51cfe 100644 --- a/.gitignore +++ b/.gitignore @@ -8,8 +8,11 @@ machinery/data/config machinery/data/cloud machinery/data/recordings machinery/data/snapshots +machinery/data/live machinery/test* machinery/init-dev.sh machinery/.env machinery/vendor -deployments/docker/private-docker-compose.yaml \ No newline at end of file +machinery/__debug* +deployments/docker/private-docker-compose.yaml +.DS_Store \ No newline at end of file From a119c394faf9d551537b5df2d59adb01207c7910 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Sun, 29 Dec 2024 21:54:36 +0100 Subject: [PATCH 5/8] Update Cloud.go --- machinery/src/cloud/Cloud.go | 41 ++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index 2acb0ea..a3fbd2f 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "io/ioutil" "os" "strings" @@ -776,7 +777,47 @@ func HandleLiveStreamHLS(livestreamCursor *packets.QueueCursor, configuration *m if pkt.IsKeyFrame { if muxer.IsOpen && time.Now().Unix()-firstPTS > int64(segmentDuration) { + muxer.Close() + fileName := muxer.FileName + fmt.Println("We'll upload the file: " + fileName) + + // Load the file into a reader + file, err := os.Open(fileName) + + req, err := http.NewRequest("POST", config.HubURI+"/hls/segment", file) + + req.Header.Set("Content-Type", "video/mp4") + req.Header.Set("X-Kerberos-Hub-PublicKey", config.HubKey) + req.Header.Set("X-Kerberos-Hub-PrivateKey", config.HubPrivateKey) + req.Header.Set("X-Kerberos-Storage-Device", config.Key) + + var client *http.Client + if os.Getenv("AGENT_TLS_INSECURE") == "true" { + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client = &http.Client{Transport: tr} + } else { + client = &http.Client{} + } + + resp, err := client.Do(req) + if resp != nil { + defer resp.Body.Close() + } + + if err == nil { + if resp != nil { + body, err := ioutil.ReadAll(resp.Body) + if err == nil { + if resp.StatusCode == 200 { + fmt.Println(body) + } + } + } + } + } if !muxer.IsOpen { muxer = capture.MpegtsMuxer{ From 30fc5f09c39c93056beae44fdba5fea990d3cff7 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Sun, 29 Dec 2024 23:12:07 +0100 Subject: [PATCH 6/8] Update main.go --- machinery/src/capture/main.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/machinery/src/capture/main.go b/machinery/src/capture/main.go index ee6252e..89a560d 100644 --- a/machinery/src/capture/main.go +++ b/machinery/src/capture/main.go @@ -120,7 +120,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat nextPkt.IsKeyFrame && (timestamp+recordingPeriod-now <= 0 || now-startRecording >= maxRecordingPeriod) { // Write the last packet - ttime := uint64(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -242,7 +242,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) } - ttime := uint64(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -261,7 +261,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat recordingStatus = "started" } else if start { - ttime := uint64(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(continuous): " + err.Error()) @@ -445,7 +445,7 @@ func HandleRecordStream(queue *packets.Queue, configDirectory string, configurat } if start { - ttime := uint64(pkt.Time) + ttime := convertPTS2(pkt.Time) if pkt.IsVideo { if err := myMuxer.Write(videoTrack, pkt.Data, ttime, ttime); err != nil { log.Log.Error("capture.main.HandleRecordStream(motiondetection): " + err.Error()) @@ -695,3 +695,7 @@ func JpegImage(captureDevice *Capture, communication *models.Communication) imag func convertPTS(v time.Duration) uint64 { return uint64(v.Milliseconds()) } + +func convertPTS2(v int64) uint64 { + return uint64(v) / 100 +} From ac821f2507d1c80eb7e7cca334d1eafc49c82911 Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 17:12:55 +0100 Subject: [PATCH 7/8] use packet timestamp instead --- machinery/src/webrtc/main.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index a7fc51d..fefdbcf 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -341,8 +341,8 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C var cursorError error var pkt packets.Packet - var previousTimeVideo int64 - var previousTimeAudio int64 + //var previousTimeVideo int64 + //var previousTimeAudio int64 start := false receivedKeyFrame := false @@ -404,16 +404,16 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C if pkt.IsVideo { // Calculate the difference - bufferDuration := pkt.Time - previousTimeVideo - previousTimeVideo = pkt.Time + //bufferDuration := pkt.Time - previousTimeVideo + //previousTimeVideo = pkt.Time // Start at the first keyframe if pkt.IsKeyFrame { start = true } if start { - bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDurationCasted} + //bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: pkt.Packet.Timestamp} //Duration: bufferDurationCasted} if config.Capture.ForwardWebRTC == "true" { // We will send the video to a remote peer // TODO.. @@ -436,12 +436,12 @@ func WriteToTrack(livestreamCursor *packets.QueueCursor, configuration *models.C } // Calculate the difference - bufferDuration := pkt.Time - previousTimeAudio - previousTimeAudio = pkt.Time + //bufferDuration := pkt.Time - previousTimeAudio + //previousTimeAudio = pkt.Time // We will send the audio - bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDurationCasted} + //bufferDurationCasted := time.Duration(bufferDuration) * time.Millisecond + sample := pionMedia.Sample{Data: pkt.Data, PacketTimestamp: pkt.Packet.Timestamp} //Duration: bufferDurationCasted} if err := audioTrack.WriteSample(sample); err != nil && err != io.ErrClosedPipe { log.Log.Error("webrtc.main.WriteToTrack(): something went wrong while writing sample: " + err.Error()) } From 4a5be74d8c0d6d2823ff1255585ef7d491a7dacc Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Thu, 2 Jan 2025 17:55:01 +0100 Subject: [PATCH 8/8] add OrginialAU and AU --- machinery/src/packets/packet.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/machinery/src/packets/packet.go b/machinery/src/packets/packet.go index 3f170ac..667f4c8 100644 --- a/machinery/src/packets/packet.go +++ b/machinery/src/packets/packet.go @@ -18,4 +18,6 @@ type Packet struct { Time int64 // packet decode time TimeLegacy time.Duration Data []byte // packet data + AU [][]byte + OrginialAU [][]byte }