Skip to content

Commit

Permalink
work!
Browse files Browse the repository at this point in the history
  • Loading branch information
QuinnDamerell committed Dec 29, 2024
1 parent 08649ac commit 4ba7278
Show file tree
Hide file tree
Showing 7 changed files with 459 additions and 307 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
"routable",
"rwix",
"sagehandler",
"sagehost",
"sagetranscribehandler",
"serverauth",
"servercon",
"serverdiscovery",
Expand Down
4 changes: 2 additions & 2 deletions homeway/homeway_linuxhost/linuxhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ def OnPrimaryConnectionEstablished(self, apiKey, connectedAccounts):
# Set the current API key to the event handler
self.HaEventHandler.SetHomewayApiKey(apiKey)

# Once we have the API key, we can start the Sage system.
self.Sage.Start(self.GetPluginId(), apiKey)
# Once we have the API key, we can start or refresh the Sage system.
self.Sage.StartOrRefresh(self.GetPluginId(), apiKey)

# Set the current API key to the custom file server
CustomFileServer.Get().UpdateAddonConfig(self.GetPluginId(), apiKey)
Expand Down
8 changes: 7 additions & 1 deletion homeway/homeway_linuxhost/sage/fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ def __init__(self, logger:logging.Logger, fiberManager, pluginId:str, apiKey:str
self.IsConnected = False


# Updates the API key if we get a new one from the server.
def UpdateApiKey(self, apiKey:str) -> None:
self.ApiKey = apiKey


# Starts the connection thread.
def Start(self) -> None:
t = threading.Thread(target=self._ConnectionThread)
t.daemon = True
Expand Down Expand Up @@ -90,7 +96,7 @@ def Closed(ws:Client):
self.Logger.info(f"{self._getLogTag()} Websocket closed")

# Start the web socket connection.
#uri = "ws://10.0.0.229/sage-fabric-websocket"
#uri = "ws://10.0.0.15/sage-fabric-websocket"
uri = "wss://homeway.io/sage-fabric-websocket"
headers = {}
headers["X-Plugin-Id"] = self.PrinterId
Expand Down
103 changes: 80 additions & 23 deletions homeway/homeway_linuxhost/sage/fibermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import struct
import logging
import threading
import octoflatbuffers
from typing import List
import octoflatbuffers

from homeway.sentry import Sentry

Expand All @@ -15,11 +15,9 @@

from .fabric import Fabric


# Manages the fiber streams being sent over Fabric.
class FiberManager:


def __init__(self, logger:logging.Logger):
self.Logger = logger
self.Fabric:Fabric = None
Expand Down Expand Up @@ -67,20 +65,19 @@ def ResetListen(self):
# First, call this with isTransmissionDone set to false to upload stream data. When called like this, this does not block.
# When the audio is fully streamed, call with isTransmissionDone and the reaming buffer (if any) to get the response. This will block.
# No matter how it's called, it returns None on failure. If this fails at anytime during a stream, it should not be called again until ResetListen is called.
def Listen(self, isTransmissionDone:bool, audio:bytes, audioFormat:SageDataTypesFormats, sampleRate:int, channels:int, bitsPerSample:int) -> str:
async def Listen(self, isTransmissionDone:bool, audio:bytes, audioFormat:SageDataTypesFormats, sampleRate:int, channels:int, bytesPerSample:int) -> str:

# This is only called on the first message sent, this sends the audio settings.
def createDataContextOffset(builder:octoflatbuffers.Builder) -> int:
return self._CreateDataContext(builder, audioFormat, sampleRate, channels, bitsPerSample)
return self._CreateDataContext(builder, audioFormat, sampleRate, channels, bytesPerSample)

# onDataStreamReceived can be called at anytime, streaming or waiting for the response.
# If it's called while streaming, an error has occurred and we should stop until the next audio reset.
class ResponseContext:
Text = None
StatusCode = None
response:ResponseContext = ResponseContext()

# This can be called at anytime, streaming or waiting for the response.
# If it's called while streaming, an error has occurred and we should stop until the next audio reset.
def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):
async def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):
# For listen, this should only be called once
if response.StatusCode is not None:
raise Exception("Sage Listen onDataStreamReceived called more than once.")
Expand All @@ -89,7 +86,7 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
# If we have anything but a 200, stop processing now.
response.StatusCode = statusCode
if response.StatusCode != 200:
return
return False

# This data format must be text.
dataType = dataContext.DataType()
Expand All @@ -99,9 +96,10 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon

# Set the text.
response.Text = data.decode("utf-8")
return True

# Do the operation, stream or wait for the response.
result = self._SendAndReceive(SageOperationTypes.Listen, audio, createDataContextOffset, onDataStreamReceived, isTransmissionDone)
result = await self._SendAndReceive(SageOperationTypes.Listen, audio, createDataContextOffset, onDataStreamReceived, isTransmissionDone)

# If we failed, we always return None, for both upload streaming or the final response.
if result is False:
Expand All @@ -125,8 +123,52 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
return response.Text


# Called with a text string to synthesize audio.
# async streamingDataReceivedCallback(SpeakDataResponse) -> bool
# If the callback returns False, the operation will stop.
# Return True on success, False on failure.
async def Speak(self, text:str, streamingDataReceivedCallback) -> bool:

# Creates the sending data context for the text we want to send.
def createDataContextOffset(builder:octoflatbuffers.Builder) -> int:
return self._CreateDataContext(builder, SageDataTypesFormats.Text)

