Skip to content

Commit

Permalink
allocate single frame for decoding + disable transcoding
Browse files Browse the repository at this point in the history
  • Loading branch information
cedricve committed Feb 5, 2023
1 parent 926f9ea commit 9aff467
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 50 deletions.
7 changes: 2 additions & 5 deletions machinery/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery

go 1.19

//replace github.com/kerberos-io/joy4 v1.0.48 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/joy4 v1.0.50 => ../../../../github.com/kerberos-io/joy4
//replace github.com/kerberos-io/onvif v0.0.5 => ../../../../github.com/kerberos-io/onvif

require (
Expand All @@ -20,13 +20,12 @@ require (
github.com/golang-module/carbon/v2 v2.2.3
github.com/gorilla/websocket v1.5.0
github.com/kellydunn/golang-geo v0.7.0
github.com/kerberos-io/joy4 v1.0.50
github.com/kerberos-io/joy4 v1.0.51
github.com/kerberos-io/onvif v0.0.5
github.com/minio/minio-go/v6 v6.0.57
github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/pion/webrtc/v3 v3.1.50
github.com/shirou/gopsutil v3.21.11+incompatible
github.com/sirupsen/logrus v1.9.0
github.com/swaggo/files v1.0.0
github.com/swaggo/gin-swagger v1.5.3
Expand Down Expand Up @@ -58,7 +57,6 @@ require (
github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae // indirect
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.6 // indirect
github.com/go-openapi/spec v0.20.4 // indirect
Expand Down Expand Up @@ -110,7 +108,6 @@ require (
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/tinylib/msgp v1.1.6 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/ziutek/mymysql v1.5.4 // indirect
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 // indirect
go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect
Expand Down
11 changes: 2 additions & 9 deletions machinery/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ github.com/gin-gonic/gin v1.6.2/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwv
github.com/gin-gonic/gin v1.8.1/go.mod h1:ji8BvRH1azfM+SYow9zQ6SZMvR8qOMZHmsCuWR9tTTk=
github.com/gin-gonic/gin v1.8.2 h1:UzKToD9/PoFj/V4rvlKqTRKnQYyz8Sc1MJlv4JHPtvY=
github.com/gin-gonic/gin v1.8.2/go.mod h1:qw5AYuDrzRTnhvusDsrov+fDIxp9Dleuu12h8nfB398=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-openapi/jsonpointer v0.19.3/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/jsonpointer v0.19.5 h1:gZr+CIYByUqjcgeLXnQu2gHYQC9o73G2XUeOFYEICuY=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
Expand Down Expand Up @@ -177,8 +175,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
github.com/kerberos-io/joy4 v1.0.50 h1:N1qr0Q6ytZPG5ZmG1hDVXWeRQ7jzM7f5QftDQ/KQVCo=
github.com/kerberos-io/joy4 v1.0.50/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/joy4 v1.0.51 h1:RxpXVkZIw1cfJEBPbfqdlwHfZtuDiLb/U25Na7jvPgo=
github.com/kerberos-io/joy4 v1.0.51/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
github.com/kerberos-io/onvif v0.0.5 h1:kq9mnHZkih9Jl4DyIJ4Rzt++Y3DDKy3nI8S2ESEfZ5w=
github.com/kerberos-io/onvif v0.0.5/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
github.com/klauspost/cpuid v1.2.3 h1:CCtW0xUnWGVINKvE/WWOYKdsPV6mawAtvQuSl8guwQs=
Expand Down Expand Up @@ -310,8 +308,6 @@ github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZ
github.com/secure-systems-lab/go-securesystemslib v0.3.1/go.mod h1:o8hhjkbNl2gOamKUA/eNW3xUrntHT9L4W89W1nfj43U=
github.com/secure-systems-lab/go-securesystemslib v0.4.0 h1:b23VGrQhTA8cN2CbBw7/FulN9fTtqYUdS5+Oxzt+DUE=
github.com/secure-systems-lab/go-securesystemslib v0.4.0/go.mod h1:FGBZgq2tXWICsxWQW1msNf49F0Pf2Op5Htayx335Qbs=
github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI=
github.com/shirou/gopsutil v3.21.11+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
Expand Down Expand Up @@ -366,8 +362,6 @@ github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/X
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
go4.org/intern v0.0.0-20211027215823-ae77deb06f29 h1:UXLjNohABv4S58tHmeuIZDO6e3mHpW2Dx33gaNt03LE=
Expand Down Expand Up @@ -438,7 +432,6 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
4 changes: 2 additions & 2 deletions machinery/src/capture/IPCamera.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func GetVideoDecoder(streams []av.CodecData) *ffmpeg.VideoDecoder {
return dec
}

func DecodeImage(pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
func DecodeImage(frame *ffmpeg.VideoFrame, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
decoderMutex.Lock()
img, err := decoder.Decode(pkt.Data)
img, err := decoder.Decode(frame, pkt.Data)
decoderMutex.Unlock()
return img, err
}
Expand Down
16 changes: 10 additions & 6 deletions machinery/src/cloud/Cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,9 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
log.Log.Debug("HandleLiveStreamSD: stopping as Offline is enabled.")
} else {

// Allocate frame
frame := ffmpeg.AllocVideoFrame()

key := ""
if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" {
key = config.S3.Publickey
Expand Down Expand Up @@ -345,22 +348,23 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod
continue
}
log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.")
sendImage(topic, mqttClient, pkt, decoder, decoderMutex)
sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex)
}

// Cleanup the frame.
frame.Free()
}

log.Log.Debug("HandleLiveStreamSD: finished")
}

