From 9b858621435d9f04848f1614a95e57791c3daae8 Mon Sep 17 00:00:00 2001
From: Siddharth Murching
Date: Thu, 26 Sep 2024 15:29:08 -0400
Subject: [PATCH] Add CI for agent example within AI cookbook (#28)

* WIP

Signed-off-by: Sid Murching

* WIP

Signed-off-by: Sid Murching

* Remove lint workflow

Signed-off-by: Sid Murching

* Add comments to utils

Signed-off-by: Sid Murching

* Fix test

Signed-off-by: Sid Murching

* Delete autogenerated comments

Signed-off-by: Sid Murching

* Switch to .show() which is defined locally

Signed-off-by: Sid Murching

---------

Signed-off-by: Sid Murching
---
 .github/workflows/unit-test.yml               | 29 +++++++
 .gitignore                                    |  7 +-
 agent_app_sample_code/02_data_pipeline.py     | 28 ++++---
 agent_app_sample_code/__init__.py             |  0
 agent_app_sample_code/tests/conftest.py       |  6 ++
 .../tests/test_file_loading.py                | 46 ++++++++++++
 agent_app_sample_code/utils/__init__.py       |  0
 .../utils/build_retriever_index.py            |  2 +-
 ...lume_to_delta_table.py => file_loading.py} | 75 +++++++------------
 dev/README.md                                 |  4 +-
 dev/dev_requirements.txt                      |  8 +-
 11 files changed, 142 insertions(+), 63 deletions(-)
 create mode 100644 .github/workflows/unit-test.yml
 create mode 100644 agent_app_sample_code/__init__.py
 create mode 100644 agent_app_sample_code/tests/conftest.py
 create mode 100644 agent_app_sample_code/tests/test_file_loading.py
 create mode 100644 agent_app_sample_code/utils/__init__.py
 rename agent_app_sample_code/utils/{load_uc_volume_to_delta_table.py => file_loading.py} (67%)

diff --git a/.github/workflows/unit-test.yml b/.github/workflows/unit-test.yml
new file mode 100644
index 0000000..c3f1435
--- /dev/null
+++ b/.github/workflows/unit-test.yml
@@ -0,0 +1,29 @@
+name: Unit tests
+
+on:
+  push:
+    branches: [ "main" ]
+  pull_request:
+    branches: [ "main" ]
+
+jobs:
+  build:
+    runs-on: ubuntu-latest
+    strategy:
+      fail-fast: false
+      matrix:
+        python-version: ["3.10", "3.11"]
+
+    steps:
+      - uses: actions/checkout@v4
+      - name: Set up Python ${{ matrix.python-version }}
+        uses: actions/setup-python@v3
+        with:
+          python-version: ${{ matrix.python-version }}
+      - name: Install dependencies
+        run: |
+          python -m pip install --upgrade pip
+          pip install -r dev/dev_requirements.txt
+      - name: Test with pytest
+        run: |
+          pytest
diff --git a/.gitignore b/.gitignore
index 49c6178..5600fdc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,9 @@
 genai_cookbook/_build
 env
 .env
-.DS_STORE
\ No newline at end of file
+.DS_STORE
+.idea
+__pycache__
+
+# Exclude `databricks sync` CLI command snapshots
+.databricks
diff --git a/agent_app_sample_code/02_data_pipeline.py b/agent_app_sample_code/02_data_pipeline.py
index 4dba118..9df6dc1 100644
--- a/agent_app_sample_code/02_data_pipeline.py
+++ b/agent_app_sample_code/02_data_pipeline.py
@@ -42,10 +42,6 @@
 
 # COMMAND ----------
 
-# MAGIC %run ./utils/install_aptget_package
-
-# COMMAND ----------
-
 # MAGIC %md
 # MAGIC ## Import the global configuration
 
@@ -56,7 +52,7 @@
 
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## Set the MLflow experiement name
+# MAGIC ## Set the MLflow experiment name
 # MAGIC
 # MAGIC Used to track information about this Data Pipeline that are used in the later notebooks.
@@ -291,21 +287,31 @@ def file_parser(
 
 # COMMAND ----------
 
-# MAGIC %run ./utils/load_uc_volume_to_delta_table
+from utils.file_loading import load_files_to_df, apply_parsing_udf
 
 # COMMAND ----------
 
-load_uc_volume_to_delta_table(
+raw_files_df = load_files_to_df(
+    spark=spark,
     source_path=SOURCE_UC_VOLUME,
-    dest_table_name=DOCS_DELTA_TABLE,
+)
+
+parsed_files_df = apply_parsing_udf(
+    raw_files_df=raw_files_df,
     # Modify this function to change the parser, extract additional metadata, etc
     parse_file_udf=file_parser,
     # The schema of the resulting Delta Table will follow the schema defined in ParserReturnValue
-    spark_dataframe_schema=typed_dicts_to_spark_schema(ParserReturnValue),
+    parsed_df_schema=typed_dicts_to_spark_schema(ParserReturnValue)
 )
 
-print(DOCS_DELTA_TABLE)
-display(spark.table(DOCS_DELTA_TABLE))
+# Write to a Delta Table
+parsed_files_df.write.mode("overwrite").option(
+    "overwriteSchema", "true"
+).saveAsTable(DOCS_DELTA_TABLE)
+
+# Display for debugging
+print(f"Parsed {parsed_files_df.count()} documents.")
+parsed_files_df.display()
 
 # Log the resulting table to MLflow
 mlflow.log_input(
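The notebook cell above hands `file_parser` and `typed_dicts_to_spark_schema(ParserReturnValue)` to `apply_parsing_udf`, but neither is defined in this hunk; they live earlier in 02_data_pipeline.py. As a rough sketch of the contract they must satisfy, with field names that are illustrative rather than the notebook's real schema (except that the utils module does filter on a `parser_status` field):

    # Illustrative sketch only -- the real file_parser / ParserReturnValue are defined
    # elsewhere in 02_data_pipeline.py. The callback signature mirrors the example in the
    # docstring removed later in this patch: parse_file(raw_doc_contents_bytes, doc_path).
    from typing import TypedDict

    class ParserReturnValue(TypedDict):
        doc_content: str    # extracted text; field name taken from the old docstring example
        parser_status: str  # apply_parsing_udf keeps only rows whose parser_status is "SUCCESS"

    def file_parser(raw_doc_contents_bytes: bytes, doc_path: str) -> ParserReturnValue:
        # Placeholder parser: treat every file as UTF-8 text.
        return {
            "doc_content": raw_doc_contents_bytes.decode("utf-8", errors="ignore"),
            "parser_status": "SUCCESS",
        }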
diff --git a/agent_app_sample_code/__init__.py b/agent_app_sample_code/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/agent_app_sample_code/tests/conftest.py b/agent_app_sample_code/tests/conftest.py
new file mode 100644
index 0000000..1fefeba
--- /dev/null
+++ b/agent_app_sample_code/tests/conftest.py
@@ -0,0 +1,6 @@
+import sys
+import os
+
+# Add the root directory to sys.path, so that we can treat directories like
+# agent_app_sample_code as modules
+sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
diff --git a/agent_app_sample_code/tests/test_file_loading.py b/agent_app_sample_code/tests/test_file_loading.py
new file mode 100644
index 0000000..85abf1e
--- /dev/null
+++ b/agent_app_sample_code/tests/test_file_loading.py
@@ -0,0 +1,46 @@
+import pytest
+import pyspark
+import pandas as pd
+
+from agent_app_sample_code.utils.file_loading import load_files_to_df
+
+@pytest.fixture(scope="module")
+def spark():
+    return (
+        pyspark.sql.SparkSession.builder
+        .master("local[1]")
+        # Bind to 127.0.0.1 so the session starts when testing locally on Apple silicon
+        .config("spark.driver.bindAddress", "127.0.0.1")
+        .config("spark.task.maxFailures", "1")  # avoid retrying failed Spark tasks
+        .getOrCreate()
+    )
+
+def test_load_files_to_df(spark, tmpdir):
+    temp_dir = tmpdir.mkdir("files_subdir")
+    file_1 = temp_dir.join("file1.txt")
+    file_2 = temp_dir.join("file2.txt")
+    file_1.write("file1 content")
+    file_2.write("file2 content")
+    raw_files_df = load_files_to_df(spark, str(temp_dir)).drop("modificationTime").orderBy("path")
+    assert raw_files_df.count() == 2
+    raw_pandas_df = raw_files_df.toPandas()
+    # Decode the content from bytes to string
+    raw_pandas_df['content'] = raw_pandas_df['content'].apply(
+        lambda x: bytes(x).decode('utf-8')
+    )
+    # Expected DataFrame
+    expected_df = pd.DataFrame([{
+        "path": f"file:{str(file_1)}",
+        "length": len("file1 content"),
+        "content": "file1 content",
+    }, {
+        "path": f"file:{str(file_2)}",
+        "length": len("file2 content"),
+        "content": "file2 content",
+    }])
+    pd.testing.assert_frame_equal(raw_pandas_df, expected_df)
+
+def test_load_files_to_df_throws_if_no_files(spark, tmpdir):
+    temp_dir = tmpdir.mkdir("files_subdir")
+    with pytest.raises(Exception, match="does not contain any files"):
+        load_files_to_df(spark, str(temp_dir))
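The two tests above cover the happy path and the empty-directory error. One branch of `load_files_to_df` they leave untested is the missing-directory check, which raises a ValueError before Spark is ever used. A natural follow-up (not part of this patch) could be appended to test_file_loading.py, reusing its existing imports and `spark` fixture:

    def test_load_files_to_df_throws_if_path_does_not_exist(spark, tmpdir):
        # The joined path is never created on disk, so os.path.exists() returns False.
        missing_dir = str(tmpdir.join("does_not_exist"))
        with pytest.raises(ValueError, match="does not exist"):
            load_files_to_df(spark, missing_dir)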
diff --git a/agent_app_sample_code/utils/__init__.py b/agent_app_sample_code/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/agent_app_sample_code/utils/build_retriever_index.py b/agent_app_sample_code/utils/build_retriever_index.py
index 0e1e95a..ce7a4c8 100644
--- a/agent_app_sample_code/utils/build_retriever_index.py
+++ b/agent_app_sample_code/utils/build_retriever_index.py
@@ -73,7 +73,7 @@ def find_index(endpoint_name, index_name):
 
     if create_index:
         print(
-            f"Computing document embeddings and Vector Search Index {get_table_url}. This can take 15 minutes or much longer if you have a larger number of documents."
+            f"Computing document embeddings and Vector Search Index. This can take 15 minutes or much longer if you have a larger number of documents."
         )
 
         vsc.create_delta_sync_index_and_wait(
diff --git a/agent_app_sample_code/utils/load_uc_volume_to_delta_table.py b/agent_app_sample_code/utils/file_loading.py
similarity index 67%
rename from agent_app_sample_code/utils/load_uc_volume_to_delta_table.py
rename to agent_app_sample_code/utils/file_loading.py
index 1c59ddb..16d13de 100644
--- a/agent_app_sample_code/utils/load_uc_volume_to_delta_table.py
+++ b/agent_app_sample_code/utils/file_loading.py
@@ -1,19 +1,3 @@
-# Databricks notebook source
-# MAGIC %md
-# MAGIC ##### `load_uc_volume_to_delta_table`
-# MAGIC
-# MAGIC `load_uc_volume_to_delta_table` loads files from a specified source path into a Delta Table after parsing and extracting metadata.
-# MAGIC
-# MAGIC Arguments:
-# MAGIC - source_path: The path to the folder of files. This should be a valid directory path where the files are stored.
-# MAGIC - dest_table_name: The name of the destination Delta Table.
-# MAGIC - parse_file_udf: A user-defined function that takes the bytes of the file, parses it, and returns the parsed content and metadata.
-# MAGIC   For example: `def parse_file(raw_doc_contents_bytes, doc_path): return {'doc_content': content, 'metadata': metadata}`
-# MAGIC - spark_dataframe_schema: The schema of the resulting Spark DataFrame after parsing and metadata extraction.
-
-# COMMAND ----------
-
-import json
 import traceback
 from datetime import datetime
 from typing import Any, Callable, TypedDict, Dict
@@ -22,9 +6,10 @@
 import warnings
 import pyspark.sql.functions as func
 from pyspark.sql.types import StructType
+from pyspark.sql import DataFrame, SparkSession
 
 
-def parse_and_extract(
+def _parse_and_extract(
     raw_doc_contents_bytes: bytes,
     modification_time: datetime,
     doc_bytes_length: int,
@@ -57,7 +42,7 @@ def parse_and_extract(
         }
 
 
-def get_parser_udf(
+def _get_parser_udf(
     # extract_metadata_udf: Callable[[[dict, Any]], str],
     parse_file_udf: Callable[[[dict, Any]], str],
     spark_dataframe_schema: StructType,
@@ -71,7 +56,7 @@ def get_parser_udf(
     """
     # This UDF will load each file, parse the doc, and extract metadata.
     parser_udf = func.udf(
-        lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: parse_and_extract(
+        lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: _parse_and_extract(
             raw_doc_contents_bytes,
             modification_time,
             doc_bytes_length,
@@ -82,16 +67,18 @@ def get_parser_udf(
     )
     return parser_udf
 
+def load_files_to_df(
+    spark: SparkSession,
+    source_path: str) -> DataFrame:
+    """
+    Load files from a directory into a Spark DataFrame.
+    Each row in the DataFrame will contain the path, length, and content of the file; for more
+    details, see https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
+    """
-def load_uc_volume_to_delta_table(
-    source_path: str,
-    dest_table_name: str,
-    parse_file_udf: Callable[[[dict, Any]], str],
-    spark_dataframe_schema: StructType
-) -> str:
     if not os.path.exists(source_path):
         raise ValueError(
-            f"{source_path} passed to `load_uc_volume_to_delta_table` does not exist."
+            f"{source_path} passed to `load_files_to_df` does not exist."
         )
 
     # Load the raw riles
     raw_files_df = (
         spark.read.format("binaryFile")
@@ -105,12 +92,19 @@
         raise Exception(f"`{source_path}` does not contain any files.")
 
     print(f"Found {raw_files_df.count()} files in {source_path}.")
-    display(raw_files_df)
+    raw_files_df.show()
+    return raw_files_df
 
-    print()
+
+def apply_parsing_udf(raw_files_df: DataFrame, parse_file_udf: Callable[[[dict, Any]], str], parsed_df_schema: StructType) -> DataFrame:
+    """
+    Apply a file-parsing UDF to a DataFrame whose rows correspond to file content/metadata loaded via
+    https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
+    Returns a DataFrame with the parsed content and metadata.
+    """
     print("Running parsing & metadata extraction UDF in spark...")
-    parser_udf = get_parser_udf(parse_file_udf, spark_dataframe_schema)
+    parser_udf = _get_parser_udf(parse_file_udf, parsed_df_schema)
 
     # Run the parsing
     parsed_files_staging_df = raw_files_df.withColumn(
@@ -127,7 +121,7 @@
         display_markdown(
             f"### {num_errors} documents had parse errors. Please review.", raw=True
         )
-        display(errors_df)
+        errors_df.show()
 
         if errors_df.count() == parsed_files_staging_df.count():
             raise ValueError(
@@ -139,34 +133,21 @@
         display_markdown(
             f"### {num_errors} documents have no content. Please review.", raw=True
         )
-        display(errors_df)
+        errors_df.show()
 
         if num_empty_content == parsed_files_staging_df.count():
             raise ValueError("All documents are empty. Please review.")
 
     # Filter for successfully parsed files
     # Change the schema to the resulting schema
-    resulting_fields = [field.name for field in spark_dataframe_schema.fields]
+    resulting_fields = [field.name for field in parsed_df_schema.fields]
 
     parsed_files_df = parsed_files_staging_df.filter(
         parsed_files_staging_df.parsing.parser_status == "SUCCESS"
    )
-    # display(parsed_files_df)
+    parsed_files_df.show()
 
     parsed_files_df = parsed_files_df.select(
         *[func.col(f"parsing.{field}").alias(field) for field in resulting_fields]
     )
-
-    # Write to a aDelta Table and overwrite it.
-    parsed_files_df.write.mode("overwrite").option(
-        "overwriteSchema", "true"
-    ).saveAsTable(dest_table_name)
-
-    # Reload to get correct lineage in UC.
-    parsed_files_df = spark.table(dest_table_name)
-
-    # Display for debugging
-    print(f"Parsed {parsed_files_df.count()} documents.")
-    # display(parsed_files_df)
-
-    return dest_table_name
+    return parsed_files_df
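The net effect of the rename and refactor above is that the utils module no longer writes any table itself: `load_files_to_df` returns the binaryFile DataFrame (columns `path`, `modificationTime`, `length`, `content`), `apply_parsing_udf` returns the parsed DataFrame, and persisting to Delta now happens at the call site in the notebook, which is what makes the pure-pytest suite added in this patch possible. A minimal local smoke check, with a placeholder directory path, might look like:

    # Sketch only; /tmp/docs is a placeholder and must exist and contain at least one file,
    # otherwise load_files_to_df raises.
    import pyspark
    from agent_app_sample_code.utils.file_loading import load_files_to_df

    spark = pyspark.sql.SparkSession.builder.master("local[1]").getOrCreate()
    raw_files_df = load_files_to_df(spark=spark, source_path="/tmp/docs")
    raw_files_df.select("path", "length").show()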
diff --git a/dev/README.md b/dev/README.md
index c74406b..b583ff8 100644
--- a/dev/README.md
+++ b/dev/README.md
@@ -3,11 +3,11 @@ To start working on this book:
 
 - clone the repo; `cd cookbook`
 - use your preferred approach to starting a new python environment
-- in that environment, `pip install jupyter-book`
+- in that environment, `pip install -r dev/dev_requirements.txt`
 - build and preview the site with `jupyter-book build --all genai_cookbook`
 
 The homepage is at `genai_cookbook/index.md`
 
 The content pages are in `genai_cookbook/nbs/`
 
-Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
\ No newline at end of file
+Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
diff --git a/dev/dev_requirements.txt b/dev/dev_requirements.txt
index a64bbee..50381a6 100644
--- a/dev/dev_requirements.txt
+++ b/dev/dev_requirements.txt
@@ -1,2 +1,8 @@
 jupyter-book
-livereload
\ No newline at end of file
+livereload
+black
+pytest
+mlflow-skinny[databricks]
+databricks-vectorsearch
+pyspark
+pandas