From dd7fcb31b161bd2d524f1c9b33088ba76ad4d29e Mon Sep 17 00:00:00 2001
From: Cedric Verstraeten <cedric@verstraeten.io>
Date: Fri, 17 Nov 2023 16:28:03 +0100
Subject: [PATCH] Add ONVIF backchannel functionality with G711 encoding

---
 machinery/go.mod                      |  5 +-
 machinery/go.sum                      |  6 +-
 machinery/src/cloud/Cloud.go          |  8 ++-
 machinery/src/components/Audio.go     | 80 +++++++++++++++++++++++++++
 machinery/src/components/Kerberos.go  | 11 ++++
 machinery/src/models/AudioData.go     |  6 ++
 machinery/src/models/Communication.go |  2 +
 machinery/src/models/MQTT.go          |  6 ++
 machinery/src/routers/mqtt/main.go    | 19 +++++++
 9 files changed, 138 insertions(+), 5 deletions(-)
 create mode 100644 machinery/src/components/Audio.go
 create mode 100644 machinery/src/models/AudioData.go

diff --git a/machinery/go.mod b/machinery/go.mod
index ae6b9e8..488a7ee 100644
--- a/machinery/go.mod
+++ b/machinery/go.mod
@@ -2,7 +2,7 @@ module github.com/kerberos-io/agent/machinery
 
 go 1.19
 
-//replace github.com/kerberos-io/joy4 v1.0.58 => ../../../../github.com/kerberos-io/joy4
+//replace github.com/kerberos-io/joy4 v1.0.62 => ../../../../github.com/kerberos-io/joy4
 
 // replace github.com/kerberos-io/onvif v0.0.6 => ../../../../github.com/kerberos-io/onvif
 
