From 69f64899feb456e3207c1c52adf642a409ff044d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 23 Jul 2024 10:50:15 -0700 Subject: [PATCH 1/4] pipeline: add send_initial_empty_metrics flag --- CHANGELOG.md | 5 +++++ src/pipecat/pipeline/pipeline.py | 2 +- src/pipecat/pipeline/task.py | 5 ++++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0f5bac054..8e9b344d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unrelease] +### Added + +- Added `send_initial_empty_metrics` flag to `PipelineParams` to request for + initial empty metrics (zero values). True by default. + ### Fixed - STT services should be using ISO 8601 time format for transcription frames. diff --git a/src/pipecat/pipeline/pipeline.py b/src/pipecat/pipeline/pipeline.py index 165717ad3..6805cfad0 100644 --- a/src/pipecat/pipeline/pipeline.py +++ b/src/pipecat/pipeline/pipeline.py @@ -64,7 +64,7 @@ def processors_with_metrics(self): services = [] for p in self._processors: if isinstance(p, BasePipeline): - services += p.processors_with_metrics() + services.extend(p.processors_with_metrics()) elif p.can_generate_metrics(): services.append(p) return services diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index 3424aac86..ae5056a48 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -21,6 +21,7 @@ class PipelineParams(BaseModel): allow_interruptions: bool = False enable_metrics: bool = False + send_initial_empty_metrics: bool = True report_only_initial_ttfb: bool = False @@ -106,7 +107,9 @@ async def _process_down_queue(self): report_only_initial_ttfb=self._params.report_only_initial_ttfb ) await self._source.process_frame(start_frame, FrameDirection.DOWNSTREAM) - await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM) + + if self._params.send_initial_empty_metrics: + await self._source.process_frame(self._initial_metrics_frame(), FrameDirection.DOWNSTREAM) running = True should_cleanup = True From f41c2b3c9fe27201ece57c65aafe1d52538a0cfe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 23 Jul 2024 10:50:36 -0700 Subject: [PATCH 2/4] transports(daily): don't send empty metrics --- src/pipecat/transports/services/daily.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/pipecat/transports/services/daily.py b/src/pipecat/transports/services/daily.py index 142a38c02..8cfbc7530 100644 --- a/src/pipecat/transports/services/daily.py +++ b/src/pipecat/transports/services/daily.py @@ -678,12 +678,15 @@ async def send_message(self, frame: TransportMessageFrame): await self._client.send_message(frame) async def send_metrics(self, frame: MetricsFrame): + metrics = {} + if frame.ttfb: + metrics["ttfb"] = frame.ttfb + if frame.processing: + metrics["processing"] = frame.processing + message = DailyTransportMessageFrame(message={ "type": "pipecat-metrics", - "metrics": { - "ttfb": frame.ttfb or [], - "processing": frame.processing or [], - }, + "metrics": metrics }) await self._client.send_message(message) From 05d4fba551aace80062644e28fe359866702bde9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 23 Jul 2024 10:50:56 -0700 Subject: [PATCH 3/4] processors(rtvi): send initial empty metrics --- src/pipecat/processors/frameworks/rtvi.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index f9e76fb6d..f7bd170c2 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -19,6 +19,7 @@ LLMMessagesAppendFrame, LLMMessagesUpdateFrame, LLMModelUpdateFrame, + MetricsFrame, StartFrame, SystemFrame, TTSSpeakFrame, @@ -456,6 +457,13 @@ async def _handle_setup(self, setup: RTVISetup | None): start_frame = dataclasses.replace(self._start_frame) await self.push_frame(start_frame) + # Send new initial metrics with the new processors + processors = parent.processors_with_metrics() + processors.extend(self._pipeline.processors_with_metrics()) + ttfb = [{"name": p.name, "time": 0.0} for p in processors] + processing = [{"name": p.name, "time": 0.0} for p in processors] + await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing)) + message = RTVIBotReady() frame = TransportMessageFrame(message=message.model_dump(exclude_none=True)) await self.push_frame(frame) From 08e0722d97ae1bb2f221020e5107262eb37f741c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleix=20Conchillo=20Flaqu=C3=A9?= Date: Tue, 23 Jul 2024 11:17:33 -0700 Subject: [PATCH 4/4] fix initial metrics format --- CHANGELOG.md | 5 ++++- src/pipecat/pipeline/task.py | 4 ++-- src/pipecat/processors/frameworks/rtvi.py | 4 ++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e9b344d7..f557463a4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Fixed initial metrics format. It was using the wrong keys name/time instead of + processor/value. + - STT services should be using ISO 8601 time format for transcription frames. -- Fix an issue that would cause Daily transport to show a stop transcription +- Fixed an issue that would cause Daily transport to show a stop transcription error when actually none occurred. ## [0.0.37] - 2024-07-22 diff --git a/src/pipecat/pipeline/task.py b/src/pipecat/pipeline/task.py index ae5056a48..9f45f33ea 100644 --- a/src/pipecat/pipeline/task.py +++ b/src/pipecat/pipeline/task.py @@ -96,8 +96,8 @@ async def queue_frames(self, frames: Iterable[Frame] | AsyncIterable[Frame]): def _initial_metrics_frame(self) -> MetricsFrame: processors = self._pipeline.processors_with_metrics() - ttfb = [{"name": p.name, "time": 0.0} for p in processors] - processing = [{"name": p.name, "time": 0.0} for p in processors] + ttfb = [{"processor": p.name, "value": 0.0} for p in processors] + processing = [{"processor": p.name, "value": 0.0} for p in processors] return MetricsFrame(ttfb=ttfb, processing=processing) async def _process_down_queue(self): diff --git a/src/pipecat/processors/frameworks/rtvi.py b/src/pipecat/processors/frameworks/rtvi.py index f7bd170c2..986b59bcd 100644 --- a/src/pipecat/processors/frameworks/rtvi.py +++ b/src/pipecat/processors/frameworks/rtvi.py @@ -460,8 +460,8 @@ async def _handle_setup(self, setup: RTVISetup | None): # Send new initial metrics with the new processors processors = parent.processors_with_metrics() processors.extend(self._pipeline.processors_with_metrics()) - ttfb = [{"name": p.name, "time": 0.0} for p in processors] - processing = [{"name": p.name, "time": 0.0} for p in processors] + ttfb = [{"processor": p.name, "value": 0.0} for p in processors] + processing = [{"processor": p.name, "value": 0.0} for p in processors] await self.push_frame(MetricsFrame(ttfb=ttfb, processing=processing)) message = RTVIBotReady()