Skip to content

Commit

Permalink
fix: Fix compatibility issues between Pydantic V1 and V2 with timedel…
Browse files Browse the repository at this point in the history
…ta (#121)

* fix: Fix compatibility issues between Pydantic V1 and V2 with timedelta

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Jul 29, 2024
1 parent 28a0996 commit b3cd2ca
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datetime import timedelta
from typing import Dict, List, Literal, Optional, Union

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, field_serializer, field_validator
from pydantic import Field as PydanticField
from typing_extensions import Annotated, Self

Expand Down Expand Up @@ -268,6 +268,29 @@ class KafkaSourceModel(DataSourceModel):
batch_source: Optional[AnyBatchDataSource] = None
watermark_delay_threshold: Optional[timedelta] = None

# To make it compatible with Pydantic V1, we need this field_serializer
@field_serializer("watermark_delay_threshold")
def serialize_ttl(self, ttl: timedelta):
return timedelta.total_seconds(ttl) if ttl else None

# To make it compatible with Pydantic V1, we need this field_validator
@field_validator("watermark_delay_threshold", mode="before")
@classmethod
def validate_ttl(cls, v: Optional[Union[int, float, str, timedelta]]):
try:
if isinstance(v, timedelta):
return v
elif isinstance(v, float):
return timedelta(seconds=v)
elif isinstance(v, str):
return timedelta(seconds=float(v))
elif isinstance(v, int):
return timedelta(seconds=v)
else:
return timedelta(seconds=0)
except ValueError:
raise ValueError("ttl must be one of the int, float, str, timedelta types")

def to_data_source(self) -> KafkaSource:
"""
Given a Pydantic KafkaSourceModel, create and return a KafkaSource.
Expand Down
64 changes: 47 additions & 17 deletions sdk/python/feast/expediagroup/pydantic_models/feature_view_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from typing import Dict, List, Optional, Tuple, Union

import dill
from pydantic import BaseModel
from pydantic import BaseModel, field_serializer, field_validator
from typing_extensions import Self

from feast.expediagroup.pydantic_models.data_source_model import (
Expand Down Expand Up @@ -77,6 +77,29 @@ class FeatureViewModel(BaseFeatureViewModel):
created_timestamp: Optional[datetime]
last_updated_timestamp: Optional[datetime]

# To make it compatible with Pydantic V1, we need this field_serializer
@field_serializer("ttl")
def serialize_ttl(self, ttl: timedelta):
return timedelta.total_seconds(ttl) if ttl else None

# To make it compatible with Pydantic V1, we need this field_validator
@field_validator("ttl", mode="before")
@classmethod
def validate_ttl(cls, v: Optional[Union[int, float, str, timedelta]]):
try:
if isinstance(v, timedelta):
return v
elif isinstance(v, float):
return timedelta(seconds=v)
elif isinstance(v, str):
return timedelta(seconds=float(v))
elif isinstance(v, int):
return timedelta(seconds=v)
else:
return timedelta(seconds=0) # Default value
except ValueError:
raise ValueError("ttl must be one of the int, float, str, timedelta types")

def to_feature_view(self) -> FeatureView:
"""
Given a Pydantic FeatureViewModel, create and return a FeatureView.
Expand Down Expand Up @@ -300,11 +323,13 @@ class OnDemandFeatureViewModel(BaseFeatureViewModel):
source_request_sources: Dict[str, RequestSourceModel]
udf: str = ""
udf_string: str = ""
feature_transformation: Union[
PandasTransformationModel,
PythonTransformationModel,
SubstraitTransformationModel,
]
feature_transformation: Optional[
Union[
PandasTransformationModel,
PythonTransformationModel,
SubstraitTransformationModel,
]
] = None
mode: str = "pandas"
description: str = ""
tags: Optional[dict[str, str]] = None
Expand All @@ -328,17 +353,22 @@ def to_feature_view(self) -> OnDemandFeatureView:
feature_view_projection.to_feature_view_projection() # type: ignore
)

if self.mode == "pandas":
feature_transformation = (
self.feature_transformation.to_pandas_transformation() # type: ignore
)
elif self.mode == "python":
feature_transformation = (
self.feature_transformation.to_python_transformation() # type: ignore
)
elif self.mode == "substrait":
feature_transformation = (
self.feature_transformation.to_substrait_transformation() # type: ignore
if self.feature_transformation is not None:
if self.mode == "pandas":
feature_transformation = (
self.feature_transformation.to_pandas_transformation() # type: ignore
)
elif self.mode == "python":
feature_transformation = (
self.feature_transformation.to_python_transformation() # type: ignore
)
elif self.mode == "substrait":
feature_transformation = (
self.feature_transformation.to_substrait_transformation() # type: ignore
)
else:
feature_transformation = PandasTransformation(
udf=dill.loads(bytes.fromhex(self.udf)), udf_string=self.udf_string
)

odfv = OnDemandFeatureView(
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):
self.registry_refresh_thread = threading.Timer(
cache_ttl_seconds, self._start_thread_async_refresh, [cache_ttl_seconds]
)
self.registry_refresh_thread.setDaemon(True)
self.registry_refresh_thread.daemon = True
self.registry_refresh_thread.start()

def _exit_handler(self):
Expand Down
Loading

0 comments on commit b3cd2ca

Please sign in to comment.