Commit 1b33f8e

feat: Kubeflow pipelines 2.0 update (#103)

jsilva authored Oct 23, 2023
1 parent b59a582 commit 1b33f8e
Showing 21 changed files with 1,035 additions and 1,613 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
@@ -18,7 +18,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
-pip install poetry==1.3.2
+pip install poetry==1.6.1
poetry install
- name: Build
run: |
@@ -38,7 +38,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
-pip install poetry==1.3.2
+pip install poetry==1.6.1
poetry install
- name: Build
run: |
2,476 changes: 964 additions & 1,512 deletions poetry.lock

Large diffs are not rendered by default.

92 changes: 46 additions & 46 deletions pyproject.toml
@@ -34,70 +34,70 @@ test = ["pytest"]

[tool.poetry.dependencies]
case-converter = "^1.1.0"
-cookiecutter = "^2.1.1"
-cron-validator = "^1.0.6"
+cookiecutter = "^2.4.0"
+cron-validator = "^1.0.8"
dirhash = "^0.2.1"
-email-validator = "^1.3.0"
-emoji = "^1.7.0"
-gitpython = "^3.1.31"
-google-api-core = "^2.11.0"
-google-cloud-aiplatform = "^1.22.1"
-google-cloud-build = "^3.13.0"
-google-cloud-compute = "^1.10.1"
-google-cloud-functions = "^1.11.0"
-google-cloud-logging = "^3.5.0"
-google-cloud-monitoring = "^2.14.1"
-google-cloud-notebooks = "^1.6.1"
-google-cloud-pipeline-components = "^1.0.39"
-google-cloud-resource-manager = "^1.9.0"
-google-cloud-scheduler = "^2.10.0"
-google-cloud-storage = "^2.7.0"
+email-validator = "^2.1.0"
+emoji = "^2.8.0"
+gitpython = "^3.1.40"
+google-api-core = "^2.12.0"
+google-cloud-aiplatform = "^1.35.0"
+google-cloud-build = "^3.20.1"
+google-cloud-compute = "^1.14.1"
+google-cloud-functions = "^1.13.3"
+google-cloud-logging = "^3.8.0"
+google-cloud-monitoring = "^2.16.0"
+google-cloud-notebooks = "^1.8.1"
+google-cloud-pipeline-components = "^2.5.0"
+google-cloud-resource-manager = "^1.10.4"
+google-cloud-scheduler = "^2.11.2"
+google-cloud-storage = "^2.12.0"
gcloud-config-helper = "^0.3.1"
halo = "^0.0.31"
Jinja2 = "^3.1.2"
-kfp = "^1.8.19"
-pathvalidate = "^2.5.2"
+kfp = "^2.3.0"
+pathvalidate = "^3.2.0"
pydantic = "^1.10.5"
python = ">=3.8.1,<3.11"
-python-on-whales = "^0.59.0"
+python-on-whales = "^0.65.0"
pyyaml-include = "^1.3.1"
PyYAML = "^6.0.1"
-smart-open = {extras = ["gcs"], version = "^6.3.0"}
-treelib = "^1.6.1"
-typer = "0.7.0"
+smart-open = {extras = ["gcs"], version = "^6.4.0"}
+treelib = "^1.7.0"
+typer = "0.9.0"
waiting = "^1.4.1"
-rich = "^13.3.1"
+rich = "^13.6.0"
scandir = [
{version = "^1.10.0", platform="win32" }
]
pendulum = "^2.1.2"

[tool.poetry.dev-dependencies]
-black = "^22.3.0"
-flake8 = "^6.0.0"
-isort = "^5.10.1"
-mkdocs-click = "^0.7.0"
-mkdocs-material = "^8.1.6"
-mkdocs-typer = "^0.0.2"
-mkdocstrings = "^0.19.0"
-mkdocstrings-python = "^0.7.1"
+black = "^23.10.0"
+flake8 = "^6.1.0"
+isort = "^5.12.0"
+mkdocs-click = "^0.8.1"
+mkdocs-material = "^9.4.6"
+mkdocs-typer = "^0.0.3"
+mkdocstrings = "^0.23.0"
+mkdocstrings-python = "^1.7.3"
mkautodoc = "^0.2.0"
-mock = "^4.0.3"
-mypy = "^1.0.1"
-pre-commit = "^2.19.0"
-pylint = "^2.12.2"
-pylint-pydantic = "^0.1.4"
-pytest = "^6.2.5"
-pytest-cov = "^3.0.0"
-pytest-mock = "^3.6.1"
+markdown = "^3.5"
+mock = "^5.1.0"
+mypy = "^1.6.1"
+pre-commit = "^3.5.0"
+pylint = "^3.0.2"
+pylint-pydantic = "^0.3.0"
+pytest = "^7.4.0"
+pytest-cov = "^4.1.0"
+pytest-mock = "^3.12.0"
+taskipy = "^1.12.0"
+types-mock = "^5.1.0.2"
+types-pyyaml = "^6.0.12"
+types-requests = "^2.31.0"
+types-setuptools = "^68.2.0"
scikit-learn = "^1.2.1"
-taskipy = "^1.9.0"
-types-mock = "^4.0.12"
-types-pyyaml = "^6.0.5"
-types-requests = "^2.27.14"
-types-setuptools = "^63.2.3"
xgboost = "^1.7.4"
-markdown = "^3.3.6"

[tool.mypy]
exclude = "src/wanna/components/templates"
@@ -1,6 +1,6 @@
from typing import List

-from kfp.v2.dsl import component
+from kfp.dsl import component


@component(
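Note: this commit consistently replaces the kfp 1.x `kfp.v2.dsl` namespace with the top-level `kfp.dsl` module. A minimal sketch (not part of the commit) of how code straddling both SDK generations could guard the import, assuming only that `kfp.__version__` is present:

```python
# Hypothetical compatibility shim for the 1.x -> 2.x migration window:
# kfp 2.x promotes dsl to the top-level package, while the 1.x SDK exposed
# the v2-semantics API under kfp.v2.
import kfp

if kfp.__version__.startswith("2."):
    from kfp.dsl import component
else:
    from kfp.v2.dsl import component
```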
2 changes: 1 addition & 1 deletion samples/pipelines/sklearn/wanna_simple/config.py
@@ -29,4 +29,4 @@
SERVING_MACHINE_TYPE = "n1-standard-4"
SERVING_MIN_REPLICA_COUNT = 1
SERVING_MAX_REPLICA_COUNT = 2
-SERVING_TRAFFIC_SPLIT = '{"0": 100}'
+SERVING_TRAFFIC_SPLIT = {"0": 100}
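The sample config stops JSON-encoding the traffic split because the 2.x deploy component accepts the mapping directly (presumably passed through to its `traffic_split` parameter). A small illustrative check, assuming only the serialization changed and not the meaning:

```python
import json

# GCPC 1.x-era sample: the split was stored pre-serialized as a JSON string.
legacy_split = json.loads('{"0": 100}')

# GCPC 2.x: the dict is used as-is; the key "0" denotes the model being
# deployed and the value is the percentage of traffic routed to it.
SERVING_TRAFFIC_SPLIT = {"0": 100}

assert legacy_split == SERVING_TRAFFIC_SPLIT  # same content, no serialization
```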
13 changes: 7 additions & 6 deletions samples/pipelines/sklearn/wanna_simple/pipeline.py
@@ -1,9 +1,10 @@
# ignore: import-error
# pylint: disable = no-value-for-parameter

-from google_cloud_pipeline_components import aiplatform as aip_components
-from kfp.v2 import dsl
-from kfp.v2.dsl import component
+from google_cloud_pipeline_components.v1.endpoint.deploy_model.component import model_deploy
+
+from kfp import dsl
+from kfp.dsl import component

import wanna_simple.config as cfg
from wanna_simple.components.data.get_data import get_data_op
@@ -70,7 +71,7 @@ def wanna_sklearn_sample(eval_acc_threshold: float, start_date: str):
# ===================================================================
# simple model training directly in component
# kfp.components.load_component_from_file()
-train_op = train_xgb_model_op(dataset_op.outputs["dataset_train"])
+train_op = train_xgb_model_op(dataset=dataset_op.outputs["dataset_train"])

# ===================================================================
# eval model
@@ -126,9 +127,9 @@ def wanna_sklearn_sample(eval_acc_threshold: float, start_date: str):
# ===================================================================
# deploy models to endpoint to associates physical resources with the model
# so it can serve online predictions
-model_deploy_task = aip_components.ModelDeployOp(
+model_deploy_task = model_deploy(
endpoint=endpoint_create_task.outputs["endpoint"],
-model=model_upload_task.outputs["model"].ignore_type(),
+model=model_upload_task.outputs["model"],
deployed_model_display_name=cfg.MODEL_NAME,
dedicated_resources_machine_type=cfg.SERVING_MACHINE_TYPE,
dedicated_resources_min_replica_count=cfg.SERVING_MIN_REPLICA_COUNT,
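Two kfp 2.x behaviors drive the sample changes above: components now reject positional arguments (hence `dataset=...` on `train_xgb_model_op`), and the GCPC operators are plain functions imported from versioned modules whose artifacts no longer need `.ignore_type()`. A minimal, self-contained sketch of the 2.x style (component and pipeline names here are illustrative, not from the repo):

```python
from kfp import compiler, dsl


@dsl.component(base_image="python:3.9")
def add_one(x: int) -> int:
    return x + 1


@dsl.pipeline(name="kfp-v2-style-sketch")
def sketch_pipeline(x: int = 1):
    first = add_one(x=x)     # kfp 2.x: arguments must be passed by keyword
    add_one(x=first.output)  # tasks are wired through .output / .outputs


if __name__ == "__main__":
    # kfp 2.x compiles pipeline functions directly to a local spec file.
    compiler.Compiler().compile(
        pipeline_func=sketch_pipeline,
        package_path="sketch_pipeline.yaml",
    )
```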
2 changes: 1 addition & 1 deletion src/wanna/cli/__main__.py
@@ -20,7 +20,7 @@
app = runner.app


-class WannaRepositoryTemplate(str, Enum):
+class WannaRepositoryTemplate(Enum):
sklearn = "sklearn"
blank = "blank"

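Dropping the `str` mixin changes equality semantics: a plain `Enum` member no longer compares equal to its string value, so call sites that matched on strings must go through `.value`. A small illustrative check in pure Python, independent of typer:

```python
from enum import Enum


class PlainTemplate(Enum):
    sklearn = "sklearn"


class StrTemplate(str, Enum):
    sklearn = "sklearn"


assert StrTemplate.sklearn == "sklearn"          # str mixin: acts like a string
assert PlainTemplate.sklearn != "sklearn"        # plain Enum: no string equality
assert PlainTemplate.sklearn.value == "sklearn"  # compare via .value instead
```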
5 changes: 2 additions & 3 deletions src/wanna/components/kubeflow/get_or_create_endpoint.py
@@ -6,8 +6,8 @@
@dsl.component(
base_image="python:3.9",
packages_to_install=[
-"google-cloud-pipeline-components==1.0.39",
-"google-cloud-aiplatform==1.22.1",
+"google-cloud-pipeline-components==2.5.0",
+"google-cloud-aiplatform==1.35.0",
],
)
def get_or_create_endpoint(
@@ -18,7 +18,6 @@ def get_or_create_endpoint(
labels: Dict[str, str] = {},
network: Optional[str] = None,
):
-
import logging

from google.api_core.client_options import ClientOptions
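Pinning `google-cloud-pipeline-components` and `google-cloud-aiplatform` inside `packages_to_install` keeps the component container reproducible regardless of what the compiling environment has installed. A sketch of the pattern (the component body is illustrative; only the pins mirror the diff above):

```python
from kfp import dsl


@dsl.component(
    base_image="python:3.9",
    packages_to_install=[
        "google-cloud-pipeline-components==2.5.0",
        "google-cloud-aiplatform==1.35.0",
    ],
)
def report_sdk_version(version_out: dsl.OutputPath(str)):
    # Imports live inside the body: they execute in the component container,
    # where packages_to_install has been applied, not on the machine that
    # compiled the pipeline.
    from google.cloud import aiplatform

    with open(version_out, "w") as f:
        f.write(aiplatform.__version__)
```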
9 changes: 5 additions & 4 deletions src/wanna/components/kubeflow/upload_model_version.py
@@ -1,13 +1,14 @@
from typing import Dict, List, Optional

-from kfp.v2 import dsl
+from google_cloud_pipeline_components.types.artifact_types import VertexModel
+from kfp import dsl


@dsl.component(
base_image="python:3.9",
packages_to_install=[
-"google-cloud-pipeline-components==1.0.39",
-"google-cloud-aiplatform==1.22.1",
+"google-cloud-pipeline-components==2.3.0",
+"google-cloud-aiplatform==1.31.0",
],
)
def upload_model_version(
@@ -16,7 +17,7 @@ def upload_model_version(
display_name: str,
serving_container_image_uri: str,
artifact_uri: str,
-model: dsl.Output[dsl.Model],
+model: dsl.Output[VertexModel],
model_output_path: dsl.OutputPath(str), # type: ignore
labels: Dict[str, str] = {},
version_aliases: List[str] = [],
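The output artifact type moves from the generic `dsl.Model` to GCPC's `VertexModel`, so downstream 2.x components such as `model_deploy` receive the artifact type they expect without the `.ignore_type()` escape hatch. A hedged sketch of a custom component emitting one; the `resourceName` metadata key follows GCPC's artifact convention, and the body is illustrative:

```python
from google_cloud_pipeline_components.types.artifact_types import VertexModel
from kfp import dsl


@dsl.component(
    base_image="python:3.9",
    packages_to_install=["google-cloud-pipeline-components==2.5.0"],
)
def emit_vertex_model(resource_name: str, model: dsl.Output[VertexModel]):
    # GCPC artifact types point at the underlying Vertex resource through
    # metadata["resourceName"], e.g. "projects/p/locations/l/models/123".
    model.metadata["resourceName"] = resource_name
```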
1 change: 0 additions & 1 deletion src/wanna/core/deployment/artifacts_push.py
@@ -28,7 +28,6 @@ def push_json(artifacts: List[JsonArtifact]):
results: PushResult = []

for push_task in push_tasks:
-
push_containers(container_artifacts=push_task.container_artifacts)
push_manifests(manifest_artifacts=push_task.manifest_artifacts)
push_json(artifacts=push_task.json_artifacts)
2 changes: 0 additions & 2 deletions src/wanna/core/deployment/vertex_jobs.py
@@ -51,7 +51,6 @@ def run_custom_job(self, manifest: JobResource[CustomJobModel], sync: bool) -> N
custom_job = CustomJob(**manifest.job_payload)

if manifest.job_config.hp_tuning:
-
parameter_spec = {
param.var_name: self._create_hyperparameter_spec(param)
for param in manifest.job_config.hp_tuning.parameters
@@ -129,7 +128,6 @@ def run_training_job(
)

with logger.user_spinner(f"Initiating {manifest.job_config.name} custom job"):
-
logger.user_info(f"Outputs will be saved to {manifest.job_config.base_output_directory}")
training_job.run(
machine_type=manifest.job_config.worker.machine_type,
4 changes: 1 addition & 3 deletions src/wanna/core/deployment/vertex_pipelines.py
@@ -53,7 +53,6 @@ def run_pipeline(
extra_params: Optional[Path],
sync: bool = True,
) -> None:
-
mode = "sync mode" if sync else "fire-forget mode"

logger.user_info(f"Running pipeline {resource.pipeline_name} in {mode}")
@@ -94,7 +93,6 @@ def run_pipeline(
def deploy_pipeline(
self, resource: PipelineResource, pipeline_paths: PipelinePaths, version: str, env: str
) -> None:
-
pipeline_service_account = (
resource.schedule.service_account
if resource.schedule and resource.schedule.service_account
@@ -288,5 +286,5 @@ def upsert_sla_function(self, resource: PipelineResource, version: str, env: str
}
try:
cf.create_function({"location": parent, "function": function}).result()
-except (AlreadyExists):
+except AlreadyExists:
logger.user_error(f"Function {function_name} already exists, no need to re-deploy")
1 change: 0 additions & 1 deletion src/wanna/core/services/docker.py
@@ -118,7 +118,6 @@ def _is_docker_client_active() -> bool:
def _build_image(
self, context_dir, file_path: Path, tags: List[str], docker_image_ref: str, **build_args
) -> Union[Image, None]:
-
should_build = self._should_build_by_context_dir_checksum(self.build_dir / docker_image_ref, context_dir)

if should_build and not self.quick_mode:
2 changes: 0 additions & 2 deletions src/wanna/core/services/path_utils.py
@@ -5,7 +5,6 @@


class JobPaths:
-
job_manifest_filename = "job-manifest.json"

def __init__(self, workdir: Path, bucket: str, job_name: str):
@@ -47,7 +46,6 @@ def get_gcs_job_wanna_manifest_path(self, version: str) -> str:


class PipelinePaths:
-
json_spec_filename = "pipeline-spec.json"
wanna_manifest_filename = "wanna-manifest.json"
job_manifest_filename = "job-manifest.json"
16 changes: 1 addition & 15 deletions src/wanna/core/services/pipeline.py
@@ -7,7 +7,6 @@
from caseconverter import snakecase
from google.cloud import aiplatform
from kfp.v2.compiler import Compiler
-from kfp.v2.compiler.main import compile_pyfile
from python_on_whales import Image

from wanna.core.deployment.artifacts_push import PushResult
@@ -210,7 +209,7 @@ def _export_pipeline_params(
env_name = snakecase(f"{pipeline_name_prefix}_{key.upper()}").upper()
os.environ[env_name] = str(value)

-for (docker_image_model, _, tag) in images:
+for docker_image_model, _, tag in images:
env_name = snakecase(f"{docker_image_model.name}_DOCKER_URI").upper()
os.environ[env_name] = tag

@@ -230,7 +229,6 @@ def _export_pipeline_params(
return pipeline_env_params, pipeline_compile_params

def _compile_one_instance(self, pipeline: PipelineModel, pipeline_params_path: Optional[Path] = None) -> Path:
-
image_tags = [
self.docker_service.get_image(docker_image_ref=docker_image_ref)
for docker_image_ref in pipeline.docker_image_ref
@@ -283,18 +281,6 @@ def _compile_one_instance(self, pipeline: PipelineModel, pipeline_params_path: O
package_path=pipeline_paths.get_local_pipeline_json_spec_path(self.version),
type_check=True,
)
-elif pipeline.pipeline_file:
-    # Get the file
-    pyfile = str(self.workdir / pipeline.pipeline_file)
-    logger.user_info(f"Using kfp compile_pyfile with {pyfile}")
-    compile_pyfile(
-        pyfile=pyfile,
-        function_name=pipeline.pipeline_function,
-        pipeline_parameters=pipeline_params,
-        package_path=pipeline_paths.get_local_pipeline_json_spec_path(self.version),
-        type_check=True,
-        use_experimental=False,
-    )
else:
raise ValueError("Can not compile kfp pipeline, " "pipeline_file or pipeline_function must be set.")

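kfp 2.x removed `kfp.v2.compiler.main.compile_pyfile`, so the `pipeline_file` compilation path is deleted here rather than ported. If equivalent behavior were ever needed again, a rough sketch (hypothetical helper, not this repo's code) could load the file with `importlib` and hand the pipeline function to the 2.x `Compiler`:

```python
import importlib.util

from kfp import compiler


def compile_pipeline_file(pyfile: str, function_name: str, package_path: str) -> None:
    """Rough stand-in for the removed kfp 1.x compile_pyfile, on kfp 2.x APIs."""
    spec = importlib.util.spec_from_file_location("user_pipeline_module", pyfile)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load pipeline file: {pyfile}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # run the file so the pipeline function exists
    pipeline_func = getattr(module, function_name)
    compiler.Compiler().compile(pipeline_func=pipeline_func, package_path=package_path)
```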
1 change: 0 additions & 1 deletion src/wanna/core/utils/time.py
@@ -13,7 +13,6 @@ def get_timestamp():


def update_time_template(params: Dict[str, Any]):
-
for k, v in params.items():
if isinstance(v, str):
v = _jinja_env.from_string(v).render()
8 changes: 4 additions & 4 deletions tests/init_templates/test_init_templates.py
@@ -1,10 +1,9 @@
import os
-from cookiecutter.main import cookiecutter
import shutil
+from pathlib import Path

import unittest
+from cookiecutter.main import cookiecutter

-from wanna.core.models.gcp_profile import GCPProfileModel
from wanna.cli.__main__ import WannaRepositoryTemplate
from wanna.core.utils.config_loader import load_config_from_yaml

@@ -25,7 +24,8 @@ def _test_template(self, template_name: str):
no_input=True,
overwrite_if_exists=True,
output_dir=os.path.join("testing", "templates", template_name))
-config = load_config_from_yaml(os.path.join(result_dir, "wanna.yaml"), "default")
+
+_ = load_config_from_yaml(Path(os.path.join(result_dir, "wanna.yaml")), "default")

def test_templates(self):
for template in self.template_names:
1 change: 0 additions & 1 deletion tests/pipeline/test_pipeline_service.py
@@ -49,7 +49,6 @@ def tearDown(self) -> None:

@patch("wanna.core.services.docker.docker")
def test_run_pipeline(self, docker_mock):
-
docker_mock.build = MagicMock(return_value=None)
docker_mock.pull = MagicMock(return_value=None)
