From b3cd2ca3331c5726c869d05f9d4f63145afcec27 Mon Sep 17 00:00:00 2001
From: "bdodla@expedia.com" <13788369+EXPEbdodla@users.noreply.github.com>
Date: Mon, 29 Jul 2024 10:49:03 -0700
Subject: [PATCH] fix: Fix compatibility issues between Pydantic V1 and V2 with timedelta (#121)

* fix: Fix compatibility issues between Pydantic V1 and V2 with timedelta

---------

Co-authored-by: Bhargav Dodla
---
 .../pydantic_models/data_source_model.py      |  25 +++-
 .../pydantic_models/feature_view_model.py     |  64 ++++++---
 .../feast/infra/registry/caching_registry.py  |   2 +-
 sdk/python/feast/infra/registry/http.py       | 132 ++++++++++--------
 .../infra/registry/proto_registry_utils.py    |  11 +-
 sdk/python/tests/unit/test_pydantic_models.py |   5 +-
 6 files changed, 158 insertions(+), 81 deletions(-)

diff --git a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
index eadc4f1977..5aa65e9524 100644
--- a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
+++ b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py
@@ -9,7 +9,7 @@
 from datetime import timedelta
 from typing import Dict, List, Literal, Optional, Union

-from pydantic import BaseModel, ConfigDict
+from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
 from pydantic import Field as PydanticField
 from typing_extensions import Annotated, Self

@@ -268,6 +268,29 @@ class KafkaSourceModel(DataSourceModel):
     batch_source: Optional[AnyBatchDataSource] = None
     watermark_delay_threshold: Optional[timedelta] = None

+    # To make it compatible with Pydantic V1, we need this field_serializer
+    @field_serializer("watermark_delay_threshold")
+    def serialize_ttl(self, ttl: timedelta):
+        return timedelta.total_seconds(ttl) if ttl else None
+
+    # To make it compatible with Pydantic V1, we need this field_validator
+    @field_validator("watermark_delay_threshold", mode="before")
+    @classmethod
+    def validate_ttl(cls, v: Optional[Union[int, float, str, timedelta]]):
+        try:
+            if isinstance(v, timedelta):
+                return v
+            elif isinstance(v, float):
+                return timedelta(seconds=v)
+            elif isinstance(v, str):
+                return timedelta(seconds=float(v))
+            elif isinstance(v, int):
+                return timedelta(seconds=v)
+            else:
+                return timedelta(seconds=0)
+        except ValueError:
+            raise ValueError("ttl must be one of the int, float, str, timedelta types")
+
     def to_data_source(self) -> KafkaSource:
         """
         Given a Pydantic KafkaSourceModel, create and return a KafkaSource.
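The serializer/validator pair added above keeps `watermark_delay_threshold` wire-compatible with Pydantic V1, which dumped timedelta fields as plain seconds, whereas V2 defaults to ISO 8601 duration strings such as "PT300S". A minimal sketch of the round-trip outside Feast (`DemoSource` is a hypothetical stand-in, not part of the patch):

    from datetime import timedelta
    from typing import Optional, Union

    from pydantic import BaseModel, field_serializer, field_validator

    class DemoSource(BaseModel):
        # Hypothetical stand-in for KafkaSourceModel's watermark_delay_threshold.
        delay: Optional[timedelta] = None

        # Without this, Pydantic V2 would emit "PT300S"; V1-era consumers
        # of the registry expect plain seconds.
        @field_serializer("delay")
        def serialize_delay(self, delay: timedelta):
            return delay.total_seconds() if delay else None

        # Accept the raw seconds that V1-era payloads carry.
        @field_validator("delay", mode="before")
        @classmethod
        def validate_delay(cls, v: Optional[Union[int, float, str, timedelta]]):
            return v if isinstance(v, timedelta) else timedelta(seconds=float(v or 0))

    assert DemoSource(delay=timedelta(minutes=5)).model_dump() == {"delay": 300.0}
    assert DemoSource.model_validate({"delay": 300.0}).delay == timedelta(minutes=5)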
diff --git a/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py b/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py
index 591bac2ef8..1735f18e7e 100644
--- a/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py
+++ b/sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py
@@ -10,7 +10,7 @@
 from typing import Dict, List, Optional, Tuple, Union

 import dill
-from pydantic import BaseModel
+from pydantic import BaseModel, field_serializer, field_validator
 from typing_extensions import Self

 from feast.expediagroup.pydantic_models.data_source_model import (
@@ -77,6 +77,29 @@ class FeatureViewModel(BaseFeatureViewModel):
     created_timestamp: Optional[datetime]
     last_updated_timestamp: Optional[datetime]

+    # To make it compatible with Pydantic V1, we need this field_serializer
+    @field_serializer("ttl")
+    def serialize_ttl(self, ttl: timedelta):
+        return timedelta.total_seconds(ttl) if ttl else None
+
+    # To make it compatible with Pydantic V1, we need this field_validator
+    @field_validator("ttl", mode="before")
+    @classmethod
+    def validate_ttl(cls, v: Optional[Union[int, float, str, timedelta]]):
+        try:
+            if isinstance(v, timedelta):
+                return v
+            elif isinstance(v, float):
+                return timedelta(seconds=v)
+            elif isinstance(v, str):
+                return timedelta(seconds=float(v))
+            elif isinstance(v, int):
+                return timedelta(seconds=v)
+            else:
+                return timedelta(seconds=0)  # Default value
+        except ValueError:
+            raise ValueError("ttl must be one of the int, float, str, timedelta types")
+
     def to_feature_view(self) -> FeatureView:
         """
         Given a Pydantic FeatureViewModel, create and return a FeatureView.
@@ -300,11 +323,13 @@ class OnDemandFeatureViewModel(BaseFeatureViewModel):
     source_request_sources: Dict[str, RequestSourceModel]
     udf: str = ""
     udf_string: str = ""
-    feature_transformation: Union[
-        PandasTransformationModel,
-        PythonTransformationModel,
-        SubstraitTransformationModel,
-    ]
+    feature_transformation: Optional[
+        Union[
+            PandasTransformationModel,
+            PythonTransformationModel,
+            SubstraitTransformationModel,
+        ]
+    ] = None
     mode: str = "pandas"
     description: str = ""
     tags: Optional[dict[str, str]] = None
@@ -328,17 +353,22 @@ def to_feature_view(self) -> OnDemandFeatureView:
             feature_view_projection.to_feature_view_projection()  # type: ignore
         )

-        if self.mode == "pandas":
-            feature_transformation = (
-                self.feature_transformation.to_pandas_transformation()  # type: ignore
-            )
-        elif self.mode == "python":
-            feature_transformation = (
-                self.feature_transformation.to_python_transformation()  # type: ignore
-            )
-        elif self.mode == "substrait":
-            feature_transformation = (
-                self.feature_transformation.to_substrait_transformation()  # type: ignore
+        if self.feature_transformation is not None:
+            if self.mode == "pandas":
+                feature_transformation = (
+                    self.feature_transformation.to_pandas_transformation()  # type: ignore
+                )
+            elif self.mode == "python":
+                feature_transformation = (
+                    self.feature_transformation.to_python_transformation()  # type: ignore
+                )
+            elif self.mode == "substrait":
+                feature_transformation = (
+                    self.feature_transformation.to_substrait_transformation()  # type: ignore
+                )
+        else:
+            feature_transformation = PandasTransformation(
+                udf=dill.loads(bytes.fromhex(self.udf)), udf_string=self.udf_string
             )

         odfv = OnDemandFeatureView(
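Making `feature_transformation` optional (defaulting to None) lets `to_feature_view()` fall back to rebuilding a `PandasTransformation` from the hex-encoded `udf` payload, presumably so older registry entries that lack the newer field still deserialize. The dill round-trip that fallback relies on, as a sketch:

    import dill

    def double(value):
        # Stand-in for a user-defined transformation function.
        return value * 2

    # Serialize the function to a hex string, the form the model stores in `udf`...
    udf_hex = dill.dumps(double).hex()

    # ...and recover a callable the same way the fallback branch does.
    restored_udf = dill.loads(bytes.fromhex(udf_hex))
    assert restored_udf(21) == 42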
diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py
index 298639028d..75f2f38f76 100644
--- a/sdk/python/feast/infra/registry/caching_registry.py
+++ b/sdk/python/feast/infra/registry/caching_registry.py
@@ -351,7 +351,7 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):
         self.registry_refresh_thread = threading.Timer(
             cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds]
         )
-        self.registry_refresh_thread.setDaemon(True)
+        self.registry_refresh_thread.daemon = True
         self.registry_refresh_thread.start()

     def _exit_handler(self):
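Unrelated to Pydantic but folded into the same patch: `Thread.setDaemon()` has been deprecated since Python 3.10 in favor of assigning the `daemon` attribute; the two are behavior-identical. For example:

    import threading

    def refresh():
        pass  # placeholder for the periodic registry refresh

    timer = threading.Timer(60.0, refresh)
    timer.daemon = True  # replaces the deprecated timer.setDaemon(True)
    timer.start()
    timer.cancel()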
diff --git a/sdk/python/feast/infra/registry/http.py b/sdk/python/feast/infra/registry/http.py
index c994f549ce..4009e59dfe 100644
--- a/sdk/python/feast/infra/registry/http.py
+++ b/sdk/python/feast/infra/registry/http.py
@@ -151,18 +151,18 @@ def apply_project(  # type: ignore[return]
             url = f"{self.base_url}/projects"
             params = {"project": project, "commit": commit}
             response_data = self._send_request("PUT", url, params=params)
-            return ProjectMetadataModel.parse_obj(response_data)
+            return ProjectMetadataModel.model_validate(response_data)
         except Exception as exception:
             self._handle_exception(exception)

     def apply_entity(self, entity: Entity, project: str, commit: bool = True):
         try:
             url = f"{self.base_url}/projects/{project}/entities"
-            data = EntityModel.from_entity(entity).json()
+            data = EntityModel.from_entity(entity).model_dump_json()
             params = {"commit": commit}

             response_data = self._send_request("PUT", url, params=params, data=data)
-            return EntityModel.parse_obj(response_data).to_entity()
+            return EntityModel.model_validate(response_data).to_entity()
         except Exception as exception:
             self._handle_exception(exception)

@@ -193,9 +193,8 @@ def get_entity(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/entities/{name}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
-            return EntityModel.parse_obj(response_data).to_entity()
+            response_data = self._send_request("GET", url)
+            return EntityModel.model_validate(response_data).to_entity()
         except EntityNotFoundException as exception:
             logger.error(
                 f"Entity {name} requested does not exist: {str(exception)}",
@@ -217,11 +216,11 @@ def list_entities(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/entities"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             response_list = response_data if isinstance(response_data, list) else []
             return [
-                EntityModel.parse_obj(entity).to_entity() for entity in response_list
+                EntityModel.model_validate(entity).to_entity()
+                for entity in response_list
             ]
         except Exception as exception:
             self._handle_exception(exception)
@@ -233,21 +232,23 @@ def apply_data_source(
             url = f"{self.base_url}/projects/{project}/data_sources"
             params = {"commit": commit}
             if isinstance(data_source, SparkSource):
-                data = SparkSourceModel.from_data_source(data_source).json()
+                data = SparkSourceModel.from_data_source(data_source).model_dump_json()
                 response_data = self._send_request("PUT", url, params=params, data=data)
-                return SparkSourceModel.parse_obj(response_data).to_data_source()
+                return SparkSourceModel.model_validate(response_data).to_data_source()
             elif isinstance(data_source, RequestSource):
-                data = RequestSourceModel.from_data_source(data_source).json()
+                data = RequestSourceModel.from_data_source(
+                    data_source
+                ).model_dump_json()
                 response_data = self._send_request("PUT", url, params=params, data=data)
-                return RequestSourceModel.parse_obj(response_data).to_data_source()
+                return RequestSourceModel.model_validate(response_data).to_data_source()
             elif isinstance(data_source, PushSource):
-                data = PushSourceModel.from_data_source(data_source).json()
+                data = PushSourceModel.from_data_source(data_source).model_dump_json()
                 response_data = self._send_request("PUT", url, params=params, data=data)
-                return PushSourceModel.parse_obj(response_data).to_data_source()
+                return PushSourceModel.model_validate(response_data).to_data_source()
             elif isinstance(data_source, KafkaSource):
-                data = KafkaSourceModel.from_data_source(data_source).json()
+                data = KafkaSourceModel.from_data_source(data_source).model_dump_json()
                 response_data = self._send_request("PUT", url, params=params, data=data)
-                return KafkaSourceModel.parse_obj(response_data).to_data_source()
+                return KafkaSourceModel.model_validate(response_data).to_data_source()
             else:
                 raise TypeError(
                     "Unsupported DataSource type. Please use either SparkSource, RequestSource, PushSource or KafkaSource only"
@@ -282,15 +283,20 @@ def get_data_source(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/data_sources/{name}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             if "model_type" in response_data:
                 if response_data["model_type"] == "RequestSourceModel":
-                    return RequestSourceModel.parse_obj(response_data).to_data_source()
+                    return RequestSourceModel.model_validate(
+                        response_data
+                    ).to_data_source()
                 elif response_data["model_type"] == "SparkSourceModel":
-                    return SparkSourceModel.parse_obj(response_data).to_data_source()
+                    return SparkSourceModel.model_validate(
+                        response_data
+                    ).to_data_source()
                 elif response_data["model_type"] == "KafkaSourceModel":
-                    return KafkaSourceModel.parse_obj(response_data).to_data_source()
+                    return KafkaSourceModel.model_validate(
+                        response_data
+                    ).to_data_source()

             logger.error(f"Unable to parse object with response: {response_data}")
             raise ValueError("Unable to parse object")
@@ -315,23 +321,28 @@ def list_data_sources(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/data_sources"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             response_list = response_data if isinstance(response_data, list) else []
             data_source_list: List[DataSource] = []
             for data_source in response_list:
                 if "model_type" in data_source:
                     if data_source["model_type"] == "RequestSourceModel":
                         data_source_list.append(
-                            RequestSourceModel.parse_obj(data_source).to_data_source()
+                            RequestSourceModel.model_validate(
+                                data_source
+                            ).to_data_source()
                         )
                     elif data_source["model_type"] == "SparkSourceModel":
                         data_source_list.append(
-                            SparkSourceModel.parse_obj(data_source).to_data_source()
+                            SparkSourceModel.model_validate(
+                                data_source
+                            ).to_data_source()
                         )
                     elif data_source["model_type"] == "KafkaSourceModel":
                         data_source_list.append(
-                            KafkaSourceModel.parse_obj(data_source).to_data_source()
+                            KafkaSourceModel.model_validate(
+                                data_source
+                            ).to_data_source()
                         )
                 else:
                     logger.error(
@@ -350,10 +361,14 @@ def apply_feature_service(
     ):
         try:
             url = f"{self.base_url}/projects/{project}/feature_services"
-            data = FeatureServiceModel.from_feature_service(feature_service).json()
+            data = FeatureServiceModel.from_feature_service(
+                feature_service
+            ).model_dump_json()
             params = {"commit": commit}

             response_data = self._send_request("PUT", url, params=params, data=data)
-            return FeatureServiceModel.parse_obj(response_data).to_feature_service()
+            return FeatureServiceModel.model_validate(
+                response_data
+            ).to_feature_service()
         except Exception as exception:
             self._handle_exception(exception)
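The http.py changes are a mechanical Pydantic V1-to-V2 migration: `parse_obj()` becomes `model_validate()` and `.json()` becomes `.model_dump_json()` (the V1 spellings still run under V2 but emit deprecation warnings). The GET endpoints also stop sending an `allow_cache` query parameter. Side by side, on a toy model (`EntityPayload` is a hypothetical stand-in, not the Feast class):

    from pydantic import BaseModel

    class EntityPayload(BaseModel):
        name: str

    payload = {"name": "driver_id"}

    # Pydantic V1 spelling, deprecated under V2:
    #   entity = EntityPayload.parse_obj(payload)
    #   body = entity.json()
    # Pydantic V2 spelling used throughout this patch:
    entity = EntityPayload.model_validate(payload)
    body = entity.model_dump_json()
    assert body == '{"name":"driver_id"}'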
@@ -385,9 +400,10 @@ def get_feature_service(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/feature_services/{name}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
-            return FeatureServiceModel.parse_obj(response_data).to_feature_service()
+            response_data = self._send_request("GET", url)
+            return FeatureServiceModel.model_validate(
+                response_data
+            ).to_feature_service()
         except FeatureServiceNotFoundException as exception:
             logger.error(
                 f"FeatureService {name} requested does not exist: %s", str(exception)
@@ -409,11 +425,10 @@ def list_feature_services(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/feature_services"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             response_list = response_data if isinstance(response_data, list) else []
             return [
-                FeatureServiceModel.parse_obj(feature_service).to_feature_service()
+                FeatureServiceModel.model_validate(feature_service).to_feature_service()
                 for feature_service in response_list
             ]
         except Exception as exception:
@@ -426,14 +441,18 @@ def apply_feature_view(
         params = {"commit": commit}
         if isinstance(feature_view, FeatureView):
             url = f"{self.base_url}/projects/{project}/feature_views"
-            data = FeatureViewModel.from_feature_view(feature_view).json()
+            data = FeatureViewModel.from_feature_view(
+                feature_view
+            ).model_dump_json()
             response_data = self._send_request("PUT", url, params=params, data=data)
-            return FeatureViewModel.parse_obj(response_data).to_feature_view()
+            return FeatureViewModel.model_validate(response_data).to_feature_view()
         elif isinstance(feature_view, OnDemandFeatureView):
             url = f"{self.base_url}/projects/{project}/on_demand_feature_views"
-            data = OnDemandFeatureViewModel.from_feature_view(feature_view).json()
+            data = OnDemandFeatureViewModel.from_feature_view(
+                feature_view
+            ).model_dump_json()
             response_data = self._send_request("PUT", url, params=params, data=data)
-            return OnDemandFeatureViewModel.parse_obj(
+            return OnDemandFeatureViewModel.model_validate(
                 response_data
             ).to_feature_view()
         else:
@@ -471,9 +490,8 @@ def get_feature_view(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/feature_views/{name}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
-            return FeatureViewModel.parse_obj(response_data).to_feature_view()
+            response_data = self._send_request("GET", url)
+            return FeatureViewModel.model_validate(response_data).to_feature_view()
         except FeatureViewNotFoundException as exception:
             logger.error(
                 f"FeatureView {name} requested does not exist: %s", str(exception)
@@ -495,11 +513,10 @@ def list_feature_views(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/feature_views"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             response_list = response_data if isinstance(response_data, list) else []
             return [
-                FeatureViewModel.parse_obj(feature_view).to_feature_view()
+                FeatureViewModel.model_validate(feature_view).to_feature_view()
                 for feature_view in response_list
             ]
         except Exception as exception:
@@ -515,9 +532,10 @@ def get_on_demand_feature_view(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/on_demand_feature_views/{name}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
-            return OnDemandFeatureViewModel.parse_obj(response_data).to_feature_view()
+            response_data = self._send_request("GET", url)
+            return OnDemandFeatureViewModel.model_validate(
+                response_data
+            ).to_feature_view()
         except FeatureViewNotFoundException as exception:
             logger.error(
                 f"FeatureView {name} requested does not exist: %s", str(exception)
@@ -539,11 +557,10 @@ def list_on_demand_feature_views(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}/on_demand_feature_views"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
+            response_data = self._send_request("GET", url)
             response_list = response_data if isinstance(response_data, list) else []
             return [
-                OnDemandFeatureViewModel.parse_obj(feature_view).to_feature_view()
+                OnDemandFeatureViewModel.model_validate(feature_view).to_feature_view()
                 for feature_view in response_list
             ]
         except Exception as exception:
@@ -579,9 +596,11 @@ def apply_materialization(
             feature_view.materialization_intervals.append((start_date, end_date))
             params = {"commit": commit}
             url = f"{self.base_url}/projects/{project}/feature_views"
-            data = FeatureViewModel.from_feature_view(feature_view).json()
+            data = FeatureViewModel.from_feature_view(
+                feature_view
+            ).model_dump_json()
             response_data = self._send_request("PUT", url, params=params, data=data)
-            return FeatureViewModel.parse_obj(response_data).to_feature_view()
+            return FeatureViewModel.model_validate(response_data).to_feature_view()
         else:
             raise TypeError(
                 "Unsupported FeatureView type. Please use either FeatureView or OnDemandFeatureView only"
@@ -787,9 +806,10 @@ def list_project_metadata(  # type: ignore[return]
         )
         try:
             url = f"{self.base_url}/projects/{project}"
-            params = {"allow_cache": True}
-            response_data = self._send_request("GET", url, params=params)
-            return [ProjectMetadataModel.parse_obj(response_data).to_project_metadata()]
+            response_data = self._send_request("GET", url)
+            return [
+                ProjectMetadataModel.model_validate(response_data).to_project_metadata()
+            ]
         except ProjectMetadataNotFoundException as exception:
             logger.error(
                 f"Project {project} requested does not exist: {str(exception)}"
diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py
index 0e85f5b0a9..d6c058f031 100644
--- a/sdk/python/feast/infra/registry/proto_registry_utils.py
+++ b/sdk/python/feast/infra/registry/proto_registry_utils.py
@@ -68,10 +68,13 @@ def wrapper(


 def init_project_metadata(cached_registry_proto: RegistryProto, project: str):
-    new_project_uuid = f"{uuid.uuid4()}"
-    cached_registry_proto.project_metadata.append(
-        ProjectMetadata(project_name=project, project_uuid=new_project_uuid).to_proto()
-    )
+    if project is not None:
+        new_project_uuid = f"{uuid.uuid4()}"
+        cached_registry_proto.project_metadata.append(
+            ProjectMetadata(
+                project_name=project, project_uuid=new_project_uuid
+            ).to_proto()
+        )


 def get_project_metadata(
diff --git a/sdk/python/tests/unit/test_pydantic_models.py b/sdk/python/tests/unit/test_pydantic_models.py
index 201f6be4e5..ab3fc59d38 100644
--- a/sdk/python/tests/unit/test_pydantic_models.py
+++ b/sdk/python/tests/unit/test_pydantic_models.py
@@ -369,6 +369,7 @@ def test_idempotent_featureview_conversion():
             Field(name="feature2", dtype=Float32),
         ],
         source=spark_source,
+        ttl=timedelta(days=10),
     )
     python_obj.materialization_intervals = [
         (datetime.now() - timedelta(days=10), datetime.now() - timedelta(days=9)),
@@ -416,7 +417,6 @@ def test_idempotent_featureview_with_streaming_source_conversion():
             Field(name="feature2", dtype=Float32),
         ],
         source=kafka_source,
-        ttl=timedelta(days=0),
     )
     feature_view_model = FeatureViewModel.from_feature_view(feature_view)
     feature_view_b = feature_view_model.to_feature_view()
@@ -446,6 +446,7 @@ def test_idempotent_featureview_with_streaming_source_conversion():
             Field(name="feature2", dtype=Float32),
         ],
         source=spark_source,
+        ttl=timedelta(days=0),
     )
     python_obj.materialization_intervals = [
         (datetime.now() - timedelta(days=10), datetime.now() - timedelta(days=9)),
@@ -495,7 +496,6 @@ def test_idempotent_featureview_with_confluent_streaming_source_conversion():
             Field(name="feature2", dtype=Float32),
         ],
         source=kafka_source,
-        ttl=timedelta(days=0),
     )
     feature_view_model = FeatureViewModel.from_feature_view(feature_view)
     feature_view_b = feature_view_model.to_feature_view()
@@ -796,6 +796,7 @@ def test_idempotent_feature_service_conversion():
             Field(name="feature2", dtype=Float32),
         ],
         source=spark_source_2,
+        ttl=timedelta(days=10),
     )

     python_obj = FeatureService(
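The test changes track the new ttl semantics: a zero timedelta is falsy, so the serializer dumps it as None and the validator rebuilds it as zero seconds, and the batch views now carry an explicit non-zero ttl where round-trip equality matters. A self-contained sketch of that edge case (`ViewModel` is a hypothetical stand-in for FeatureViewModel):

    from datetime import timedelta
    from typing import Optional, Union

    from pydantic import BaseModel, field_serializer, field_validator

    class ViewModel(BaseModel):
        # Hypothetical stand-in for FeatureViewModel's ttl field.
        ttl: Optional[timedelta] = None

        @field_serializer("ttl")
        def serialize_ttl(self, ttl: timedelta):
            return ttl.total_seconds() if ttl else None

        @field_validator("ttl", mode="before")
        @classmethod
        def validate_ttl(cls, v: Optional[Union[int, float, str, timedelta]]):
            return v if isinstance(v, timedelta) else timedelta(seconds=float(v or 0))

    # timedelta(days=0) is falsy: it serializes to None and validates back to
    # zero seconds, so an explicit days=0 argument adds nothing to the round trip.
    assert ViewModel(ttl=timedelta(days=0)).model_dump() == {"ttl": None}
    assert ViewModel(ttl=timedelta(days=10)).model_dump() == {"ttl": 864000.0}
    assert ViewModel.model_validate({"ttl": 864000.0}).ttl == timedelta(days=10)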