Skip to content

Commit

Permalink
work
Browse files Browse the repository at this point in the history
  • Loading branch information
QuinnDamerell committed Dec 28, 2024
1 parent 9960a22 commit 19ce127
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 31 deletions.
5 changes: 3 additions & 2 deletions homeway/homeway_linuxhost/sage/fabric.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def Closed(ws:Client):
self.Logger.info(f"{self._getLogTag()} Websocket closed")

# Start the web socket connection.
uri = "ws://10.0.0.15/sage-fabric-websocket"
#uri = "ws://10.0.0.229/sage-fabric-websocket"
uri = "wss://homeway.io/sage-fabric-websocket"
headers = {}
headers["X-Plugin-Id"] = self.PrinterId
headers["X-Api-Key"] = self.ApiKey
Expand Down Expand Up @@ -121,4 +122,4 @@ def _OnData(self, ws:Client, buffer:bytes, msgType):


def _getLogTag(self) -> str:
return f"Fabric [{self.ConId}]"
return f"Sage Fabric [{self.ConId}]"
53 changes: 52 additions & 1 deletion homeway/homeway_linuxhost/sage/fibermanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,57 @@ def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataCon
return response.Text


# TODO
def Speak(self, text:str) -> bytearray:

# This is only called on the first message sent, this sends the audio settings.
def createDataContextOffset(builder:octoflatbuffers.Builder) -> int:
# TODO
return self._CreateDataContext(builder, SageDataTypesFormats.Text, 0, 0, 0)

class ResponseContext:
Bytes = None
StatusCode = None
response:ResponseContext = ResponseContext()

# This can be called at anytime, streaming or waiting for the response.
# If it's called while streaming, an error has occurred and we should stop until the next audio reset.
def onDataStreamReceived(statusCode:int, data:bytearray, dataContext:SageDataContext):
# For listen, this should only be called once
if response.StatusCode is not None:
raise Exception("Sage Listen onDataStreamReceived called more than once.")

# Check for a failure, which can happen at anytime.
# If we have anything but a 200, stop processing now.
response.StatusCode = statusCode
if response.StatusCode != 200:
return

# This data format must be text.
dataType = dataContext.DataType()
if dataType != SageDataTypesFormats.AudioPCM:
response.StatusCode = 500
raise Exception("Sage Listen got a response that wasn't text?")

# Set the text.
response.Bytes = data

# Do the operation, stream or wait for the response.
data = text.encode("utf-8")
result = self._SendAndReceive(SageOperationTypes.Speak, data, createDataContextOffset, onDataStreamReceived, True)

# If we failed, we always return None, for both upload streaming or the final response.
if result is False:
return None

# If the status code is set at any time and not 200, we failed, regardless of the mode.
if response.StatusCode is not None and response.StatusCode != 200:
self.Logger.error(f"Sage Listen failed with status code {response.StatusCode}")
return None

return response.Bytes


# A helper function that allows us to send messages for many different types of actions.
# Returns true on success, false on failure.
# Note the behavior of isTransmissionDone:
Expand All @@ -137,7 +188,7 @@ def _SendAndReceive(self,
sendData:bytearray,
dataContextCreateCallback,
onDataStreamReceivedCallback,
isTransmissionDone:bool=False,
isTransmissionDone:bool=True,
timeoutSec:float = 20.0) -> bool:

# First, get or create the stream.
Expand Down
57 changes: 30 additions & 27 deletions homeway/homeway_linuxhost/sage/sagehandler.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import time
import math

from wyoming.asr import Transcript
from wyoming.tts import Synthesize
Expand Down Expand Up @@ -78,37 +79,39 @@ async def handle_event(self, event: Event) -> bool:
if Synthesize.is_type(event.type):
transcript = Synthesize.from_event(event)

# # Ensure all of the text is joined on one line.
# text = " ".join(transcript.text.strip().splitlines())
# Ensure all of the text is joined on one line.
text = " ".join(transcript.text.strip().splitlines())

# self.Logger.debug(f"Sage - Synthesize Start - {text}")
self.Logger.debug(f"Sage - Synthesize Start - {text}")

start = time.time()
bytes = self.FiberManager.Speak(text)

# start = time.time()
# url = "https://homeway.io/api/sage/speak"
# response = HttpSessions.GetSession(url).post(url, json={"Text": text}, timeout=120)

# # Compute the audio values.
# data = response.content
# rate = 24000
# width = 2
# channels = 1
# bytesPerSample = width * channels
# bytesPerChunk = bytesPerSample * 1024
# chunks = int(math.ceil(len(data) / bytesPerChunk))

# # Start the response.
# await self.write_event(AudioStart(rate=rate, width=width, channels=channels).event())

# # Write the audio chunks.
# for i in range(chunks):
# offset = i * bytesPerChunk
# chunk = data[offset : offset + bytesPerChunk]
# await self.write_event(AudioChunk(audio=chunk, rate=rate, width=width, channels=channels).event())

# # Write the end event.
# await self.write_event(AudioStop().event())
# self.Logger.warn(f"Sage Synthesize End - {text} - time: {time.time() - start}")
# return True
# Compute the audio values.
data = bytes
rate = 24000
width = 2
channels = 1
bytesPerSample = width * channels
bytesPerChunk = bytesPerSample * 1024
chunks = int(math.ceil(len(data) / bytesPerChunk))

# Start the response.
await self.write_event(AudioStart(rate=rate, width=width, channels=channels).event())

# Write the audio chunks.
for i in range(chunks):
offset = i * bytesPerChunk
chunk = data[offset : offset + bytesPerChunk]
await self.write_event(AudioChunk(audio=chunk, rate=rate, width=width, channels=channels).event())

# Write the end event.
await self.write_event(AudioStop().event())
self.Logger.warn(f"Sage Synthesize End - {text} - time: {time.time() - start}")
return True


# For all other events, return True.
Expand Down Expand Up @@ -197,6 +200,6 @@ async def _HandleStreamingAudio(self, event: Event) -> bool:
return True

# Send the text back to the client.
self.Logger.debug(f"Sage Listen End - {text} - latency: {time.time() - start}s")
self.Logger.info(f"Sage Listen End - {text} - latency: {time.time() - start}s")
await self.write_event(Transcript(text=text).event())
return True
10 changes: 9 additions & 1 deletion homeway/homeway_linuxhost/sage/sagehost.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,16 @@ def __init__(self, logger:logging.Logger):
self.ApiKey:str = None
self.Fabric:Fabric = None
self.FiberManager:FiberManager = None
self.WyomingServerThreadRunning = False


# Once the api key is known, we can start.
# Note this is called every time the main WS connection is reset.
def Start(self, pluginId:str, apiKey:str):
# TODO - We need to update the API key or restart the WS
if self.PluginId is not None:
return

self.PluginId = pluginId
self.ApiKey = apiKey

Expand All @@ -39,7 +45,9 @@ def Start(self, pluginId:str, apiKey:str):
self.Fabric.Start()

# Start an independent thread to run asyncio.
threading.Thread(target=self._run).start()
if self.WyomingServerThreadRunning is False:
self.WyomingServerThreadRunning = True
threading.Thread(target=self._run).start()


def _run(self):
Expand Down

0 comments on commit 19ce127

Please sign in to comment.