Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/hls streaming #149

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ machinery/data/config
machinery/data/cloud
machinery/data/recordings
machinery/data/snapshots
machinery/data/live
machinery/test*
machinery/init-dev.sh
machinery/.env
machinery/vendor
deployments/docker/private-docker-compose.yaml
machinery/__debug*
deployments/docker/private-docker-compose.yaml
.DS_Store
2 changes: 2 additions & 0 deletions machinery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/asticode/go-astikit v0.30.0 // indirect
github.com/asticode/go-astits v1.13.0 // indirect
github.com/beevik/etree v1.2.0 // indirect
github.com/bytedance/sonic v1.12.2 // indirect
github.com/bytedance/sonic/loader v0.2.0 // indirect
Expand Down
5 changes: 5 additions & 0 deletions machinery/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ github.com/appleboy/gin-jwt/v2 v2.10.0/go.mod h1:DvCh3V1Ma32/7kAsAHYQVyjsQMwG+wM
github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4=
github.com/appleboy/gofight/v2 v2.1.2/go.mod h1:frW+U1QZEdDgixycTj4CygQ48yLTUhplt43+Wczp3rw=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asticode/go-astikit v0.30.0 h1:DkBkRQRIxYcknlaU7W7ksNfn4gMFsB0tqMJflxkRsZA=
github.com/asticode/go-astikit v0.30.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.13.0 h1:XOgkaadfZODnyZRR5Y0/DWkA9vrkLLPLeeOvDwfKZ1c=
github.com/asticode/go-astits v1.13.0/go.mod h1:QSHmknZ51pf6KJdHKZHJTLlMegIrhega3LPWz3ND/iI=
github.com/beevik/etree v1.2.0 h1:l7WETslUG/T+xOPs47dtd6jov2Ii/8/OjCldk5fYfQw=
github.com/beevik/etree v1.2.0/go.mod h1:aiPf89g/1k3AShMVAzriilpcE4R/Vuor90y83zVZWFc=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
Expand Down Expand Up @@ -377,6 +381,7 @@ github.com/pion/webrtc/v4 v4.0.1/go.mod h1:SfNn8CcFxR6OUVjLXVslAQ3a3994JhyE3Hw1j
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.4.0/go.mod h1:NWz/XGvpEW1FyYQ7fCx4dqYBLlfTcE+A9FLAkNKqjFE=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
13 changes: 13 additions & 0 deletions machinery/src/capture/gortsplib.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,14 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
au, errDecode := g.VideoH264Decoder.Decode(rtppkt)
//originalAU := au
// Deep copy the AU, so we can use it later on.
originalAU := make([][]byte, len(au))
for i, v := range au {
originalAU[i] = make([]byte, len(v))
copy(originalAU[i], v)
}

