Skip to content

Commit

Permalink
fix(py-sdk): DataJobPatchBuilder handling timestamps, output edges (d…
Browse files Browse the repository at this point in the history
…atahub-project#12067)

Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
shirshanka and hsheth2 authored Dec 10, 2024
1 parent 8a1c180 commit e4ea993
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 84 deletions.
9 changes: 8 additions & 1 deletion docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries *entities* (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
(after 10d) or are timeseries *entities* (dataprocess, execution requests)
will be removed automatically using logic in the `datahub-gc` ingestion
source.
- #12067 - Default behavior of DataJobPatchBuilder in Python sdk has been
changed to NOT fill out `created` and `lastModified` auditstamps by default
for input and output dataset edges. This should not have any user-observable
impact (time-based lineage viz will still continue working based on observed time), but could break assumptions previously being made by clients.

### Potential Downtime

Expand Down
14 changes: 4 additions & 10 deletions metadata-ingestion/src/datahub/specific/datajob.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
Expand All @@ -114,8 +114,6 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)

self._ensure_urn_type("dataJob", [input_edge], "add_input_datajob")
Expand Down Expand Up @@ -185,7 +183,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde
Notes:
If `input` is an Edge object, it is used directly. If `input` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(input, Edge):
input_urn: str = input.destinationUrn
Expand All @@ -197,8 +195,6 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde

input_edge = Edge(
destinationUrn=input_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)

self._ensure_urn_type("dataset", [input_edge], "add_input_dataset")
Expand Down Expand Up @@ -270,7 +266,7 @@ def add_output_dataset(
Notes:
If `output` is an Edge object, it is used directly. If `output` is a Urn object or string,
it is converted to an Edge object and added with default audit stamps.
it is converted to an Edge object and added without any audit stamps.
"""
if isinstance(output, Edge):
output_urn: str = output.destinationUrn
Expand All @@ -282,15 +278,13 @@ def add_output_dataset(

output_edge = Edge(
destinationUrn=output_urn,
created=self._mint_auditstamp(),
lastModified=self._mint_auditstamp(),
)

self._ensure_urn_type("dataset", [output_edge], "add_output_dataset")
self._add_patch(
DataJobInputOutput.ASPECT_NAME,
"add",
path=f"/outputDatasetEdges/{self.quote(str(output))}",
path=f"/outputDatasetEdges/{self.quote(output_urn)}",
value=output_edge,
)
return self
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,7 @@
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),91d59f03-1c2b-3f3f-48bc-f89296a328bd)"
}
}
]
Expand Down Expand Up @@ -178,30 +170,14 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
},
{
"op": "add",
"path": "/inputDatajobEdges/urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"value": {
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataJob:(urn:li:dataFlow:(nifi,803ebb92-017d-1000-2961-4bdaa27a3ba0,PROD),cb7693ed-f93b-3340-3776-fe80e6283ddc)"
}
}
]
Expand Down Expand Up @@ -287,15 +263,7 @@
"op": "add",
"path": "/inputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)",
"created": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
},
"lastModified": {
"time": 1638532800000,
"actor": "urn:li:corpuser:datahub"
}
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,enriched-topical-chat,PROD)"
}
}
]
Expand Down
117 changes: 80 additions & 37 deletions metadata-ingestion/tests/unit/patch/test_patch_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import pathlib
from typing import Any, Dict, Union

import pytest
from freezegun.api import freeze_time
Expand All @@ -15,7 +16,9 @@
)
from datahub.ingestion.sink.file import write_metadata_file
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetLineageTypeClass,
EdgeClass,
FineGrainedLineageClass,
FineGrainedLineageDownstreamTypeClass,
FineGrainedLineageUpstreamTypeClass,
Expand Down Expand Up @@ -182,8 +185,66 @@ def test_basic_dashboard_patch_builder():
]


@pytest.mark.parametrize(
"created_on,last_modified,expected_actor",
[
(1586847600000, 1586847600000, "urn:li:corpuser:datahub"),
(None, None, "urn:li:corpuser:datahub"),
(1586847600000, None, "urn:li:corpuser:datahub"),
(None, 1586847600000, "urn:li:corpuser:datahub"),
],
ids=["both_timestamps", "no_timestamps", "only_created", "only_modified"],
)
@freeze_time("2020-04-14 07:00:00")
def test_datajob_patch_builder():
def test_datajob_patch_builder(created_on, last_modified, expected_actor):
def make_edge_or_urn(urn: str) -> Union[EdgeClass, str]:
if created_on or last_modified:
return EdgeClass(
destinationUrn=str(urn),
created=(
AuditStampClass(
time=created_on,
actor=expected_actor,
)
if created_on
else None
),
lastModified=(
AuditStampClass(
time=last_modified,
actor=expected_actor,
)
if last_modified
else None
),
)
return urn

def get_edge_expectation(urn: str) -> Dict[str, Any]:
if created_on or last_modified:
expected = {
"destinationUrn": str(urn),
"created": (
AuditStampClass(
time=created_on,
actor=expected_actor,
).to_obj()
if created_on
else None
),
"lastModified": (
AuditStampClass(
time=last_modified,
actor=expected_actor,
).to_obj()
if last_modified
else None
),
}
# filter out None values
return {k: v for k, v in expected.items() if v is not None}
return {"destinationUrn": str(urn)}

flow_urn = make_data_flow_urn(
orchestrator="nifi", flow_id="252C34e5af19-0192-1000-b248-b1abee565b5d"
)
Expand All @@ -193,13 +254,19 @@ def test_datajob_patch_builder():
patcher = DataJobPatchBuilder(job_urn)

patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
)
)
patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
)
)
patcher.add_output_dataset(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
make_edge_or_urn(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
)
)

assert patcher.build() == [
Expand All @@ -214,47 +281,23 @@ def test_datajob_patch_builder():
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder1,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder1,DEV)"
),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder3,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder3,DEV)"
),
},
{
"op": "add",
"path": "/outputDatasetEdges/urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket~1folder2,DEV)",
"value": {
"destinationUrn": "urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)",
"created": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
"lastModified": {
"time": 1586847600000,
"actor": "urn:li:corpuser:datahub",
},
},
"value": get_edge_expectation(
"urn:li:dataset:(urn:li:dataPlatform:s3,output-bucket/folder2,DEV)"
),
},
]
).encode("utf-8"),
Expand Down
Loading

0 comments on commit e4ea993

Please sign in to comment.