# This onDataStreamReceived will be called each time there's more chunked audio data
# to stream back.
class ResponseContext:
StatusCode:int = None
response:ResponseContext = ResponseContext()
async def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):

# Check for a failure, which can happen at anytime.
# If we have anything but a 200, stop processing now.
response.StatusCode = statusCode
if response.StatusCode != 200:
return

# Create the data object and call the handler.
# If it returns false, we will stop.
dataResponse = SpeakDataResponse(data, dataContext.DataType(), dataContext.SampleRate(), dataContext.Channels(), dataContext.BytesPerSample())
return await streamingDataReceivedCallback(dataResponse)

# Do the operation, stream or wait for the response.
data = text.encode("utf-8")
result = await self._SendAndReceive(SageOperationTypes.Speak, data, createDataContextOffset, onDataStreamReceived)

# If we failed, return false.
if result is False:
return False

# If the status code is set at any time and not 200, we failed, regardless of the mode.
if response.StatusCode is not None and response.StatusCode != 200:
self.Logger.error(f"Sage Speak failed with status code {response.StatusCode}")
return False

return True


# TODO
def Speak(self, text:str) -> bytearray:
async def Transcript(self, text:str) -> bytearray:

# This is only called on the first message sent, this sends the audio settings.
def createDataContextOffset(builder:octoflatbuffers.Builder) -> int:
Expand All @@ -140,7 +182,7 @@ class ResponseContext:

# This can be called at anytime, streaming or waiting for the response.
# If it's called while streaming, an error has occurred and we should stop until the next audio reset.
def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):
async def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):
# For listen, this should only be called once
if response.StatusCode is not None:
raise Exception("Sage Listen onDataStreamReceived called more than once.")
Expand All @@ -149,20 +191,21 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
# If we have anything but a 200, stop processing now.
response.StatusCode = statusCode
if response.StatusCode != 200:
return
return False

# This data format must be text.
dataType = dataContext.DataType()
if dataType != SageDataTypesFormats.AudioPCM:
if dataType != SageDataTypesFormats.Text:
response.StatusCode = 500
raise Exception("Sage Listen got a response that wasn't text?")

# Set the text.
response.Bytes = data
return True

# Do the operation, stream or wait for the response.
data = text.encode("utf-8")
result = self._SendAndReceive(SageOperationTypes.Speak, data, createDataContextOffset, onDataStreamReceived, True)
result = await self._SendAndReceive(SageOperationTypes.ModelCommunication, data, createDataContextOffset, onDataStreamReceived, True)

# If we failed, we always return None, for both upload streaming or the final response.
if result is False:
Expand All @@ -173,7 +216,7 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
self.Logger.error(f"Sage Listen failed with status code {response.StatusCode}")
return None

return response.Bytes
return response.Bytes.decode("utf-8")


# A helper function that allows us to send messages for many different types of actions.
Expand All @@ -183,7 +226,7 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
# When it's called and isTransmissionDone is set to True, the final data will be sent and the function will block until a response is received.
# The onDataReceivedCallback can be called many times if the response is being streamed.
# The onDataReceivedCallback can also be called on any call into this function with a status code, if the stream closed early for some reason.
def _SendAndReceive(self,
async def _SendAndReceive(self,
requestType:SageOperationTypes,
sendData:bytearray,
dataContextCreateCallback,
Expand Down Expand Up @@ -304,7 +347,8 @@ def _SendAndReceive(self,

# Process the data.
for d in data:
onDataStreamReceivedCallback(statusCode, d, dataContext)
if await onDataStreamReceivedCallback(statusCode, d, dataContext) is False:
return False

# If we processed all the data and the stream is done, we're done.
if isDataDownloadComplete:
Expand Down Expand Up @@ -341,12 +385,15 @@ def _CreateStreamMessage(self, builder:octoflatbuffers.Builder, streamId:int, ms


# Builds the data context.
def _CreateDataContext(self, builder:octoflatbuffers.Builder, dataFormat:SageDataTypesFormats, sampleRate:int, channels:int, bitsPerSample:int) -> int:
def _CreateDataContext(self, builder:octoflatbuffers.Builder, dataFormat:SageDataTypesFormats, sampleRate:int=None, channels:int=None, bytesPerSample:int=None) -> int:
SageDataContext.Start(builder)
SageDataContext.AddDataType(builder, dataFormat)
SageDataContext.AddSampleRate(builder, sampleRate)
SageDataContext.AddChannels(builder, channels)
SageDataContext.AddBitsPerSample(builder, bitsPerSample)
if sampleRate is not None:
SageDataContext.AddSampleRate(builder, sampleRate)
if channels is not None:
SageDataContext.AddChannels(builder, channels)
if bytesPerSample is not None:
SageDataContext.AddBytesPerSample(builder, bytesPerSample)
return SageDataContext.End(builder)


Expand Down Expand Up @@ -460,6 +507,16 @@ def Unpack32Int(self, buffer, bufferOffset) :
return (buffer[0 + bufferOffset] << 24) + (buffer[1 + bufferOffset] << 16) + (buffer[2 + bufferOffset] << 8) + (buffer[3 + bufferOffset])


# Hold the context of the result for the speak function.
class SpeakDataResponse:
def __init__(self, data:bytearray, dataType:SageDataTypesFormats, sampleRate:int, channels:int, bytesPerSample:int):
self.Bytes = data
self.DataFormat = dataType
self.SampleRate = sampleRate
self.Channels = channels
self.BytesPerSample = bytesPerSample


# Used to track the current state of a stream.
class StreamContext:

Expand Down
Loading

0 comments on commit 4ba7278

Please sign in to comment.