From 681b0a96ee2d5e577261f83c5f59a08c7e031bba Mon Sep 17 00:00:00 2001 From: Alex Thomas Date: Thu, 15 Aug 2024 16:19:53 +0100 Subject: [PATCH 1/4] Added a timer to the KG creation pipeline --- src/neo4j_genai/experimental/pipeline/pipeline.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/neo4j_genai/experimental/pipeline/pipeline.py b/src/neo4j_genai/experimental/pipeline/pipeline.py index c036b780..c9372b2f 100644 --- a/src/neo4j_genai/experimental/pipeline/pipeline.py +++ b/src/neo4j_genai/experimental/pipeline/pipeline.py @@ -18,6 +18,7 @@ import enum import logging from datetime import datetime +from timeit import default_timer from typing import Any, AsyncGenerator, Awaitable, Callable, Optional from pydantic import BaseModel, Field @@ -118,7 +119,9 @@ async def execute(self, **kwargs: Any) -> RunResult | None: if the task run successfully, None if the status update was unsuccessful. """ - logger.debug(f"Running component {self.name} with {kwargs}") + logger.info(f"Running component {self.name}") + logger.debug(f"Component {self.name} arguments: {kwargs}") + start_time = default_timer() try: await self.set_status(RunStatus.RUNNING) except PipelineStatusUpdateError: @@ -130,6 +133,8 @@ async def execute(self, **kwargs: Any) -> RunResult | None: status=self.status, result=component_result, ) + end_time = default_timer() + logger.info(f"Component {self.name} finished in {end_time - start_time}s") return run_result def validate_inputs_config(self, input_data: dict[str, Any]) -> None: @@ -467,8 +472,12 @@ def validate_inputs_config(self, data: dict[str, Any]) -> None: task.validate_inputs_config(data) async def run(self, data: dict[str, Any]) -> dict[str, Any]: + logging.info("Starting pipeline") + start_time = default_timer() self.validate_inputs_config(data) self.reinitialize() orchestrator = Orchestrator(self) await orchestrator.run(data) + end_time = default_timer() + logging.info(f"Pipeline finished in {end_time - start_time}s") return self._final_results.all() From 7724ba5de36c18e1e054c5d33257e19762645d8c Mon Sep 17 00:00:00 2001 From: Alex Thomas Date: Fri, 16 Aug 2024 15:22:42 +0100 Subject: [PATCH 2/4] Reverted TaskPipelineNode logger call to debug --- src/neo4j_genai/experimental/pipeline/pipeline.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/neo4j_genai/experimental/pipeline/pipeline.py b/src/neo4j_genai/experimental/pipeline/pipeline.py index c9372b2f..b37b964c 100644 --- a/src/neo4j_genai/experimental/pipeline/pipeline.py +++ b/src/neo4j_genai/experimental/pipeline/pipeline.py @@ -119,8 +119,7 @@ async def execute(self, **kwargs: Any) -> RunResult | None: if the task run successfully, None if the status update was unsuccessful. """ - logger.info(f"Running component {self.name}") - logger.debug(f"Component {self.name} arguments: {kwargs}") + logger.debug(f"Running component {self.name} with {kwargs}") start_time = default_timer() try: await self.set_status(RunStatus.RUNNING) @@ -134,7 +133,7 @@ async def execute(self, **kwargs: Any) -> RunResult | None: result=component_result, ) end_time = default_timer() - logger.info(f"Component {self.name} finished in {end_time - start_time}s") + logger.debug(f"Component {self.name} finished in {end_time - start_time}s") return run_result def validate_inputs_config(self, input_data: dict[str, Any]) -> None: From acd8d16f4955a39c335aa2237111c676e00bcf4e Mon Sep 17 00:00:00 2001 From: Alex Thomas Date: Tue, 20 Aug 2024 17:28:27 +0100 Subject: [PATCH 3/4] Adds basic TQDM progress bar to KG creation pipeline --- poetry.lock | 4 +++- pyproject.toml | 1 + src/neo4j_genai/experimental/pipeline/pipeline.py | 8 ++++++-- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/poetry.lock b/poetry.lock index 781156b8..9aa548d7 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2472,6 +2472,8 @@ files = [ {file = "orjson-3.10.6-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:960db0e31c4e52fa0fc3ecbaea5b2d3b58f379e32a95ae6b0ebeaa25b93dfd34"}, {file = "orjson-3.10.6-cp312-none-win32.whl", hash = "sha256:a6ea7afb5b30b2317e0bee03c8d34c8181bc5a36f2afd4d0952f378972c4efd5"}, {file = "orjson-3.10.6-cp312-none-win_amd64.whl", hash = "sha256:874ce88264b7e655dde4aeaacdc8fd772a7962faadfb41abe63e2a4861abc3dc"}, + {file = "orjson-3.10.6-cp313-none-win32.whl", hash = "sha256:efdf2c5cde290ae6b83095f03119bdc00303d7a03b42b16c54517baa3c4ca3d0"}, + {file = "orjson-3.10.6-cp313-none-win_amd64.whl", hash = "sha256:8e190fe7888e2e4392f52cafb9626113ba135ef53aacc65cd13109eb9746c43e"}, {file = "orjson-3.10.6-cp38-cp38-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:66680eae4c4e7fc193d91cfc1353ad6d01b4801ae9b5314f17e11ba55e934183"}, {file = "orjson-3.10.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:caff75b425db5ef8e8f23af93c80f072f97b4fb3afd4af44482905c9f588da28"}, {file = "orjson-3.10.6-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3722fddb821b6036fd2a3c814f6bd9b57a89dc6337b9924ecd614ebce3271394"}, @@ -4609,4 +4611,4 @@ external-clients = ["pinecone-client", "weaviate-client"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "f49763e3e7eede0e2e028e352310e7527836b4a0c038003a054ec56783435c24" +content-hash = "0d3376d8bdd258ee439abde896d639111baad5aa40cf77bb403ac27339002b27" diff --git a/pyproject.toml b/pyproject.toml index 8408143d..3e9c42ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,6 +37,7 @@ pinecone-client = {version = "^4.1.0", optional = true} types-mock = "^5.1.0.20240425" eval-type-backport = "^0.2.0" pypdf = "^4.3.1" +tqdm = "^4.66.5" [tool.poetry.group.dev.dependencies] pylint = "^3.1.0" diff --git a/src/neo4j_genai/experimental/pipeline/pipeline.py b/src/neo4j_genai/experimental/pipeline/pipeline.py index b37b964c..f9fa0584 100644 --- a/src/neo4j_genai/experimental/pipeline/pipeline.py +++ b/src/neo4j_genai/experimental/pipeline/pipeline.py @@ -22,6 +22,7 @@ from typing import Any, AsyncGenerator, Awaitable, Callable, Optional from pydantic import BaseModel, Field +from tqdm.asyncio import tqdm from neo4j_genai.experimental.pipeline.component import Component, DataModel from neo4j_genai.experimental.pipeline.exceptions import ( @@ -410,6 +411,7 @@ def on_task_complete(self, node: TaskPipelineNode, result: RunResult) -> None: if result.result: res_to_save = result.result.model_dump() self.add_result_for_component(node.name, res_to_save, is_final=node.is_leaf()) + self.pbar.update(1) def add_result_for_component( self, name: str, result: dict[str, Any] | None, is_final: bool = False @@ -471,12 +473,14 @@ def validate_inputs_config(self, data: dict[str, Any]) -> None: task.validate_inputs_config(data) async def run(self, data: dict[str, Any]) -> dict[str, Any]: - logging.info("Starting pipeline") + logger.debug("Starting pipeline") + self.pbar = tqdm(total=len(self._nodes), desc="Creating knowledge graph") start_time = default_timer() self.validate_inputs_config(data) self.reinitialize() orchestrator = Orchestrator(self) await orchestrator.run(data) end_time = default_timer() - logging.info(f"Pipeline finished in {end_time - start_time}s") + self.pbar.close() + logger.debug(f"Pipeline finished in {end_time - start_time}s") return self._final_results.all() From bed042c94c084b60cdbb60fac38d7e67a9704ee4 Mon Sep 17 00:00:00 2001 From: Alex Thomas Date: Thu, 22 Aug 2024 15:47:52 +0100 Subject: [PATCH 4/4] Removed progress bar from KG pipeline --- poetry.lock | 4 +--- pyproject.toml | 1 - src/neo4j_genai/experimental/pipeline/pipeline.py | 4 ---- 3 files changed, 1 insertion(+), 8 deletions(-) diff --git a/poetry.lock b/poetry.lock index 9aa548d7..781156b8 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2472,8 +2472,6 @@ files = [ {file = "orjson-3.10.6-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:960db0e31c4e52fa0fc3ecbaea5b2d3b58f379e32a95ae6b0ebeaa25b93dfd34"}, {file = "orjson-3.10.6-cp312-none-win32.whl", hash = "sha256:a6ea7afb5b30b2317e0bee03c8d34c8181bc5a36f2afd4d0952f378972c4efd5"}, {file = "orjson-3.10.6-cp312-none-win_amd64.whl", hash = "sha256:874ce88264b7e655dde4aeaacdc8fd772a7962faadfb41abe63e2a4861abc3dc"}, - {file = "orjson-3.10.6-cp313-none-win32.whl", hash = "sha256:efdf2c5cde290ae6b83095f03119bdc00303d7a03b42b16c54517baa3c4ca3d0"}, - {file = "orjson-3.10.6-cp313-none-win_amd64.whl", hash = "sha256:8e190fe7888e2e4392f52cafb9626113ba135ef53aacc65cd13109eb9746c43e"}, {file = "orjson-3.10.6-cp38-cp38-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:66680eae4c4e7fc193d91cfc1353ad6d01b4801ae9b5314f17e11ba55e934183"}, {file = "orjson-3.10.6-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:caff75b425db5ef8e8f23af93c80f072f97b4fb3afd4af44482905c9f588da28"}, {file = "orjson-3.10.6-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:3722fddb821b6036fd2a3c814f6bd9b57a89dc6337b9924ecd614ebce3271394"}, @@ -4611,4 +4609,4 @@ external-clients = ["pinecone-client", "weaviate-client"] [metadata] lock-version = "2.0" python-versions = "^3.8.1" -content-hash = "0d3376d8bdd258ee439abde896d639111baad5aa40cf77bb403ac27339002b27" +content-hash = "f49763e3e7eede0e2e028e352310e7527836b4a0c038003a054ec56783435c24" diff --git a/pyproject.toml b/pyproject.toml index 3e9c42ae..8408143d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,7 +37,6 @@ pinecone-client = {version = "^4.1.0", optional = true} types-mock = "^5.1.0.20240425" eval-type-backport = "^0.2.0" pypdf = "^4.3.1" -tqdm = "^4.66.5" [tool.poetry.group.dev.dependencies] pylint = "^3.1.0" diff --git a/src/neo4j_genai/experimental/pipeline/pipeline.py b/src/neo4j_genai/experimental/pipeline/pipeline.py index f9fa0584..b8243aa6 100644 --- a/src/neo4j_genai/experimental/pipeline/pipeline.py +++ b/src/neo4j_genai/experimental/pipeline/pipeline.py @@ -22,7 +22,6 @@ from typing import Any, AsyncGenerator, Awaitable, Callable, Optional from pydantic import BaseModel, Field -from tqdm.asyncio import tqdm from neo4j_genai.experimental.pipeline.component import Component, DataModel from neo4j_genai.experimental.pipeline.exceptions import ( @@ -411,7 +410,6 @@ def on_task_complete(self, node: TaskPipelineNode, result: RunResult) -> None: if result.result: res_to_save = result.result.model_dump() self.add_result_for_component(node.name, res_to_save, is_final=node.is_leaf()) - self.pbar.update(1) def add_result_for_component( self, name: str, result: dict[str, Any] | None, is_final: bool = False @@ -474,13 +472,11 @@ def validate_inputs_config(self, data: dict[str, Any]) -> None: async def run(self, data: dict[str, Any]) -> dict[str, Any]: logger.debug("Starting pipeline") - self.pbar = tqdm(total=len(self._nodes), desc="Creating knowledge graph") start_time = default_timer() self.validate_inputs_config(data) self.reinitialize() orchestrator = Orchestrator(self) await orchestrator.run(data) end_time = default_timer() - self.pbar.close() logger.debug(f"Pipeline finished in {end_time - start_time}s") return self._final_results.all()