@@ -26,7 +26,7 @@ require (
 	github.com/golang-module/carbon/v2 v2.2.3
 	github.com/gorilla/websocket v1.5.0
 	github.com/kellydunn/golang-geo v0.7.0
-	github.com/kerberos-io/joy4 v1.0.60
+	github.com/kerberos-io/joy4 v1.0.62
 	github.com/kerberos-io/onvif v0.0.7
 	github.com/minio/minio-go/v6 v6.0.57
 	github.com/nsmith5/mjpeg v0.0.0-20200913181537-54b8ada0e53e
@@ -38,6 +38,7 @@ require (
 	github.com/swaggo/gin-swagger v1.5.3
 	github.com/swaggo/swag v1.8.9
 	github.com/tevino/abool v1.2.0
+	github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359
 	go.mongodb.org/mongo-driver v1.7.5
 	gopkg.in/DataDog/dd-trace-go.v1 v1.46.0
 	gopkg.in/natefinch/lumberjack.v2 v2.0.0
diff --git a/machinery/go.sum b/machinery/go.sum
index fc24c14..3c4c8d6 100644
--- a/machinery/go.sum
+++ b/machinery/go.sum
@@ -264,8 +264,8 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/kellydunn/golang-geo v0.7.0 h1:A5j0/BvNgGwY6Yb6inXQxzYwlPHc6WVZR+MrarZYNNg=
 github.com/kellydunn/golang-geo v0.7.0/go.mod h1:YYlQPJ+DPEzrHx8kT3oPHC/NjyvCCXE+IuKGKdrjrcU=
-github.com/kerberos-io/joy4 v1.0.60 h1:W9LMTHw+Lgz4J9/28xCvvVebhcAioup49NqxYVmrH38=
-github.com/kerberos-io/joy4 v1.0.60/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
+github.com/kerberos-io/joy4 v1.0.62 h1:LsjGrss5I2UGfTovAF0icTuEcxwOPptkSqGyxeIwa40=
+github.com/kerberos-io/joy4 v1.0.62/go.mod h1:nZp4AjvKvTOXRrmDyAIOw+Da+JA5OcSo/JundGfOlFU=
 github.com/kerberos-io/onvif v0.0.7 h1:LIrXjTH7G2W9DN69xZeJSB0uS3W1+C3huFO8kTqx7/A=
 github.com/kerberos-io/onvif v0.0.7/go.mod h1:Hr2dJOH2LM5SpYKk17gYZ1CMjhGhUl+QlT5kwYogrW0=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
@@ -471,6 +471,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
 github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
 github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359 h1:P9yeMx2iNJxJqXEwLtMjSwWcD2a0AlFmFByeosMZhLM=
+github.com/zaf/g711 v0.0.0-20220109202201-cf0017bf0359/go.mod h1:ySLGJD8AQluMQuu5JDvfJrwsBra+8iX1jFsKS8KfB2I=
 github.com/ziutek/mymysql v1.5.4 h1:GB0qdRGsTwQSBVYuVShFBKaXSnSnYYC2d9knnE1LHFs=
 github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wKdgO/C0=
 go.mongodb.org/mongo-driver v1.7.5 h1:ny3p0reEpgsR2cfA5cjgwFZg3Cv/ofFh/8jbhGtz9VI=
diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go
index 23aedc3..34a0fdb 100644
--- a/machinery/src/cloud/Cloud.go
+++ b/machinery/src/cloud/Cloud.go
@@ -278,6 +278,11 @@ loop:
 				cameraConnected = "false"
 			}
 
+			hasBackChannel := "false"
+			if communication.HasBackChannel {
+				hasBackChannel = "true"
+			}
+
 			// We will formated the uptime to a human readable format
 			// this will be used on Kerberos Hub: Uptime -> 1 day and 2 hours.
 			uptimeFormatted := uptimeStart.Format("2006-01-02 15:04:05")
@@ -382,13 +387,14 @@ loop:
 						"onvif_presets": "%s",
 						"onvif_presets_list": %s,
 						"cameraConnected": "%s",
+						"hasBackChannel": "%s",
 						"numberoffiles" : "33",
 						"timestamp" : 1564747908,
 						"cameratype" : "IPCamera",
 						"docker" : true,
 						"kios" : false,
 						"raspberrypi" : false
-					}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected)
+					}`, config.Key, system.Version, system.CPUId, username, key, name, isEnterprise, system.Hostname, system.Architecture, system.TotalMemory, system.UsedMemory, system.FreeMemory, system.ProcessUsedMemory, macs, ips, "0", "0", "0", uptimeString, boottimeString, config.HubSite, onvifEnabled, onvifZoom, onvifPanTilt, onvifPresets, onvifPresetsList, cameraConnected, hasBackChannel)
 
 				var jsonStr = []byte(object)
 				buffy := bytes.NewBuffer(jsonStr)
diff --git a/machinery/src/components/Audio.go b/machinery/src/components/Audio.go
new file mode 100644
index 0000000..4e24d2f
--- /dev/null
+++ b/machinery/src/components/Audio.go
@@ -0,0 +1,80 @@
+package components
+
+import (
+	"bufio"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/kerberos-io/agent/machinery/src/log"
+	"github.com/kerberos-io/agent/machinery/src/models"
+	"github.com/kerberos-io/joy4/av"
+	"github.com/zaf/g711"
+)
+
+// GetBackChannelAudioCodec returns the first PCM_MULAW audio stream that reports IsBackChannel,
+// or nil when there is none. communication.HasBackChannel is updated to match the result, so a
+// flag left over from an earlier set of streams does not linger.
+func GetBackChannelAudioCodec(streams []av.CodecData, communication *models.Communication) av.AudioCodecData {
+	communication.HasBackChannel = false
+	for _, stream := range streams {
+		pcmuCodec, ok := stream.(av.AudioCodecData) // Checked assertion: never panic on unexpected codec data.
+		if ok && stream.Type().IsAudio() && stream.Type().String() == "PCM_MULAW" && pcmuCodec.IsBackChannel() {
+			communication.HasBackChannel = true
+			return pcmuCodec
+		}
+	}
+	return nil
+}
+
+func WriteAudioToBackchannel(infile av.DemuxCloser, streams []av.CodecData, communication *models.Communication) {
+	log.Log.Info("WriteAudioToBackchannel: looking for backchannel audio codec")
+
+	pcmuCodec := GetBackChannelAudioCodec(streams, communication)
+	if pcmuCodec != nil {
+		log.Log.Info("WriteAudioToBackchannel: found backchannel audio codec")
+
+		length := 0
+		channel := pcmuCodec.GetIndex() * 2 // This is the same calculation as Interleaved property in the SDP file.
+		for audio := range communication.HandleAudio {
+			// Encode PCM to MULAW
+			var bufferUlaw []byte
+			for _, v := range audio.Data {
+				b := g711.EncodeUlawFrame(v)
+				bufferUlaw = append(bufferUlaw, b)
+			}
+			infile.Write(bufferUlaw, channel, uint32(length))
+			length = (length + len(bufferUlaw)) % 65536
+			time.Sleep(128 * time.Millisecond)
+		}
+	}
+	log.Log.Info("WriteAudioToBackchannel: finished")
+
+}
+
+// WriteFileToBackChannel streams ./audiofile.bye to channel 2 of infile in chunks of at most
+// 1024 bytes, sleeping 128ms after each chunk. It returns once the file cannot be opened or read.
+func WriteFileToBackChannel(infile av.DemuxCloser) {
+	file, err := os.Open("./audiofile.bye")
+	if err != nil {
+		fmt.Println("WriteFileToBackChannel: error opening audiofile.bye file")
+		return // There is nothing to stream.
+	}
+	defer file.Close()
+
+	reader := bufio.NewReader(file)
+	buffer := make([]byte, 1024)
+	count := 0
+	for {
+		n, err := reader.Read(buffer)
+		if n > 0 {
+			// Send only the n bytes just read; the rest of buffer is stale data from earlier reads.
+			infile.Write(buffer[:n], 2, uint32(count))
+			count += n
+			time.Sleep(128 * time.Millisecond)
+		}
+		if err != nil {
+			break
+		}
+	}
+}
diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go
index 947e2d6..51eca14 100644
--- a/machinery/src/components/Kerberos.go
+++ b/machinery/src/components/Kerberos.go
@@ -11,6 +11,8 @@ import (
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/kerberos-io/joy4/cgo/ffmpeg"
 
+	//"github.com/youpy/go-wav"
+
 	"github.com/kerberos-io/agent/machinery/src/capture"
 	"github.com/kerberos-io/agent/machinery/src/cloud"
 	"github.com/kerberos-io/agent/machinery/src/computervision"
@@ -244,6 +246,9 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
 			go capture.HandleSubStream(subInfile, subQueue, communication)
 		}
 
+		// Handle processing of audio
+		communication.HandleAudio = make(chan models.AudioDataPartial)
+
 		// Handle processing of motion
 		communication.HandleMotion = make(chan models.MotionDataPartial, 1)
 		if subStreamEnabled {
@@ -285,6 +290,10 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
 		// If we reach this point, we have a working RTSP connection.
 		communication.CameraConnected = true
 
+		// We might have a camera with audio backchannel enabled.
+		// Check whether one of the streams is a PCMU encoded backchannel.
+		go WriteAudioToBackchannel(infile, streams, communication)
+
 		// !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 		// This will go into a blocking state, once this channel is triggered
 		// the agent will cleanup and restart.
@@ -328,6 +337,8 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu
 		}
 		close(communication.HandleMotion)
 		communication.HandleMotion = nil
+		close(communication.HandleAudio)
+		communication.HandleAudio = nil
 
 		// Waiting for some seconds to make sure everything is properly closed.
 		log.Log.Info("RunAgent: waiting 3 seconds to make sure everything is properly closed.")
diff --git a/machinery/src/models/AudioData.go b/machinery/src/models/AudioData.go
new file mode 100644
index 0000000..8e18a8d
--- /dev/null
+++ b/machinery/src/models/AudioData.go
@@ -0,0 +1,6 @@
+package models
+
+type AudioDataPartial struct {
+	Timestamp int64   `json:"timestamp" bson:"timestamp"` // copied from AudioPayload.Timestamp by HandleAudio
+	Data      []int16 `json:"data" bson:"data"`           // PCM samples that WriteAudioToBackchannel encodes to mu-law
+}
diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go
index 8c54746..c9723b8 100644
--- a/machinery/src/models/Communication.go
+++ b/machinery/src/models/Communication.go
@@ -22,6 +22,7 @@ type Communication struct {
 	HandleStream          chan string
 	HandleSubStream       chan string
 	HandleMotion          chan MotionDataPartial
+	HandleAudio           chan AudioDataPartial
 	HandleUpload          chan string
 	HandleHeartBeat       chan string
 	HandleLiveSD          chan int64
@@ -38,4 +39,5 @@ type Communication struct {
 	SubDecoder            *ffmpeg.VideoDecoder
 	Image                 string
 	CameraConnected       bool
+	HasBackChannel        bool
 }
diff --git a/machinery/src/models/MQTT.go b/machinery/src/models/MQTT.go
index b9760d8..a8aff43 100644
--- a/machinery/src/models/MQTT.go
+++ b/machinery/src/models/MQTT.go
@@ -107,6 +107,12 @@ type Payload struct {
 	Value          map[string]interface{} `json:"value"`
 }
 
+// AudioPayload is the MQTT value that HandleAudio decodes for the "get-audio-backchannel" action.
+type AudioPayload struct {
+	Timestamp int64   `json:"timestamp"` // timestamp sent with the audio; HandleAudio drops the payload when it is 0.
+	Data      []int16 `json:"data"`      // samples that HandleAudio forwards on Communication.HandleAudio.
+}
+
 // We received a recording request, we'll send it to the motion handler.
 type RecordPayload struct {
 	Timestamp int64 `json:"timestamp"` // timestamp of the recording request.
diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go
index 2f01af6..ebb27ab 100644
--- a/machinery/src/routers/mqtt/main.go
+++ b/machinery/src/routers/mqtt/main.go
@@ -229,6 +229,8 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory
 				switch payload.Action {
 				case "record":
 					go HandleRecording(mqttClient, hubKey, payload, configuration, communication)
+				case "get-audio-backchannel":
+					go HandleAudio(mqttClient, hubKey, payload, configuration, communication)
 				case "get-ptz-position":
 					go HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication)
 				case "update-ptz-position":
@@ -268,6 +270,23 @@ func HandleRecording(mqttClient mqtt.Client, hubKey string, payload models.Paylo
 	}
 }
 
+// HandleAudio decodes the value of a "get-audio-backchannel" message into an AudioPayload and queues
+// it on communication.HandleAudio. Payloads that are malformed, lack a timestamp or arrive while no
+// backchannel is available are dropped.
+func HandleAudio(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
+	// Convert map[string]interface{} to AudioPayload
+	jsonData, err := json.Marshal(payload.Value)
+	var audioPayload models.AudioPayload
+	if err != nil || json.Unmarshal(jsonData, &audioPayload) != nil || audioPayload.Timestamp == 0 {
+		return
+	}
+	// Skip the send when there is no backchannel: the audio has nowhere to go and the send could block forever.
+	if !communication.HasBackChannel || communication.HandleAudio == nil {
+		return
+	}
+	communication.HandleAudio <- models.AudioDataPartial{Timestamp: audioPayload.Timestamp, Data: audioPayload.Data}
+}
+
 func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) {
 	value := payload.Value