func sendImage(topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
img, err := computervision.GetRawImage(pkt, decoder, decoderMutex)
func sendImage(frame *ffmpeg.VideoFrame, topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
_, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
bytes, _ := computervision.ImageToBytes(&img.Image)
bytes, _ := computervision.ImageToBytes(&frame.Image)
encoded := base64.StdEncoding.EncodeToString(bytes)
mqttClient.Publish(topic, 0, false, encoded)
}
// Cleanup the image.
img.Free()
}

func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, codecs []av.CodecData, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) {
Expand Down
32 changes: 16 additions & 16 deletions machinery/src/computervision/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi

key := config.HubKey

// Allocate a VideoFrame
frame := ffmpeg.AllocVideoFrame()

// Initialise first 2 elements
var imageArray [3]*image.Gray

Expand All @@ -66,7 +69,7 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
pkt, cursorError = motionCursor.ReadPacket()
// Check If valid package.
if len(pkt.Data) > 0 && pkt.IsKeyFrame {
grayImage, err := GetGrayImage(pkt, decoder, decoderMutex)
grayImage, err := GetGrayImage(frame, pkt, decoder, decoderMutex)
if err == nil {
imageArray[j] = grayImage
j++
Expand Down Expand Up @@ -125,15 +128,15 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
continue
}

grayImage, err := GetGrayImage(pkt, decoder, decoderMutex)
grayImage, err := GetGrayImage(frame, pkt, decoder, decoderMutex)
if err == nil {
imageArray[2] = grayImage
}

// Store snapshots (jpg) for hull.
files, err := ioutil.ReadDir("./data/snapshots")
if err == nil {
rgbImage, err := GetRawImage(pkt, decoder, decoderMutex)
rgbImage, err := GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
sort.Slice(files, func(i, j int) bool {
return files[i].ModTime().Before(files[j].ModTime())
Expand All @@ -150,7 +153,6 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
f.Close()
}
}
rgbImage.Free()
}

// Check if within time interval
Expand Down Expand Up @@ -202,6 +204,8 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi
img = nil
}
}

frame.Free()
}

log.Log.Debug("ProcessMotion: finished")
Expand All @@ -216,24 +220,20 @@ func FindMotion(imageArray [3]*image.Gray, coordinatesToCheck []int, pixelChange
return changes > pixelChangeThreshold, changes
}