if errDecode != nil {
if errDecode != rtph264.ErrNonStartingPacketAndNoPrevious && errDecode != rtph264.ErrMorePacketsNeeded {
log.Log.Error("capture.golibrtsp.Start(): " + errDecode.Error())
Expand Down Expand Up @@ -594,6 +602,8 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
TimeLegacy: pts,
CompositionTime: dts2,
Idx: g.VideoH264Index,
AU: filteredAU,
OrginialAU: originalAU,
IsVideo: true,
IsAudio: false,
Codec: "H264",
Expand Down Expand Up @@ -666,6 +676,7 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
// access unit. Once we have a full access unit, we can
// decode it, and know if it's a keyframe or not.
au, errDecode := g.VideoH265Decoder.Decode(rtppkt)
originalAU := au
if errDecode != nil {
if errDecode != rtph265.ErrNonStartingPacketAndNoPrevious && errDecode != rtph265.ErrMorePacketsNeeded {
log.Log.Error("capture.golibrtsp.Start(): " + errDecode.Error())
Expand Down Expand Up @@ -723,6 +734,8 @@ func (g *Golibrtsp) Start(ctx context.Context, streamType string, queue *packets
TimeLegacy: pts,
CompositionTime: pts2,
Idx: g.VideoH265Index,
AU: au,
OrginialAU: originalAU,
IsVideo: true,
IsAudio: false,
Codec: "H265",
Expand Down
138 changes: 138 additions & 0 deletions machinery/src/capture/mpegts_muxer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package capture

import (
"bufio"
"fmt"
"os"
"sync"

"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/pkg/codecs/h264"
"github.com/bluenviron/mediacommon/pkg/formats/mpegts"
"github.com/kerberos-io/agent/machinery/src/packets"
)

func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}

// mpegtsMuxer allows to save a H264 / MPEG-4 audio stream into a MPEG-TS file.
type MpegtsMuxer struct {
FileName string
IsOpen bool
H264Format *format.H264
Mpeg4AudioFormat *format.MPEG4Audio

f *os.File
b *bufio.Writer
w *mpegts.Writer
H264Track *mpegts.Track
Mpeg4AudioTrack *mpegts.Track
dtsExtractor *h264.DTSExtractor2
mutex sync.Mutex
}

// initialize initializes a mpegtsMuxer.
func (e *MpegtsMuxer) Initialize() error {
var err error
e.f, err = os.Create(e.FileName)
if err != nil {
return err
}
e.IsOpen = true
e.b = bufio.NewWriter(e.f)

e.H264Track = &mpegts.Track{
Codec: &mpegts.CodecH264{},
}

e.dtsExtractor = h264.NewDTSExtractor2()

/*e.Mpeg4AudioTrack = &mpegts.Track{
Codec: &mpegts.CodecMPEG4Audio{
Config: *e.Mpeg4AudioFormat.Config,
},
}*/

e.w = mpegts.NewWriter(e.b, []*mpegts.Track{e.H264Track}) //, e.Mpeg4AudioTrack})

return nil
}

// close closes all the mpegtsMuxer resources.
func (e *MpegtsMuxer) Close() {
e.IsOpen = false
e.b.Flush()
e.f.Close()
}

// writeH264 writes a H264 access unit into MPEG-TS.
func (e *MpegtsMuxer) WriteH264(pkt packets.Packet) error {
e.mutex.Lock()
defer e.mutex.Unlock()

au := pkt.OrginialAU

if au == nil || pkt.IsAudio {
return nil
}

var filteredAU [][]byte

nonIDRPresent := false
idrPresent := false

var SPS []byte
var PPS []byte
for _, nalu := range au {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
SPS = nalu
continue

case h264.NALUTypePPS:
PPS = nalu
continue

case h264.NALUTypeAccessUnitDelimiter:
continue

case h264.NALUTypeIDR:
idrPresent = true

case h264.NALUTypeNonIDR:
nonIDRPresent = true
}

filteredAU = append(filteredAU, nalu)
}

au = filteredAU

if au == nil || (!nonIDRPresent && !idrPresent) {
return nil
}

// add SPS and PPS before access unit that contains an IDR
if idrPresent {
au = append([][]byte{SPS, PPS}, au...)
}

dts, err := e.dtsExtractor.Extract(au, pkt.Time)
if err != nil {
fmt.Println("Error extracting DTS: ", err)
return err
}
return e.w.WriteH264(e.H264Track, pkt.Time, dts, pkt.IsKeyFrame, au)
}

// writeMPEG4Audio writes MPEG-4 audio access units into MPEG-TS.
func (e *MpegtsMuxer) WriteMPEG4Audio(aus [][]byte, pts int64) error {
e.mutex.Lock()
defer e.mutex.Unlock()

return e.w.WriteMPEG4Audio(e.Mpeg4AudioTrack, multiplyAndDivide(pts, 90000, int64(e.Mpeg4AudioFormat.ClockRate())), aus)
}
106 changes: 106 additions & 0 deletions machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"strings"

Expand Down Expand Up @@ -734,6 +735,111 @@ func HandleLiveStreamSD(livestreamCursor *packets.QueueCursor, configuration *mo
log.Log.Debug("cloud.HandleLiveStreamSD(): finished")
}

func HandleLiveStreamHLS(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) {

log.Log.Debug("cloud.HandleLiveStreamHLS(): started")

config := configuration.Config

// If offline made is enabled, we will stop the thread.
if config.Offline == "true" {
log.Log.Debug("cloud.HandleLiveStreamHLS(): stopping as Offline is enabled.")
} else {

// Check if we need to enable the live stream
if config.Capture.Liveview != "false" {

lastLivestreamRequest := int64(0)
var cursorError error
var pkt packets.Packet
segmentDuration := 10 // 4 seconds
var muxer capture.MpegtsMuxer
var firstPTS = time.Now().Unix()

for cursorError == nil {
pkt, cursorError = livestreamCursor.ReadPacket()
if len(pkt.Data) == 0 {
continue
}
now := time.Now().Unix()
select {
case <-communication.HandleLiveHLS:
lastLivestreamRequest = now
default:
}

if now-lastLivestreamRequest > 10 {
if muxer.IsOpen {
muxer.Close()
}
continue
}

if pkt.IsKeyFrame {
if muxer.IsOpen && time.Now().Unix()-firstPTS > int64(segmentDuration) {

muxer.Close()
fileName := muxer.FileName
fmt.Println("We'll upload the file: " + fileName)

// Load the file into a reader
file, err := os.Open(fileName)

req, err := http.NewRequest("POST", config.HubURI+"/hls/segment", file)

req.Header.Set("Content-Type", "video/mp4")
req.Header.Set("X-Kerberos-Hub-PublicKey", config.HubKey)
req.Header.Set("X-Kerberos-Hub-PrivateKey", config.HubPrivateKey)
req.Header.Set("X-Kerberos-Storage-Device", config.Key)

var client *http.Client
if os.Getenv("AGENT_TLS_INSECURE") == "true" {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client = &http.Client{Transport: tr}
} else {
client = &http.Client{}
}

resp, err := client.Do(req)
if resp != nil {
defer resp.Body.Close()
}

if err == nil {
if resp != nil {
body, err := ioutil.ReadAll(resp.Body)
if err == nil {
if resp.StatusCode == 200 {
fmt.Println(body)
}
}
}
}

}
if !muxer.IsOpen {
muxer = capture.MpegtsMuxer{
FileName: fmt.Sprintf("data/live/live-%d.ts", time.Now().Unix()),
}
muxer.Initialize()
firstPTS = time.Now().Unix()
}
}

if muxer.IsOpen {
muxer.WriteH264(pkt)
}
}
} else {
log.Log.Debug("cloud.HandleLiveStreamHLS(): stopping as Liveview is disabled.")
}
}

log.Log.Debug("cloud.HandleLiveStreamHLS(): finished")
}

func HandleLiveStreamHD(livestreamCursor *packets.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, rtspClient capture.RTSPClient) {

config := configuration.Config
Expand Down
18 changes: 14 additions & 4 deletions machinery/src/components/Kerberos.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func Bootstrap(configDirectory string, configuration *models.Configuration, comm
communication.HandleUpload = make(chan string, 1)
communication.HandleHeartBeat = make(chan string, 1)
communication.HandleLiveSD = make(chan int64, 1)
communication.HandleLiveHLS = make(chan int64, 1)
communication.HandleLiveHDKeepalive = make(chan string, 1)
communication.HandleLiveHDPeers = make(chan string, 1)
communication.IsConfiguring = abool.New()
Expand Down Expand Up @@ -253,14 +254,23 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
go cloud.HandleLiveStreamSD(livestreamCursor, configuration, communication, mqttClient, rtspClient)
}

// Handle livestream HLS
if subStreamEnabled {
livestreamCursor := subQueue.Latest()
go cloud.HandleLiveStreamHLS(livestreamCursor, configuration, communication, mqttClient, rtspSubClient)
} else {
livestreamCursor := queue.Latest()
go cloud.HandleLiveStreamHLS(livestreamCursor, configuration, communication, mqttClient, rtspClient)
}

// Handle livestream HD (high resolution over WEBRTC)
communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 1)
if subStreamEnabled {
livestreamHDCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspSubClient)
livestreamCursor := subQueue.Latest()
go cloud.HandleLiveStreamHD(livestreamCursor, configuration, communication, mqttClient, rtspSubClient)
} else {
livestreamHDCursor := queue.Latest()
go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, rtspClient)
livestreamCursor := queue.Latest()
go cloud.HandleLiveStreamHD(livestreamCursor, configuration, communication, mqttClient, rtspClient)
}

// Handle recording, will write an mp4 to disk.
Expand Down
1 change: 1 addition & 0 deletions machinery/src/models/Communication.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Communication struct {
HandleUpload chan string
HandleHeartBeat chan string
HandleLiveSD chan int64
HandleLiveHLS chan int64
HandleLiveHDKeepalive chan string
HandleLiveHDHandshake chan RequestHDStreamPayload
HandleLiveHDPeers chan string
Expand Down
5 changes: 5 additions & 0 deletions machinery/src/models/MQTT.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,11 @@ type RequestSDStreamPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp
}

// We received a request HLS stream request
type RequestHLSStreamPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp
}

// We received a request HD stream request
type RequestHDStreamPayload struct {
Timestamp int64 `json:"timestamp"` // timestamp
Expand Down
Loading
Loading