Add CI for agent example within AI cookbook (#28)
* WIP

Signed-off-by: Sid Murching <[email protected]>

* WIP

Signed-off-by: Sid Murching <[email protected]>

* Remove lint workflow

Signed-off-by: Sid Murching <[email protected]>

* Add comments to utils

Signed-off-by: Sid Murching <[email protected]>

* Fix test

Signed-off-by: Sid Murching <[email protected]>

* Delete autogenerated comments

Signed-off-by: Sid Murching <[email protected]>

* Switch to .show() which is defined locally

Signed-off-by: Sid Murching <[email protected]>

---------

Signed-off-by: Sid Murching <[email protected]>
smurching authored Sep 26, 2024
1 parent ae2f30f commit 9b85862
Showing 11 changed files with 142 additions and 63 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/unit-test.yml
@@ -0,0 +1,29 @@
name: Unit tests

on:
  push:
    branches: [ "main" ]
  pull_request:
    branches: [ "main" ]

jobs:
  build:
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.10", "3.11"]

    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v3
        with:
          python-version: ${{ matrix.python-version }}
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r dev/dev_requirements.txt
      - name: Test with pytest
        run: |
          pytest
7 changes: 6 additions & 1 deletion .gitignore
@@ -1,4 +1,9 @@
genai_cookbook/_build
env
.env
.DS_STORE
.DS_STORE
.idea
__pycache__

# Exclude `databricks sync` CLI command snapshots
.databricks
28 changes: 17 additions & 11 deletions agent_app_sample_code/02_data_pipeline.py
@@ -42,10 +42,6 @@

# COMMAND ----------

# MAGIC %run ./utils/install_aptget_package

# COMMAND ----------

# MAGIC %md
# MAGIC ## Import the global configuration

@@ -56,7 +52,7 @@
# COMMAND ----------

# MAGIC %md
# MAGIC ## Set the MLflow experiement name
# MAGIC ## Set the MLflow experiment name
# MAGIC
# MAGIC Used to track information about this Data Pipeline that is used in the later notebooks.

@@ -291,21 +287,31 @@ def file_parser(

# COMMAND ----------

# MAGIC %run ./utils/load_uc_volume_to_delta_table
from utils.file_loading import load_files_to_df, apply_parsing_udf

# COMMAND ----------

load_uc_volume_to_delta_table(
raw_files_df = load_files_to_df(
spark=spark,
source_path=SOURCE_UC_VOLUME,
dest_table_name=DOCS_DELTA_TABLE,
)

parsed_files_df = apply_parsing_udf(
raw_files_df=raw_files_df,
# Modify this function to change the parser, extract additional metadata, etc
parse_file_udf=file_parser,
# The schema of the resulting Delta Table will follow the schema defined in ParserReturnValue
spark_dataframe_schema=typed_dicts_to_spark_schema(ParserReturnValue),
parsed_df_schema=typed_dicts_to_spark_schema(ParserReturnValue)
)

print(DOCS_DELTA_TABLE)
display(spark.table(DOCS_DELTA_TABLE))
# Write to a Delta Table
parsed_files_df.write.mode("overwrite").option(
"overwriteSchema", "true"
).saveAsTable(DOCS_DELTA_TABLE)

# Display for debugging
print(f"Parsed {parsed_files_df.count()} documents.")
parsed_files_df.display()

# Log the resulting table to MLflow
mlflow.log_input(
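Note: `file_parser` and `ParserReturnValue` are defined in a collapsed hunk of this notebook and are not shown above. As a rough, illustrative sketch only (the field names besides `doc_content` and `parser_status` are assumptions, and the notebook's actual signature may differ), a parser passed as `parse_file_udf` looks roughly like:

from typing import TypedDict

# Illustrative sketch only -- not the notebook's actual definitions.
class ParserReturnValue(TypedDict):
    doc_content: str     # parsed text of the document
    parser_status: str   # "SUCCESS", or an error description on failure
    doc_uri: str         # assumed field: path of the source file

def file_parser(raw_doc_contents_bytes: bytes, doc_path: str) -> ParserReturnValue:
    try:
        return {
            "doc_content": raw_doc_contents_bytes.decode("utf-8"),
            "parser_status": "SUCCESS",
            "doc_uri": doc_path,
        }
    except Exception as e:
        return {"doc_content": "", "parser_status": f"ERROR: {e}", "doc_uri": doc_path}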
Empty file.
6 changes: 6 additions & 0 deletions agent_app_sample_code/tests/conftest.py
@@ -0,0 +1,6 @@
import sys
import os

# Add the root directory to sys.path, so that we can treat directories like
# agent_app_sample_code as modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
46 changes: 46 additions & 0 deletions agent_app_sample_code/tests/test_file_loading.py
@@ -0,0 +1,46 @@
import pytest
import pyspark
import pandas as pd

from agent_app_sample_code.utils.file_loading import load_files_to_df

@pytest.fixture(scope="module")
def spark():
    return (
        pyspark.sql.SparkSession.builder
        .master("local[1]")
        # Uncomment the following line for testing on Apple silicon locally
        .config("spark.driver.bindAddress", "127.0.0.1")
        .config("spark.task.maxFailures", "1")  # avoid retry failed spark tasks
        .getOrCreate()
    )

def test_load_files_to_df(spark, tmpdir):
    temp_dir = tmpdir.mkdir("files_subdir")
    file_1 = temp_dir.join("file1.txt")
    file_2 = temp_dir.join("file2.txt")
    file_1.write("file1 content")
    file_2.write("file2 content")
    raw_files_df = load_files_to_df(spark, str(temp_dir)).drop("modificationTime").orderBy("path")
    assert raw_files_df.count() == 2
    raw_pandas_df = raw_files_df.toPandas()
    # Decode the content from bytes to string
    raw_pandas_df['content'] = raw_pandas_df['content'].apply(
        lambda x: bytes(x).decode('utf-8')
    )
    # Expected DataFrame
    expected_df = pd.DataFrame([{
        "path": f"file:{str(file_1)}",
        "length": len("file1 content"),
        "content": "file1 content",
    }, {
        "path": f"file:{str(file_2)}",
        "length": len("file2 content"),
        "content": "file2 content",
    }])
    pd.testing.assert_frame_equal(raw_pandas_df, expected_df)

def test_load_files_to_df_throws_if_no_files(spark, tmpdir):
    temp_dir = tmpdir.mkdir("files_subdir")
    with pytest.raises(Exception, match="does not contain any files"):
        load_files_to_df(spark, str(temp_dir))
Empty file.
2 changes: 1 addition & 1 deletion agent_app_sample_code/utils/build_retriever_index.py
@@ -73,7 +73,7 @@ def find_index(endpoint_name, index_name):

if create_index:
print(
f"Computing document embeddings and Vector Search Index {get_table_url}. This can take 15 minutes or much longer if you have a larger number of documents."
f"Computing document embeddings and Vector Search Index. This can take 15 minutes or much longer if you have a larger number of documents."
)

vsc.create_delta_sync_index_and_wait(
agent_app_sample_code/utils/file_loading.py (previously utils/load_uc_volume_to_delta_table)
@@ -1,19 +1,3 @@
# Databricks notebook source
# MAGIC %md
# MAGIC ##### `load_uc_volume_to_delta_table`
# MAGIC
# MAGIC `load_uc_volume_to_delta_table` loads files from a specified source path into a Delta Table after parsing and extracting metadata.
# MAGIC
# MAGIC Arguments:
# MAGIC - source_path: The path to the folder of files. This should be a valid directory path where the files are stored.
# MAGIC - dest_table_name: The name of the destination Delta Table.
# MAGIC - parse_file_udf: A user-defined function that takes the bytes of the file, parses it, and returns the parsed content and metadata.
# MAGIC For example: `def parse_file(raw_doc_contents_bytes, doc_path): return {'doc_content': content, 'metadata': metadata}`
# MAGIC - spark_dataframe_schema: The schema of the resulting Spark DataFrame after parsing and metadata extraction.

# COMMAND ----------

import json
import traceback
from datetime import datetime
from typing import Any, Callable, TypedDict, Dict
@@ -22,9 +6,10 @@
import warnings
import pyspark.sql.functions as func
from pyspark.sql.types import StructType
from pyspark.sql import DataFrame, SparkSession


def parse_and_extract(
def _parse_and_extract(
raw_doc_contents_bytes: bytes,
modification_time: datetime,
doc_bytes_length: int,
@@ -57,7 +42,7 @@ def parse_and_extract(
}


def get_parser_udf(
def _get_parser_udf(
# extract_metadata_udf: Callable[[[dict, Any]], str],
parse_file_udf: Callable[[[dict, Any]], str],
spark_dataframe_schema: StructType,
@@ -71,7 +56,7 @@ def get_parser_udf(
"""
# This UDF will load each file, parse the doc, and extract metadata.
parser_udf = func.udf(
lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: parse_and_extract(
lambda raw_doc_contents_bytes, modification_time, doc_bytes_length, doc_path: _parse_and_extract(
raw_doc_contents_bytes,
modification_time,
doc_bytes_length,
@@ -82,16 +67,18 @@
)
return parser_udf

def load_files_to_df(
spark: SparkSession,
source_path: str) -> DataFrame:
"""
Load files from a directory into a Spark DataFrame.
Each row in the DataFrame will contain the path, length, and content of the file; for more
details, see https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
"""

def load_uc_volume_to_delta_table(
source_path: str,
dest_table_name: str,
parse_file_udf: Callable[[[dict, Any]], str],
spark_dataframe_schema: StructType
) -> str:
if not os.path.exists(source_path):
raise ValueError(
f"{source_path} passed to `load_uc_volume_to_delta_table` does not exist."
f"{source_path} passed to `load_uc_volume_files` does not exist."
)

# Load the raw files
@@ -105,12 +92,19 @@ def load_uc_volume_to_delta_table(
raise Exception(f"`{source_path}` does not contain any files.")

print(f"Found {raw_files_df.count()} files in {source_path}.")
display(raw_files_df)
raw_files_df.show()
return raw_files_df

print()

def apply_parsing_udf(raw_files_df: DataFrame, parse_file_udf: Callable[[[dict, Any]], str], parsed_df_schema: StructType) -> DataFrame:
"""
Apply a file-parsing UDF to a DataFrame whose rows correspond to file content/metadata loaded via
https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html
Returns a DataFrame with the parsed content and metadata.
"""
print("Running parsing & metadata extraction UDF in spark...")

parser_udf = get_parser_udf(parse_file_udf, spark_dataframe_schema)
parser_udf = _get_parser_udf(parse_file_udf, parsed_df_schema)

# Run the parsing
parsed_files_staging_df = raw_files_df.withColumn(
@@ -127,7 +121,7 @@ def load_uc_volume_to_delta_table(
display_markdown(
f"### {num_errors} documents had parse errors. Please review.", raw=True
)
display(errors_df)
errors_df.show()

if errors_df.count() == parsed_files_staging_df.count():
raise ValueError(
@@ -139,34 +133,21 @@ def load_uc_volume_to_delta_table(
display_markdown(
f"### {num_errors} documents have no content. Please review.", raw=True
)
display(errors_df)
errors_df.show()

if num_empty_content == parsed_files_staging_df.count():
raise ValueError("All documents are empty. Please review.")

# Filter for successfully parsed files
# Change the schema to the resulting schema
resulting_fields = [field.name for field in spark_dataframe_schema.fields]
resulting_fields = [field.name for field in parsed_df_schema.fields]

parsed_files_df = parsed_files_staging_df.filter(
parsed_files_staging_df.parsing.parser_status == "SUCCESS"
)

# display(parsed_files_df)
parsed_files_df.show()
parsed_files_df = parsed_files_df.select(
*[func.col(f"parsing.{field}").alias(field) for field in resulting_fields]
)

# Write to a aDelta Table and overwrite it.
parsed_files_df.write.mode("overwrite").option(
"overwriteSchema", "true"
).saveAsTable(dest_table_name)

# Reload to get correct lineage in UC.
parsed_files_df = spark.table(dest_table_name)

# Display for debugging
print(f"Parsed {parsed_files_df.count()} documents.")
# display(parsed_files_df)

return dest_table_name
return parsed_files_df
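Note: `typed_dicts_to_spark_schema`, used above in 02_data_pipeline.py to build the `parsed_df_schema` argument, is defined elsewhere in `utils` and is not part of this diff. A minimal sketch of such a helper (an assumption, not the repo's implementation, and limited to str/int/datetime fields) could look like:

from datetime import datetime
from typing import get_type_hints
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

# Hypothetical sketch -- not the repo's actual implementation.
_PY_TO_SPARK = {str: StringType(), int: LongType(), datetime: TimestampType()}

def typed_dicts_to_spark_schema(typed_dict_cls: type) -> StructType:
    """Map a TypedDict's annotated fields to a Spark StructType."""
    return StructType([
        StructField(name, _PY_TO_SPARK[py_type], nullable=True)
        for name, py_type in get_type_hints(typed_dict_cls).items()
    ])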
4 changes: 2 additions & 2 deletions dev/README.md
@@ -3,11 +3,11 @@
To start working on this book:
- clone the repo; `cd cookbook`
- use your preferred approach to starting a new python environment
- in that environment, `pip install jupyter-book`
- in that environment, `pip install -r dev/dev_requirements.txt`
- build and preview the site with `jupyter-book build --all genai_cookbook`

The homepage is at `genai_cookbook/index.md`

The content pages are in `genai_cookbook/nbs/`

Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
Jupyter book is fairly flexible and offers a lot of different options for formatting, cross-referencing, adding formatted callouts, etc. Read more at the [Jupyter Book docs](https://jupyterbook.org/en/stable/intro.html).
8 changes: 7 additions & 1 deletion dev/dev_requirements.txt
@@ -1,2 +1,8 @@
jupyter-book
livereload
livereload
black
pytest
mlflow-skinny[databricks]
databricks-vectorsearch
pyspark
pandas