func GetGrayImage(pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*image.Gray, error) {
img, err := capture.DecodeImage(pkt, dec, decoderMutex)
func GetGrayImage(frame *ffmpeg.VideoFrame, pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*image.Gray, error) {
_, err := capture.DecodeImage(frame, pkt, dec, decoderMutex)

// Do a deep copy of the image
imgDeepCopy := image.NewGray(img.ImageGray.Bounds())
imgDeepCopy.Stride = img.ImageGray.Stride
copy(imgDeepCopy.Pix, img.ImageGray.Pix)

// Cleanup of underlaying data
img.Free()
imgDeepCopy := image.NewGray(frame.ImageGray.Bounds())
imgDeepCopy.Stride = frame.ImageGray.Stride
copy(imgDeepCopy.Pix, frame.ImageGray.Pix)

return imgDeepCopy, err
}

func GetRawImage(pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
img, err := capture.DecodeImage(pkt, dec, decoderMutex)
// We'll need to free up ourselves ;) using -> img.Free()
return img, err
func GetRawImage(frame *ffmpeg.VideoFrame, pkt av.Packet, dec *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) (*ffmpeg.VideoFrame, error) {
_, err := capture.DecodeImage(frame, pkt, dec, decoderMutex)
return frame, err
}

func ImageToBytes(img image.Image) ([]byte, error) {
Expand Down
11 changes: 8 additions & 3 deletions machinery/src/routers/websocket/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/kerberos-io/agent/machinery/src/computervision"
"github.com/kerberos-io/agent/machinery/src/log"
"github.com/kerberos-io/agent/machinery/src/models"
"github.com/kerberos-io/joy4/cgo/ffmpeg"
)

type Message struct {
Expand Down Expand Up @@ -129,6 +130,9 @@ func ForwardSDStream(ctx context.Context, clientID string, connection *Connectio
decoder := communication.Decoder
decoderMutex := communication.DecoderMutex

// Allocate ffmpeg.VideoFrame
frame := ffmpeg.AllocVideoFrame()

logreader:
for {
var encodedImage string
Expand All @@ -138,13 +142,11 @@ logreader:
if !pkt.IsKeyFrame {
continue
}
img, err := computervision.GetRawImage(pkt, decoder, decoderMutex)
img, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex)
if err == nil {
bytes, _ := computervision.ImageToBytes(&img.Image)
encodedImage = base64.StdEncoding.EncodeToString(bytes)
}
// Cleanup the image.
img.Free()
} else {
log.Log.Error("ForwardSDStream:" + err.Error())
break logreader
Expand All @@ -169,5 +171,8 @@ logreader:
default:
}
}

frame.Free()

log.Log.Info("ForwardSDStream: stop sending streaming over websocket")
}
13 changes: 4 additions & 9 deletions machinery/src/webrtc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"encoding/json"
"fmt"
"io"
"runtime"
"runtime/debug"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -152,8 +150,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati
if err := peerConnection.Close(); err != nil {
panic(err)
}
runtime.GC()
debug.FreeOSMemory()
} else if connectionState == pionWebRTC.ICEConnectionStateConnected {
atomic.AddInt64(&peerConnectionCount, 1)
} else if connectionState == pionWebRTC.ICEConnectionStateChecking {
Expand Down Expand Up @@ -316,7 +312,8 @@ func WriteToTrack(livestreamCursor *pubsub.QueueCursor, configuration *models.Co
}

if config.Capture.TranscodingWebRTC == "true" {
decoderMutex.Lock()

/*decoderMutex.Lock()
decoder.SetFramerate(30, 1)
frame, err := decoder.Decode(pkt.Data)
decoderMutex.Unlock()
Expand All @@ -332,10 +329,8 @@ func WriteToTrack(livestreamCursor *pubsub.QueueCursor, configuration *models.Co
pkt = _outpkts[0]
codecData, _ = encoder.CodecData()
}
}
if frame != nil {
frame.Free()
}
}*/

}

switch int(pkt.Idx) {
Expand Down

0 comments on commit 9aff467

Please sign in to comment.