Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Feature Implementation: AWS Glue Job Execution Support (#308)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexander Streed <[email protected]>
Co-authored-by: Alexander Streed <[email protected]>
  • Loading branch information
3 people authored Apr 24, 2024
1 parent 1e21cd5 commit 3bb6261
Show file tree
Hide file tree
Showing 6 changed files with 356 additions and 2 deletions.
6 changes: 6 additions & 0 deletions docs/glue_job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: Tasks for interacting with AWS Glue Job
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.glue_job
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ nav:
- Lambda: lambda_function.md
- Deployments:
- Steps: deployments/steps.md
- Glue Job: glue_job.md
- S3: s3.md
- Secrets Manager: secrets_manager.md

Expand Down
188 changes: 188 additions & 0 deletions prefect_aws/glue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""
Integrations with the AWS Glue Job.
"""
import time
from typing import Any, Optional

from prefect.blocks.abstract import JobBlock, JobRun
from pydantic import VERSION as PYDANTIC_VERSION

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import BaseModel, Field
else:
from pydantic import BaseModel, Field

from prefect_aws import AwsCredentials

_GlueJobClient = Any


class GlueJobRun(JobRun, BaseModel):
"""Execute a Glue Job"""

job_name: str = Field(
...,
title="AWS Glue Job Name",
description="The name of the job definition to use.",
)

job_id: str = Field(
...,
title="AWS Glue Job ID",
description="The ID of the job run.",
)

job_watch_poll_interval: float = Field(
default=60.0,
description=(
"The amount of time to wait between AWS API calls while monitoring the "
"state of an Glue Job."
),
)

_error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]

aws_credentials: AwsCredentials = Field(
title="AWS Credentials",
default_factory=AwsCredentials,
description="The AWS credentials to use to connect to Glue.",
)

client: _GlueJobClient = Field(default=None, description="")

async def fetch_result(self) -> str:
"""fetch glue job state"""
job = self._get_job_run()
return job["JobRun"]["JobRunState"]

def wait_for_completion(self) -> None:
"""
Wait for the job run to complete and get exit code
"""
self.logger.info(f"watching job {self.job_name} with run id {self.job_id}")
while True:
job = self._get_job_run()
job_state = job["JobRun"]["JobRunState"]
if job_state in self._error_states:
# Generate a dynamic exception type from the AWS name
self.logger.error(f"job failed: {job['JobRun']['ErrorMessage']}")
raise RuntimeError(job["JobRun"]["ErrorMessage"])
elif job_state == "SUCCEEDED":
self.logger.info(f"job succeeded: {self.job_id}")
break

time.sleep(self.job_watch_poll_interval)

def _get_job_run(self):
"""get glue job"""
return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id)


class GlueJobBlock(JobBlock):
"""Execute a job to the AWS Glue Job service.
Attributes:
job_name: The name of the job definition to use.
arguments: The job arguments associated with this run.
For this job run, they replace the default arguments set in the job
definition itself.
You can specify arguments here that your own job-execution script consumes,
as well as arguments that Glue itself consumes.
Job arguments may be logged. Do not pass plaintext secrets as arguments.
Retrieve secrets from a Glue Connection, Secrets Manager or other secret
management mechanism if you intend to keep them within the Job.
[doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
job_watch_poll_interval: The amount of time to wait between AWS API
calls while monitoring the state of a Glue Job.
default is 60s because of jobs that use AWS Glue versions 2.0 and later
have a 1-minute minimum.
[AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)
Example:
Start a job to AWS Glue Job.
```python
from prefect import flow
from prefect_aws import AwsCredentials
from prefect_aws.glue_job import GlueJobBlock
@flow
def example_run_glue_job():
aws_credentials = AwsCredentials(
aws_access_key_id="your_access_key_id",
aws_secret_access_key="your_secret_access_key"
)
glue_job_run = GlueJobBlock(
job_name="your_glue_job_name",
arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
).trigger()
return glue_job_run.wait_for_completion()
example_run_glue_job()
```
"""

job_name: str = Field(
...,
title="AWS Glue Job Name",
description="The name of the job definition to use.",
)

arguments: Optional[dict] = Field(
default=None,
title="AWS Glue Job Arguments",
description="The job arguments associated with this run.",
)
job_watch_poll_interval: float = Field(
default=60.0,
description=(
"The amount of time to wait between AWS API calls while monitoring the "
"state of an Glue Job."
),
)

aws_credentials: AwsCredentials = Field(
title="AWS Credentials",
default_factory=AwsCredentials,
description="The AWS credentials to use to connect to Glue.",
)

async def trigger(self) -> GlueJobRun:
"""trigger for GlueJobRun"""
client = self._get_client()
job_run_id = self._start_job(client)
return GlueJobRun(
job_name=self.job_name,
job_id=job_run_id,
job_watch_poll_interval=self.job_watch_poll_interval,
)

def _start_job(self, client: _GlueJobClient) -> str:
"""
Start the AWS Glue Job
[doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)
"""
self.logger.info(
f"starting job {self.job_name} with arguments {self.arguments}"
)
try:
response = client.start_job_run(
JobName=self.job_name,
Arguments=self.arguments,
)
job_run_id = str(response["JobRunId"])
self.logger.info(f"job started with job run id: {job_run_id}")
return job_run_id
except Exception as e:
self.logger.error(f"failed to start job: {e}")
raise RuntimeError

def _get_client(self) -> _GlueJobClient:
"""
Retrieve a Glue Job Client
"""
boto_session = self.aws_credentials.get_boto3_session()
return boto_session.client("glue")
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ botocore>=1.27.53
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
prefect>=2.16.4
tenacity>=8.0.0
pyparsing>=3.1.1
tenacity>=8.0.0
6 changes: 5 additions & 1 deletion tests/mock_aws_credentials
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
[TEST_PROFILE_1]
aws_access_key_id = mock
aws_secret_access_key = mock
aws_region = us-east-1
aws_default_region = us-east-1

[TEST_PROFILE_2]
aws_access_key_id = mock
aws_secret_access_key = mock
aws_secret_access_key = mock
aws_region = us-east-1
aws_default_region = us-east-1
154 changes: 154 additions & 0 deletions tests/test_glue_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from unittest.mock import MagicMock

import pytest
from moto import mock_glue

from prefect_aws.glue_job import GlueJobBlock, GlueJobRun


@pytest.fixture(scope="function")
def glue_job_client(aws_credentials):
with mock_glue():
boto_session = aws_credentials.get_boto3_session()
yield boto_session.client("glue", region_name="us-east-1")


async def test_fetch_result(aws_credentials, glue_job_client):
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
job_run_id = glue_job_client.start_job_run(
JobName="test_job_name",
Arguments={},
)["JobRunId"]
glue_job_run = GlueJobRun(
job_name="test_job_name", job_id=job_run_id, client=glue_job_client
)
result = await glue_job_run.fetch_result()
assert result == "SUCCEEDED"


def test_wait_for_completion(aws_credentials, glue_job_client):
with mock_glue():
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
job_run_id = glue_job_client.start_job_run(
JobName="test_job_name",
Arguments={},
)["JobRunId"]

glue_job_run = GlueJobRun(
job_name="test_job_name",
job_id=job_run_id,
job_watch_poll_interval=0.1,
client=glue_job_client,
)

glue_job_client.get_job_run = MagicMock(
side_effect=[
{
"JobRun": {
"JobName": "test_job_name",
"JobRunState": "RUNNING",
}
},
{
"JobRun": {
"JobName": "test_job_name",
"JobRunState": "SUCCEEDED",
}
},
]
)
glue_job_run.wait_for_completion()


def test_wait_for_completion_fail(aws_credentials, glue_job_client):
with mock_glue():
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
job_run_id = glue_job_client.start_job_run(
JobName="test_job_name",
Arguments={},
)["JobRunId"]
glue_job_client.get_job_run = MagicMock(
side_effect=[
{
"JobRun": {
"JobName": "test_job_name",
"JobRunState": "FAILED",
"ErrorMessage": "err",
}
},
]
)

glue_job_run = GlueJobRun(
job_name="test_job_name", job_id=job_run_id, client=glue_job_client
)
with pytest.raises(RuntimeError):
glue_job_run.wait_for_completion()


def test__get_job_run(aws_credentials, glue_job_client):
with mock_glue():
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
job_run_id = glue_job_client.start_job_run(
JobName="test_job_name",
Arguments={},
)["JobRunId"]

glue_job_run = GlueJobRun(
job_name="test_job_name", job_id=job_run_id, client=glue_job_client
)
response = glue_job_run._get_job_run()
assert response["JobRun"]["JobRunState"] == "SUCCEEDED"


async def test_trigger(aws_credentials, glue_job_client):
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
glue_job = GlueJobBlock(
job_name="test_job_name",
arguments={"arg1": "value1"},
aws_credential=aws_credentials,
)
glue_job._get_client = MagicMock(side_effect=[glue_job_client])
glue_job._start_job = MagicMock(side_effect=["test_job_id"])
glue_job_run = await glue_job.trigger()
assert isinstance(glue_job_run, GlueJobRun)


def test_start_job(aws_credentials, glue_job_client):
with mock_glue():
glue_job_client.create_job(
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={}
)
glue_job = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})

glue_job_client.start_job_run = MagicMock(
side_effect=[{"JobRunId": "test_job_run_id"}]
)
job_run_id = glue_job._start_job(glue_job_client)
assert job_run_id == "test_job_run_id"


def test_start_job_fail_because_not_exist_job(aws_credentials, glue_job_client):
with mock_glue():
glue_job = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"})
with pytest.raises(RuntimeError):
glue_job._start_job(glue_job_client)


def test_get_client(aws_credentials):
with mock_glue():
glue_job_run = GlueJobBlock(
job_name="test_job_name", aws_credentials=aws_credentials
)
client = glue_job_run._get_client()
assert hasattr(client, "get_job_run")

0 comments on commit 3bb6261

Please sign in to comment.