From 5993c32e3af3a7408ac4ecca55faf5f149e21f6f Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Thu, 18 Jan 2024 14:04:43 -0600 Subject: [PATCH 01/26] Add empty model and format hooks for ConfluentAvro format --- sdk/python/feast/data_format.py | 19 +++++++++++ .../pydantic_models/data_source_model.py | 3 +- .../pydantic_models/stream_format_model.py | 33 ++++++++++++++++++- .../infra/contrib/spark_kafka_processor.py | 26 ++++++++++----- 4 files changed, 71 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py index 8f3b195e3e..5726fed98b 100644 --- a/sdk/python/feast/data_format.py +++ b/sdk/python/feast/data_format.py @@ -115,6 +115,25 @@ def to_proto(self): return StreamFormatProto(avro_format=proto) +class ConfluentAvroFormat(StreamFormat): + """ + Defines the ConfluentAvro streaming data format that encodes data in ConfluentAvro format + """ + + def __init__(self, schema_json: str): + """ + Construct a new ConfluentAvro data format. + + Args: + schema_json: ConfluentAvro schema definition in JSON + """ + self.schema_json = schema_json + + def to_proto(self): + proto = StreamFormatProto.ConfluentAvroFormat(schema_json=self.schema_json) + return StreamFormatProto(confluent_avro_format=proto) + + class JsonFormat(StreamFormat): """ Defines the Json streaming data format that encodes data in Json format diff --git a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py index ba17545593..9af87c07ac 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/data_source_model.py @@ -17,6 +17,7 @@ from feast.expediagroup.pydantic_models.stream_format_model import ( AnyStreamFormat, AvroFormatModel, + ConfluentAvroFormatModel, JsonFormatModel, ProtoFormatModel, ) @@ -230,7 +231,7 @@ def from_data_source( ) -SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, JsonFormatModel, ProtoFormatModel] +SUPPORTED_MESSAGE_FORMATS = [AvroFormatModel, ConfluentAvroFormatModel, JsonFormatModel, ProtoFormatModel] SUPPORTED_KAFKA_BATCH_SOURCES = [RequestSourceModel, SparkSourceModel] diff --git a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py index 53987723d2..643848aecd 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py @@ -4,7 +4,7 @@ from pydantic import Field as PydanticField from typing_extensions import Annotated, Self -from feast.data_format import AvroFormat, JsonFormat, ProtoFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat, ProtoFormat class StreamFormatModel(BaseModel): @@ -63,6 +63,37 @@ def from_stream_format( return cls(schoma=avro_format.schema_json) +class ConfluentAvroFormatModel(StreamFormatModel): + """ + Pydantic Model of a Feast ConfluentAvroFormat. + """ + + format: Literal["ConfluentAvroFormatModel"] = "ConfluentAvroFormatModel" + schoma: str + + def to_stream_format(self) -> ConfluentAvroFormat: + """ + Given a Pydantic ConfluentAvroFormatModel, create and return a ConfluentAvroFormat. + + Returns: + A ConfluentAvroFormat. 
+ """ + return ConfluentAvroFormat(schema_json=self.schoma) + + @classmethod + def from_stream_format( + cls, + confluent_avro_format, + ) -> Self: # type: ignore + """ + Converts a ConfluentAvroFormat object to its pydantic model representation. + + Returns: + A ConfluentAvroFormatModel. + """ + return cls(schoma=confluent_avro_format.schema_json) + + class JsonFormatModel(StreamFormatModel): """ Pydantic Model of a Feast JsonFormat. diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index ea55d89988..0953deebee 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -6,7 +6,7 @@ from pyspark.sql.avro.functions import from_avro from pyspark.sql.functions import col, from_json -from feast.data_format import AvroFormat, JsonFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat from feast.data_source import KafkaSource, PushMode from feast.feature_store import FeatureStore from feast.infra.contrib.stream_processor import ( @@ -42,17 +42,20 @@ def __init__( if not isinstance( sfv.stream_source.kafka_options.message_format, AvroFormat ) and not isinstance( + sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat + )and not isinstance( sfv.stream_source.kafka_options.message_format, JsonFormat ): raise ValueError( - "spark streaming currently only supports json or avro format for kafka source schema" + "spark streaming currently only supports json, avro and confluent avro formats for kafka source schema" ) - self.format = ( - "json" - if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat) - else "avro" - ) + self.format = "avro" + if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat): + self.format = "json" + elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): + "confluent_avro" + if not isinstance(config, SparkProcessorConfig): raise ValueError("config is not spark processor config") @@ -70,7 +73,7 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: return online_store_query def _ingest_stream_data(self) -> StreamTable: - """Only supports json and avro formats currently.""" + """Only supports json, avro and confluent_avro formats currently.""" if self.format == "json": if not isinstance( self.data_source.kafka_options.message_format, JsonFormat @@ -117,6 +120,13 @@ def _ingest_stream_data(self) -> StreamTable: ) .select("table.*") ) + else: + if not isinstance( + self.data_source.kafka_options.message_format, ConfluentAvroFormat + ): + raise ValueError("kafka source message format is not confluent_avro format") + #TO_DO: process ConfluentAvro format. 
+ stream_df = None return stream_df def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: From 4f9fb06171e176cf59678ca098207e0ad3c00f83 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 4 Feb 2024 11:39:45 -0600 Subject: [PATCH 02/26] More confluent avro changes --- protos/feast/core/DataFormat.proto | 8 +++++ sdk/python/feast/data_format.py | 2 ++ .../pydantic_models/stream_format_model.py | 2 +- .../infra/contrib/spark_kafka_processor.py | 33 ++++++++++++++----- sdk/python/tests/unit/test_feature_views.py | 4 +-- 5 files changed, 38 insertions(+), 11 deletions(-) diff --git a/protos/feast/core/DataFormat.proto b/protos/feast/core/DataFormat.proto index c453e5e4c8..74a1aa4cea 100644 --- a/protos/feast/core/DataFormat.proto +++ b/protos/feast/core/DataFormat.proto @@ -48,6 +48,13 @@ message StreamFormat { string schema_json = 1; } + // Defines options for the avro data format + message ConfluentAvroFormat { + // Optional if used in a File DataSource as schema is embedded in avro file. + // Specifies the schema of the Avro message as JSON string. + string schema_json = 1; + } + message JsonFormat { string schema_json = 1; } @@ -57,5 +64,6 @@ message StreamFormat { AvroFormat avro_format = 1; ProtoFormat proto_format = 2; JsonFormat json_format = 3; + ConfluentAvroFormat confluent_avro_format = 4; } } diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py index 5726fed98b..f61beadb64 100644 --- a/sdk/python/feast/data_format.py +++ b/sdk/python/feast/data_format.py @@ -89,6 +89,8 @@ def from_proto(cls, proto): fmt = proto.WhichOneof("format") if fmt == "avro_format": return AvroFormat(schema_json=proto.avro_format.schema_json) + if fmt == "confluent_avro_format": + return ConfluentAvroFormat(schema_json=proto.confluent_avro_format.schema_json) if fmt == "json_format": return JsonFormat(schema_json=proto.json_format.schema_json) if fmt == "proto_format": diff --git a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py index 643848aecd..bbea559f4a 100644 --- a/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py +++ b/sdk/python/feast/expediagroup/pydantic_models/stream_format_model.py @@ -159,6 +159,6 @@ def from_stream_format( # https://blog.devgenius.io/deserialize-child-classes-with-pydantic-that-gonna-work-784230e1cf83 # This lets us discriminate child classes of DataSourceModel with type hints. 
AnyStreamFormat = Annotated[ - Union[AvroFormatModel, JsonFormatModel, ProtoFormatModel], + Union[ConfluentAvroFormatModel, AvroFormatModel, JsonFormatModel, ProtoFormatModel], PydanticField(discriminator="format"), ] diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 0953deebee..80a24e83ec 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -54,7 +54,7 @@ def __init__( if isinstance(sfv.stream_source.kafka_options.message_format, JsonFormat): self.format = "json" elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): - "confluent_avro" + self.format = "confluent_avro" if not isinstance(config, SparkProcessorConfig): @@ -74,6 +74,8 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: def _ingest_stream_data(self) -> StreamTable: """Only supports json, avro and confluent_avro formats currently.""" + # Test that we reach this path, and stop. + raise ValueError("No, no, NO NO NO, stop everything. Shut it down.") if self.format == "json": if not isinstance( self.data_source.kafka_options.message_format, JsonFormat @@ -97,6 +99,28 @@ def _ingest_stream_data(self) -> StreamTable: ) .select("table.*") ) + elif self.format == "confluent_avro": + if not isinstance( + self.data_source.kafka_options.message_format, ConfluentAvroFormat + ): + raise ValueError("kafka source message format is not confluent_avro format") + #TODO: process ConfluentAvro format. + # stream_df = ( + # self.spark.readStream.format("kafka") + # .options(**self.kafka_options_config) + # .load() + # .withColumn("byteValue", func.expr("substring(value, 6, length(value)-5)")) + # .select( + # from_avro( + # func.col("byteValue"), + # self.data_source.kafka_options.message_format.schema_json, + # self.avro_options, + # ).alias("table") + # ) + # .select("table.*") + # ) + raise ValueError("HOLY MOLY I AM NOT READY TO DEAL WITH CONFLUENT AVRO, GUYS") + stream_df = None else: if not isinstance( self.data_source.kafka_options.message_format, AvroFormat @@ -120,13 +144,6 @@ def _ingest_stream_data(self) -> StreamTable: ) .select("table.*") ) - else: - if not isinstance( - self.data_source.kafka_options.message_format, ConfluentAvroFormat - ): - raise ValueError("kafka source message format is not confluent_avro format") - #TO_DO: process ConfluentAvro format. 
- stream_df = None return stream_df def _construct_transformation_plan(self, df: StreamTable) -> StreamTable: diff --git a/sdk/python/tests/unit/test_feature_views.py b/sdk/python/tests/unit/test_feature_views.py index f421751300..1ed8b7fb64 100644 --- a/sdk/python/tests/unit/test_feature_views.py +++ b/sdk/python/tests/unit/test_feature_views.py @@ -5,7 +5,7 @@ from feast.aggregation import Aggregation from feast.batch_feature_view import BatchFeatureView -from feast.data_format import AvroFormat +from feast.data_format import AvroFormat, ConfluentAvroFormat from feast.data_source import KafkaSource, PushSource from feast.entity import Entity from feast.feature_view import FeatureView @@ -48,7 +48,7 @@ def test_create_batch_feature_view(): name="kafka", timestamp_field="event_timestamp", kafka_bootstrap_servers="", - message_format=AvroFormat(""), + message_format=ConfluentAvroFormat(""), topic="topic", batch_source=FileSource(path="some path"), ) From b2c2154462c960b2dc14bf8bdaa35c80c6bc00d0 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 14:40:09 -0600 Subject: [PATCH 03/26] Empty commit --- empty.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 empty.txt diff --git a/empty.txt b/empty.txt new file mode 100644 index 0000000000..e69de29bb2 From 875108325ae7c57073ff46117a67245d66066ab4 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 14:40:23 -0600 Subject: [PATCH 04/26] Empty commit 2 --- empty.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 empty.txt diff --git a/empty.txt b/empty.txt deleted file mode 100644 index e69de29bb2..0000000000 From d86093d270302dd6fc943290d93f6f757a1557ab Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 14:45:01 -0600 Subject: [PATCH 05/26] Empty commit 3 --- empty.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) create mode 100644 empty.txt diff --git a/empty.txt b/empty.txt new file mode 100644 index 0000000000..e69de29bb2 From 503a7012f525111bec1afc7019fbb325a16411c6 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 14:45:11 -0600 Subject: [PATCH 06/26] Empty commit 4 --- empty.txt | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 empty.txt diff --git a/empty.txt b/empty.txt deleted file mode 100644 index e69de29bb2..0000000000 From ac014b54e17d20c8e337a2e9348588fc961701a7 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 21:04:31 -0600 Subject: [PATCH 07/26] Empty commit --- sdk/python/tests/unit/test_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_types.py b/sdk/python/tests/unit/test_types.py index a45e83e218..1312ab9182 100644 --- a/sdk/python/tests/unit/test_types.py +++ b/sdk/python/tests/unit/test_types.py @@ -14,7 +14,7 @@ def test_primitive_feast_type(): def test_array_feast_type(): array_string = Array(String) - assert array_string.to_value_type() == ValueType.STRING_LIST + assert array_string.to_value_type() == ValueType.STRING_LIST assert from_value_type(array_string.to_value_type()) == array_string array_float_32 = Array(Float32) From 67cb9914fe114c3a052f3fd49da01ab3caeb0699 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 21:04:38 -0600 Subject: [PATCH 08/26] Empty commit --- sdk/python/tests/unit/test_types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/test_types.py b/sdk/python/tests/unit/test_types.py index 1312ab9182..a45e83e218 100644 --- 
a/sdk/python/tests/unit/test_types.py +++ b/sdk/python/tests/unit/test_types.py @@ -14,7 +14,7 @@ def test_primitive_feast_type(): def test_array_feast_type(): array_string = Array(String) - assert array_string.to_value_type() == ValueType.STRING_LIST + assert array_string.to_value_type() == ValueType.STRING_LIST assert from_value_type(array_string.to_value_type()) == array_string array_float_32 = Array(Float32) From f30e0d2e5853fa9aec0b53aaf540833b78e7e54d Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 7 Feb 2024 21:10:33 -0600 Subject: [PATCH 09/26] Update streaming tests GA --- .github/workflows/streaming-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/streaming-tests.yml b/.github/workflows/streaming-tests.yml index 89595fdd73..1fe99e0af7 100644 --- a/.github/workflows/streaming-tests.yml +++ b/.github/workflows/streaming-tests.yml @@ -51,7 +51,7 @@ jobs: brew install mysql PATH=$PATH:/usr/local/mysql/bin - name: Work around Homebrew MySQL being broken - # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details. + # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details. if: startsWith(matrix.os, 'macOS') run: | brew install zlib From c6ffa7da1a0ae606ac559b1ef42f9164cb1868bb Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 10:44:01 -0600 Subject: [PATCH 10/26] Add workflow_dispatch as a trigger for unit_tests github action --- .github/workflows/unit_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml index ac2f513229..755567fa8a 100644 --- a/.github/workflows/unit_tests.yml +++ b/.github/workflows/unit_tests.yml @@ -1,6 +1,6 @@ name: unit-tests -on: [pull_request] +on: [pull_request, workflow_dispatch] jobs: unit-test-python: runs-on: ${{ matrix.os }} From 1b5d0c0236131c484729cb501818a22e5c73e2be Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 11:04:22 -0600 Subject: [PATCH 11/26] Add confluent packages to setup.py --- setup.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 3a0a2c4314..93b3a7850b 100644 --- a/setup.py +++ b/setup.py @@ -64,7 +64,7 @@ "pyarrow>=4,<12", "pydantic>=1,<2", "pygments>=2.12.0,<3", - "PyYAML>=5.4.0,<7", + # "PyYAML>=5.4.0,<7", "requests", "SQLAlchemy[mypy]>1,<2", "tabulate>=0.8.0,<1", @@ -157,6 +157,12 @@ "elasticsearch==8.8", ] +CONFLUENT_REQUIRED = [ + "pipenv", + "confluent-kafka>=2.0.2", + "pip-system-certs==4.0" +] + CI_REQUIRED = ( [ "build", @@ -195,7 +201,7 @@ "types-protobuf~=3.19.22", "types-python-dateutil", "types-pytz", - "types-PyYAML", + # "types-PyYAML", "types-redis", "types-requests", "types-setuptools", @@ -218,6 +224,7 @@ + HAZELCAST_REQUIRED + MILVUS_REQUIRED + ELASTICSEARCH_REQUIRED + + CONFLUENT_REQUIRED ) From 06f01084e94075d9790f8d09b43d6ef8bcad25ca Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 11:15:00 -0600 Subject: [PATCH 12/26] Fix test-python-universal-spark to use docker ala the integration tests --- .../workflows/test-python-universal-spark.yml | 165 +++++++++++++----- 1 file changed, 125 insertions(+), 40 deletions(-) diff --git a/.github/workflows/test-python-universal-spark.yml b/.github/workflows/test-python-universal-spark.yml index 3b27cfc36e..261090e09c 100644 --- a/.github/workflows/test-python-universal-spark.yml +++ b/.github/workflows/test-python-universal-spark.yml @@ -1,42 +1,112 @@ name: test-python-universal-spark -on: 
- repository_dispatch: - branches: [ "confluent_avro_changes", "master" ] - workflow_dispatch: - branches: [ "confluent_avro_changes", "master" ] +on [pull_request, workflow_dispatch]: +# concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 +#concurrency: +# group: pr-integration-tests-${{ github.event.pull_request.number }} +# cancel-in-progress: true jobs: - build: - + build-docker-image: + # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. + if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'feast-dev/feast' + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive + - name: Set up QEMU + uses: docker/setup-qemu-action@v1 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + with: + install: true + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 + - name: Set ECR image tag + id: image-tag + run: echo "::set-output name=DOCKER_IMAGE_TAG::`git rev-parse HEAD`" + - name: Cache Public ECR Image + id: lambda_python_3_9 + uses: actions/cache@v2 + with: + path: ~/cache + key: lambda_python_3_9 + - name: Handle Cache Miss (pull public ECR image & save it to tar file) + if: steps.cache-primes.outputs.cache-hit != 'true' + run: | + mkdir -p ~/cache + docker pull public.ecr.aws/lambda/python:3.9 + docker save public.ecr.aws/lambda/python:3.9 -o ~/cache/lambda_python_3_9.tar + - name: Handle Cache Hit (load docker image from tar file) + if: steps.cache-primes.outputs.cache-hit == 'true' + run: | + docker load -i ~/cache/lambda_python_3_9.tar + - name: Build and push + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: feast-python-server + run: | + docker build \ + --file sdk/python/feast/infra/feature_servers/aws_lambda/Dockerfile \ + --tag $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} \ + --load \ + . + docker push $ECR_REGISTRY/$ECR_REPOSITORY:${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} + outputs: + DOCKER_IMAGE_TAG: ${{ steps.image-tag.outputs.DOCKER_IMAGE_TAG }} + spark-test-python: + # all jobs MUST have this if check for 'ok-to-test' or 'approved' for security purposes. 
+ if: + ((github.event.action == 'labeled' && (github.event.label.name == 'approved' || github.event.label.name == 'lgtm' || github.event.label.name == 'ok-to-test')) || + (github.event.action != 'labeled' && (contains(github.event.pull_request.labels.*.name, 'ok-to-test') || contains(github.event.pull_request.labels.*.name, 'approved') || contains(github.event.pull_request.labels.*.name, 'lgtm')))) && + github.repository == 'feast-dev/feast' + needs: build-docker-image runs-on: ${{ matrix.os }} strategy: fail-fast: false matrix: - python-version: ["3.8"] - os: [ubuntu-latest, macOS-latest] - exclude: - - os: macOS-latest - python-version: "3.8" + python-version: [ "3.9" ] + os: [ ubuntu-latest ] env: OS: ${{ matrix.os }} PYTHON: ${{ matrix.python-version }} + services: + redis: + image: redis + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 steps: - - name: Increase swapfile - # Increase ubuntu's swapfile to avoid running out of resources which causes the action to terminate - if: startsWith(matrix.os, 'ubuntu') - run: | - sudo swapoff -a - sudo fallocate -l 8G /swapfile - sudo chmod 600 /swapfile - sudo mkswap /swapfile - sudo swapon /swapfile - sudo swapon --show - uses: actions/checkout@v2 + with: + # pull_request_target runs the workflow in the context of the base repo + # as such actions/checkout needs to be explicit configured to retrieve + # code from the PR. + ref: refs/pull/${{ github.event.pull_request.number }}/merge + submodules: recursive - name: Setup Python - id: setup-python uses: actions/setup-python@v2 + id: setup-python with: python-version: ${{ matrix.python-version }} architecture: x64 @@ -45,17 +115,24 @@ jobs: uses: actions/setup-go@v2 with: go-version: 1.19.7 - - name: Install mysql on macOS - if: startsWith(matrix.os, 'macOS') - run: | - brew install mysql - PATH=$PATH:/usr/local/mysql/bin - - name: Work around Homebrew MySQL being broken - # See https://github.com/Homebrew/homebrew-core/issues/130258 for more details. 
- if: startsWith(matrix.os, 'macOS') - run: | - brew install zlib - ln -sv $(brew --prefix zlib)/lib/libz.dylib $(brew --prefix)/lib/libzlib.dylib + - name: Authenticate to Google Cloud + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GCP_SA_KEY }}' + - name: Set up gcloud SDK + uses: google-github-actions/setup-gcloud@v1 + with: + project_id: ${{ secrets.GCP_PROJECT_ID }} + - name: Use gcloud CLI + run: gcloud info + - name: Set up AWS SDK + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: us-west-2 + - name: Use AWS CLI + run: aws sts get-caller-identity - name: Upgrade pip version run: | pip install --upgrade pip @@ -74,8 +151,7 @@ jobs: restore-keys: | ${{ runner.os }}-${{ steps.setup-python.outputs.python-version }}-pip- - name: Install pip-tools - run: | - pip install -U pip-tools + run: pip install pip-tools - name: Install apache-arrow on ubuntu if: matrix.os == 'ubuntu-latest' run: | @@ -92,8 +168,17 @@ jobs: brew install pkg-config - name: Install dependencies run: make install-python-ci-dependencies - - name: Test Spark + - name: Setup Redis Cluster run: | - make test-python-universal-spark - - + docker pull vishnunair/docker-redis-cluster:latest + docker run -d -p 6001:6379 -p 6002:6380 -p 6003:6381 -p 6004:6382 -p 6005:6383 -p 6006:6384 --name redis-cluster vishnunair/docker-redis-cluster + - name: Test python + if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak + env: + FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} + SNOWFLAKE_CI_DEPLOYMENT: ${{ secrets.SNOWFLAKE_CI_DEPLOYMENT }} + SNOWFLAKE_CI_USER: ${{ secrets.SNOWFLAKE_CI_USER }} + SNOWFLAKE_CI_PASSWORD: ${{ secrets.SNOWFLAKE_CI_PASSWORD }} + SNOWFLAKE_CI_ROLE: ${{ secrets.SNOWFLAKE_CI_ROLE }} + SNOWFLAKE_CI_WAREHOUSE: ${{ secrets.SNOWFLAKE_CI_WAREHOUSE }} + run: make test-python-universal-spark \ No newline at end of file From 6673e7a39a7a6f15800d24345a6a34acac03223b Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 11:18:18 -0600 Subject: [PATCH 13/26] Fix test-python-universal-spark to use docker ala the integration tests, take 2 --- .github/workflows/test-python-universal-spark.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test-python-universal-spark.yml b/.github/workflows/test-python-universal-spark.yml index 261090e09c..d2ae316741 100644 --- a/.github/workflows/test-python-universal-spark.yml +++ b/.github/workflows/test-python-universal-spark.yml @@ -1,6 +1,6 @@ name: test-python-universal-spark -on [pull_request, workflow_dispatch]: +on: [pull_request, workflow_dispatch]: # concurrency is currently broken, see details https://github.com/actions/runner/issues/1532 #concurrency: From e3c7f42716ca9f64345d40a08dd2ea3839d157d8 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 11:56:18 -0600 Subject: [PATCH 14/26] Link spark_kafka_processor --- .../infra/contrib/spark_kafka_processor.py | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 80a24e83ec..24056f4a50 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -2,6 +2,8 @@ from typing import List, Optional import 
pandas as pd +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroDeserializer from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro from pyspark.sql.functions import col, from_json @@ -43,7 +45,7 @@ def __init__( sfv.stream_source.kafka_options.message_format, AvroFormat ) and not isinstance( sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat - )and not isinstance( + ) and not isinstance( sfv.stream_source.kafka_options.message_format, JsonFormat ): raise ValueError( @@ -55,7 +57,7 @@ def __init__( self.format = "json" elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): self.format = "confluent_avro" - + self.init_confluent_avro_processor() if not isinstance(config, SparkProcessorConfig): raise ValueError("config is not spark processor config") @@ -66,6 +68,12 @@ def __init__( self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities] super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source) + + def init_confluent_avro_processor(self) -> None: + """Extra initialization for Confluent Avro processor, which uses + SchemaRegistry and the Avro Deserializer, both of which need initialization.""" + pass + def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: ingested_stream_df = self._ingest_stream_data() transformed_df = self._construct_transformation_plan(ingested_stream_df) @@ -75,7 +83,6 @@ def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: def _ingest_stream_data(self) -> StreamTable: """Only supports json, avro and confluent_avro formats currently.""" # Test that we reach this path, and stop. - raise ValueError("No, no, NO NO NO, stop everything. Shut it down.") if self.format == "json": if not isinstance( self.data_source.kafka_options.message_format, JsonFormat @@ -104,21 +111,6 @@ def _ingest_stream_data(self) -> StreamTable: self.data_source.kafka_options.message_format, ConfluentAvroFormat ): raise ValueError("kafka source message format is not confluent_avro format") - #TODO: process ConfluentAvro format. - # stream_df = ( - # self.spark.readStream.format("kafka") - # .options(**self.kafka_options_config) - # .load() - # .withColumn("byteValue", func.expr("substring(value, 6, length(value)-5)")) - # .select( - # from_avro( - # func.col("byteValue"), - # self.data_source.kafka_options.message_format.schema_json, - # self.avro_options, - # ).alias("table") - # ) - # .select("table.*") - # ) raise ValueError("HOLY MOLY I AM NOT READY TO DEAL WITH CONFLUENT AVRO, GUYS") stream_df = None else: From 35a022c4501621eb81fe81c3174de3daea483614 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 12:50:48 -0600 Subject: [PATCH 15/26] Add SchemaRegistry wrapper --- .../schema_registry/schema_registry.py | 60 +++++++++++++++++++ .../infra/contrib/spark_kafka_processor.py | 8 ++- 2 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 sdk/python/feast/expediagroup/schema_registry/schema_registry.py diff --git a/sdk/python/feast/expediagroup/schema_registry/schema_registry.py b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py new file mode 100644 index 0000000000..fd8002bf0a --- /dev/null +++ b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py @@ -0,0 +1,60 @@ +""" +Wrapper for SchemaRegistryClient, to separate Feast from +the extensive auth and configuration process of +connecting to a SchemaRegistry. 
+ +Copyright 2024 Expedia Group +Author: matcarlin@expediagroup.com +""" + +import requests + +from confluent_kafka.schema_registry import SchemaRegistryClient + + +class SchemaRegistry(): + # spark: SparkSession + # format: str + # preprocess_fn: Optional[MethodType] + # join_keys: List[str] + + def __init__(self): + pass + + def get_properties( + user: String, + password: String, + urn: String, + environment: String, + cert_path: String, #https://stackoverflow.com/questions/55203791/python-requests-using-certificate-value-instead-of-path + ) -> dict: + """Discover a Schema Registry with the provided urn and credentials, + and obtain a set of properties for use in Schema Registry calls.""" + discovery_url = "https://stream-discovery-service-{environment}.rcp.us-east-1.data.{environment}.exp-aws.net/v2/discovery/urn/{urn}".format( + environment=environment, urn=urn + ) + + response = requests.get( + discovery_url, + auth=(user, password), + headers={"Accept": "application/json"}, + verify=cert_path, + ) + + if response.status_code != 200: + raise RuntimeError( + "Discovery API returned unexpected HTTP status: {status}".format( + status=str(response.status_code) + ) + ) + + try: + props = json.loads(response.text) + except (TypeError, UnicodeDecodeError): + raise TypeError( + "Discovery API response did not contain valid json: {response}".format( + response=response.text + ) + ) + + return props \ No newline at end of file diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 24056f4a50..383b7e929c 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -2,7 +2,7 @@ from typing import List, Optional import pandas as pd -from confluent_kafka.schema_registry import SchemaRegistryClient +from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry from confluent_kafka.schema_registry.avro import AvroDeserializer from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro @@ -72,7 +72,11 @@ def __init__( def init_confluent_avro_processor(self) -> None: """Extra initialization for Confluent Avro processor, which uses SchemaRegistry and the Avro Deserializer, both of which need initialization.""" - pass + + user = "VAULT_SECRETS" + password = "VAULT_SECRETS" + urn = "NOT SURE" + environment = "NOT SURE" def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: ingested_stream_df = self._ingest_stream_data() From a4484808f79bda6226faccfb18a7f0403ab39845 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Sun, 11 Feb 2024 15:23:04 -0600 Subject: [PATCH 16/26] Add AvroDeserializer to Spark Kafka Processor --- .../schema_registry/schema_registry.py | 198 ++++++++++++++---- .../infra/contrib/spark_kafka_processor.py | 31 +-- 2 files changed, 174 insertions(+), 55 deletions(-) diff --git a/sdk/python/feast/expediagroup/schema_registry/schema_registry.py b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py index fd8002bf0a..52bfe70174 100644 --- a/sdk/python/feast/expediagroup/schema_registry/schema_registry.py +++ b/sdk/python/feast/expediagroup/schema_registry/schema_registry.py @@ -7,54 +7,166 @@ Author: matcarlin@expediagroup.com """ -import requests +import json +import os +import tempfile +from typing import Dict -from confluent_kafka.schema_registry import SchemaRegistryClient +import requests +from confluent_kafka.schema_registry import RegisteredSchema, 
SchemaRegistryClient class SchemaRegistry(): - # spark: SparkSession - # format: str - # preprocess_fn: Optional[MethodType] - # join_keys: List[str] + props: Dict[str, str] + kafka_params: Dict[str, str] + schema_registry_config: Dict[str, str] + client: SchemaRegistryClient def __init__(self): pass - def get_properties( - user: String, - password: String, - urn: String, - environment: String, - cert_path: String, #https://stackoverflow.com/questions/55203791/python-requests-using-certificate-value-instead-of-path - ) -> dict: - """Discover a Schema Registry with the provided urn and credentials, - and obtain a set of properties for use in Schema Registry calls.""" - discovery_url = "https://stream-discovery-service-{environment}.rcp.us-east-1.data.{environment}.exp-aws.net/v2/discovery/urn/{urn}".format( - environment=environment, urn=urn - ) - - response = requests.get( - discovery_url, - auth=(user, password), - headers={"Accept": "application/json"}, - verify=cert_path, - ) - - if response.status_code != 200: - raise RuntimeError( - "Discovery API returned unexpected HTTP status: {status}".format( - status=str(response.status_code) - ) - ) - - try: - props = json.loads(response.text) - except (TypeError, UnicodeDecodeError): - raise TypeError( - "Discovery API response did not contain valid json: {response}".format( - response=response.text - ) - ) - - return props \ No newline at end of file + def initialize_client( + self, + user: str, + password: str, + urn: str, + environment: str, + cert_path: str, # https://stackoverflow.com/questions/55203791/python-requests-using-certificate-value-instead-of-path + ) -> None: + """ + Discover a Schema Registry with the provided urn and credentials, + obtain a set of properties for use in Schema Registry calls, + and initialize the SchemaRegistryClient. 
+ """ + + discovery_url = "https://stream-discovery-service-{environment}.rcp.us-east-1.data.{environment}.exp-aws.net/v2/discovery/urn/{urn}".format( + environment=environment, urn=urn + ) + + response = requests.get( + discovery_url, + auth=(user, password), + headers={"Accept": "application/json"}, + verify=cert_path, + ) + + if response.status_code != 200: + raise RuntimeError( + "Discovery API returned unexpected HTTP status: {status}".format( + status=str(response.status_code) + ) + ) + + try: + props = json.loads(response.text) + except (TypeError, UnicodeDecodeError): + raise TypeError( + "Discovery API response did not contain valid json: {response}".format( + response=response.text + ) + ) + + self.props = props + + # write ssl key and cert to disk + ssl_key_file, ssl_key_path = tempfile.mkstemp() + with os.fdopen(ssl_key_file, "w") as f: + f.write(props["serde"]["schema.registry.ssl.keystore.key"]) + + ssl_certificate_file, ssl_certificate_path = tempfile.mkstemp() + with os.fdopen(ssl_certificate_file, "w") as f: + f.write(props["serde"]["schema.registry.ssl.keystore.certificate.chain"]) + + self.kafka_params = { + "kafka.security.protocol": props["security"]["security.protocol"], + "kafka.bootstrap.servers": props["connection"]["bootstrap.servers"], + "subscribe": props["connection"]["topic"], + "startingOffsets": props["connection"]["auto.offset.reset"], + "kafka.ssl.truststore.certificates": props["security"][ + "ssl.truststore.certificates" + ], + "kafka.ssl.keystore.certificate.chain": props["security"][ + "ssl.keystore.certificate.chain" + ], + "kafka.ssl.keystore.key": props["security"]["ssl.keystore.key"], + "kafka.ssl.endpoint.identification.algorithm": props["security"][ + "ssl.endpoint.identification.algorithm" + ], + "kafka.ssl.truststore.type": props["security"]["ssl.truststore.type"], + "kafka.ssl.keystore.type": props["security"]["ssl.keystore.type"], + "kafka.topic": props["connection"]["topic"], + "kafka.schema.registry.url": props["serde"]["schema.registry.url"], + "kafka.schema.registry.topic": props["connection"]["topic"], + "kafka.schema.registry.ssl.keystore.type": props["serde"][ + "schema.registry.ssl.keystore.type" + ], + "kafka.schema.registry.ssl.keystore.certificate.chain": props["serde"][ + "schema.registry.ssl.keystore.certificate.chain" + ], + "kafka.schema.registry.ssl.keystore.key": props["serde"][ + "schema.registry.ssl.keystore.key" + ], + "kafka.schema.registry.ssl.truststore.certificates": props["serde"][ + "schema.registry.ssl.truststore.certificates" + ], + "kafka.schema.registry.ssl.truststore.type": props["serde"][ + "schema.registry.ssl.truststore.type" + ], + "value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy", + } + + self.schema_registry_config = { + "schema.registry.topic": props["connection"]["topic"], + "schema.registry.url": props["serde"]["schema.registry.url"], + "schema.registry.ssl.keystore.type": props["serde"][ + "schema.registry.ssl.keystore.type" + ], + "schema.registry.ssl.keystore.certificate.chain": props["serde"][ + "schema.registry.ssl.keystore.certificate.chain" + ], + "schema.registry.ssl.keystore.key": props["serde"][ + "schema.registry.ssl.keystore.key" + ], + "schema.registry.ssl.truststore.certificates": props["serde"][ + "schema.registry.ssl.truststore.certificates" + ], + "schema.registry.ssl.truststore.type": props["serde"][ + "schema.registry.ssl.truststore.type" + ], + } + + schema_registry_url = props["serde"]["schema.registry.url"] + + self.client = 
SchemaRegistryClient( + { + "url": schema_registry_url, + "ssl.ca.location": cert_path, + "ssl.key.location": ssl_key_path, + "ssl.certificate.location": ssl_certificate_path, + } + ) + + def get_latest_version( + self, + topic_name: str, + ) -> RegisteredSchema: + """ + Get the latest version of the topic. + """ + if not self.client: + raise RuntimeError("Client has not been initialized. Please call initialize_client first.") + + latest_version = self.client.get_latest_version(topic_name) + + return latest_version + + def get_client( + self + ) -> SchemaRegistryClient: + """ + Return the client. + """ + if not self.client: + raise RuntimeError("Client has not been initialized. Please call initialize_client first.") + + return self.client diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index 383b7e929c..cafd8b3852 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -2,7 +2,6 @@ from typing import List, Optional import pandas as pd -from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry from confluent_kafka.schema_registry.avro import AvroDeserializer from pyspark.sql import DataFrame, SparkSession from pyspark.sql.avro.functions import from_avro @@ -10,6 +9,7 @@ from feast.data_format import AvroFormat, ConfluentAvroFormat, JsonFormat from feast.data_source import KafkaSource, PushMode +from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry from feast.feature_store import FeatureStore from feast.infra.contrib.stream_processor import ( ProcessorConfig, @@ -23,6 +23,7 @@ class SparkProcessorConfig(ProcessorConfig): spark_session: SparkSession processing_time: str query_timeout: int + schema_registry_client: Optional[SchemaRegistry] class SparkKafkaProcessor(StreamProcessor): @@ -57,7 +58,6 @@ def __init__( self.format = "json" elif isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): self.format = "confluent_avro" - self.init_confluent_avro_processor() if not isinstance(config, SparkProcessorConfig): raise ValueError("config is not spark processor config") @@ -65,18 +65,17 @@ def __init__( self.preprocess_fn = preprocess_fn self.processing_time = config.processing_time self.query_timeout = config.query_timeout + self.schema_registry_client = config.schema_registry_client if config.schema_registry_client else None self.join_keys = [fs.get_entity(entity).join_key for entity in sfv.entities] - super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source) + if isinstance(sfv.stream_source.kafka_options.message_format, ConfluentAvroFormat): + self.init_confluent_avro_processor() + + super().__init__(fs=fs, sfv=sfv, data_source=sfv.stream_source) def init_confluent_avro_processor(self) -> None: - """Extra initialization for Confluent Avro processor, which uses - SchemaRegistry and the Avro Deserializer, both of which need initialization.""" - - user = "VAULT_SECRETS" - password = "VAULT_SECRETS" - urn = "NOT SURE" - environment = "NOT SURE" + """Extra initialization for Confluent Avro processor.""" + self.deserializer = AvroDeserializer(schema_registry_client=self.schema_registry_client.get_client()) def ingest_stream_feature_view(self, to: PushMode = PushMode.ONLINE) -> None: ingested_stream_df = self._ingest_stream_data() @@ -115,8 +114,16 @@ def _ingest_stream_data(self) -> StreamTable: self.data_source.kafka_options.message_format, ConfluentAvroFormat ): raise 
ValueError("kafka source message format is not confluent_avro format") - raise ValueError("HOLY MOLY I AM NOT READY TO DEAL WITH CONFLUENT AVRO, GUYS") - stream_df = None + + stream_df = ( + self.spark.readStream.format("kafka") + .options(**self.kafka_options_config) + .load() + .select( + self.deserializer(col("value")) + ) + .select("table.*") + ) else: if not isinstance( self.data_source.kafka_options.message_format, AvroFormat From 7ed2d37048f0a34c98cadd81bf9e9825929392ef Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Mon, 12 Feb 2024 15:15:06 -0600 Subject: [PATCH 17/26] Remove unnecessary dependency on mysqlclient - equal to feast PR 3925 --- sdk/python/feast/infra/feature_servers/multicloud/Dockerfile | 1 - .../feast/infra/feature_servers/multicloud/Dockerfile.dev | 1 - sdk/python/requirements/py3.10-ci-requirements.txt | 2 -- sdk/python/requirements/py3.8-ci-requirements.txt | 2 -- sdk/python/requirements/py3.9-ci-requirements.txt | 2 -- sdk/python/tests/unit/test_sql_registry.py | 2 +- setup.py | 2 +- 7 files changed, 2 insertions(+), 10 deletions(-) diff --git a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile index c95c515fb4..fdd8e3ac51 100644 --- a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile +++ b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile @@ -4,7 +4,6 @@ RUN apt update && \ apt install -y \ jq \ python3-dev \ - default-libmysqlclient-dev \ build-essential RUN pip install pip --upgrade diff --git a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev index ecbc199a5b..3fc1355d7a 100644 --- a/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev +++ b/sdk/python/feast/infra/feature_servers/multicloud/Dockerfile.dev @@ -4,7 +4,6 @@ RUN apt update && \ apt install -y \ jq \ python3-dev \ - default-libmysqlclient-dev \ build-essential RUN pip install pip --upgrade diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 223e76b2e6..2b4170b8f6 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -534,8 +534,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1.0 # via eg-feast (setup.py) -mysqlclient==2.2.0 - # via eg-feast (setup.py) nbclient==0.8.0 # via nbconvert nbconvert==7.7.3 diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 0a1e7d74de..cebde0c1ab 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -529,8 +529,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1 # via feast (setup.py) -mysqlclient==2.1.1 - # via feast (setup.py) nbclassic==1.0.0 # via notebook nbclient==0.8.0 diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 31eb4496c6..5ed0086a83 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -540,8 +540,6 @@ mypy-extensions==1.0.0 # mypy mypy-protobuf==3.1.0 # via eg-feast (setup.py) -mysqlclient==2.2.0 - # via eg-feast (setup.py) nbclient==0.8.0 # via nbconvert nbconvert==7.7.3 diff --git a/sdk/python/tests/unit/test_sql_registry.py b/sdk/python/tests/unit/test_sql_registry.py index 5fba4013bd..3cf68b3e08 100644 --- 
a/sdk/python/tests/unit/test_sql_registry.py +++ b/sdk/python/tests/unit/test_sql_registry.py @@ -104,7 +104,7 @@ def mysql_registry(): registry_config = RegistryConfig( registry_type="sql", - path=f"mysql+mysqldb://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", + path=f"mysql+pymysql://{POSTGRES_USER}:{POSTGRES_PASSWORD}@127.0.0.1:{container_port}/{POSTGRES_DB}", cache_ttl_seconds=60, ) diff --git a/setup.py b/setup.py index 93b3a7850b..6068eb80db 100644 --- a/setup.py +++ b/setup.py @@ -119,7 +119,7 @@ "psycopg2-binary>=2.8.3,<3", ] -MYSQL_REQUIRED = ["mysqlclient", "pymysql", "types-PyMySQL"] +MYSQL_REQUIRED = ["pymysql", "types-PyMySQL"] HBASE_REQUIRED = [ "happybase>=1.2.0,<3", From d2f0a0722453581f45323f886cbe1344f8d9c832 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Mon, 12 Feb 2024 19:37:38 -0600 Subject: [PATCH 18/26] Preliminary checkin for test_streaming_ingestion --- .../unit/infra/test_streaming_ingestion.py | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 sdk/python/tests/unit/infra/test_streaming_ingestion.py diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py new file mode 100644 index 0000000000..6adc8630db --- /dev/null +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -0,0 +1,26 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+import assertpy +import pytest + +from feast.entity import Entity +from feast.value_type import ValueType + + +def test_the_test(): + entity = Entity(name="my-entity", description="My entity") + assert entity.join_key == "my-entity" + assert(5==5) + + From 85aeb7dde4fe9d337dbe9d1f7c2d94173a104936 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Tue, 13 Feb 2024 12:32:28 -0600 Subject: [PATCH 19/26] Start adding FeatureView to spark kafka processor, and add a sketch of spark and stream feature view setup to the streaming test --- .../infra/contrib/spark_kafka_processor.py | 8 +++- .../unit/infra/test_streaming_ingestion.py | 39 +++++++++++++++++-- 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index cafd8b3852..c8b24f2a82 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -36,10 +36,16 @@ def __init__( self, *, fs: FeatureStore, - sfv: StreamFeatureView, + sfv: FeatureView, config: ProcessorConfig, preprocess_fn: Optional[MethodType] = None, ): + + # In general, FeatureView may or may not have stream_source, but it must + # have one to use spark kafka processor + if not sfv.stream_source: + raise ValueError("Feature View must have a stream source to use spark streaming.") + if not isinstance(sfv.stream_source, KafkaSource): raise ValueError("data source is not kafka source") if not isinstance( diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py index 6adc8630db..e166977eb0 100644 --- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -16,11 +16,42 @@ from feast.entity import Entity from feast.value_type import ValueType +from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor -def test_the_test(): - entity = Entity(name="my-entity", description="My entity") - assert entity.join_key == "my-entity" - assert(5==5) +def test_streaming_ingestion(): + + spark_config = IntegrationTestRepoConfig( + provider="local", + online_store_creator=RedisOnlineStoreCreator, + offline_store_creator=SparkDataSourceCreator, + batch_engine={"type": "spark.engine", "partitions": 10}, + ) + spark_environment = construct_test_environment( + spark_config, None, entity_key_serialization_version=1 + ) + df = create_basic_driver_dataset() + + # Make a stream source. 
+ stream_source = KafkaSource( + name="kafka", + timestamp_field="event_timestamp", + kafka_bootstrap_servers="", + message_format=AvroFormat(""), + topic="topic", + batch_source=FileSource(path="some path"), + ) + StreamFeatureView( + name="test kafka stream feature view", + entities=[], + ttl=timedelta(days=30), + source=stream_source, + aggregations=[], + ) + + + + # processor = SparkKafkaProcessor() +# From 84952daa513760744acacd4f09c3cf29f5808e01 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 13:41:04 -0600 Subject: [PATCH 20/26] Change dash to underscore in setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 6068eb80db..5ebf3adfed 100644 --- a/setup.py +++ b/setup.py @@ -159,7 +159,7 @@ CONFLUENT_REQUIRED = [ "pipenv", - "confluent-kafka>=2.0.2", + "confluent_kafka>=2.0.2", "pip-system-certs==4.0" ] From f1257223ca0a014b3c7f1ca0755ec3545d07b49b Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 14:12:55 -0600 Subject: [PATCH 21/26] Add confluent kafka to requirements --- sdk/python/requirements/py3.10-ci-requirements.txt | 2 ++ sdk/python/requirements/py3.8-ci-requirements.txt | 2 ++ sdk/python/requirements/py3.9-ci-requirements.txt | 2 ++ 3 files changed, 6 insertions(+) diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 2b4170b8f6..9f79bc70d0 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -154,6 +154,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index cebde0c1ab..feed0d5ca3 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -156,6 +156,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 5ed0086a83..c5e0492fd7 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -154,6 +154,8 @@ comm==0.1.3 # via ipykernel coverage[toml]==7.2.7 # via pytest-cov +confluent_kafka==2.0.2 + # via eg-feast (setup.py) cryptography==40.0.2 # via # adal From e7c8a0c9098f3b615c9ee1286b4411e8bd5a7d79 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 14:22:28 -0600 Subject: [PATCH 22/26] Import FeatureView in Spark Kafka Processor --- sdk/python/feast/infra/contrib/spark_kafka_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/contrib/spark_kafka_processor.py b/sdk/python/feast/infra/contrib/spark_kafka_processor.py index c8b24f2a82..04901d64e4 100644 --- a/sdk/python/feast/infra/contrib/spark_kafka_processor.py +++ b/sdk/python/feast/infra/contrib/spark_kafka_processor.py @@ -11,6 +11,7 @@ from feast.data_source import KafkaSource, PushMode from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry from feast.feature_store import FeatureStore +from feast.feature_view import FeatureView from feast.infra.contrib.stream_processor import ( ProcessorConfig, StreamProcessor, From 2fc310d0f12d640110d13cbe3e3077d7184fac31 Mon 
Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 14:32:11 -0600 Subject: [PATCH 23/26] Take 57 --- .../tests/unit/infra/test_streaming_ingestion.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py index e166977eb0..8be39bdb32 100644 --- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -17,6 +17,18 @@ from feast.entity import Entity from feast.value_type import ValueType from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor +from feast.infra.offline_stores.contrib.spark_offline_store.tests.data_source import ( + SparkDataSourceCreator, +) +from tests.integration.feature_repos.integration_test_repo_config import ( + IntegrationTestRepoConfig, +) +from tests.integration.feature_repos.repo_configuration import ( + construct_test_environment, +) +from tests.integration.feature_repos.universal.online_store.redis import ( + RedisOnlineStoreCreator, +) def test_streaming_ingestion(): From d0d86bf4f410c2fa03440e6c5f45ea76707676c1 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 14:40:45 -0600 Subject: [PATCH 24/26] Take 58 --- sdk/python/tests/unit/infra/test_streaming_ingestion.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py index 8be39bdb32..bdd745c90c 100644 --- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -17,6 +17,7 @@ from feast.entity import Entity from feast.value_type import ValueType from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor +from tests.data.data_creator import create_basic_driver_dataset from feast.infra.offline_stores.contrib.spark_offline_store.tests.data_source import ( SparkDataSourceCreator, ) From 32f53c30bdb80b8488d0319b8bab874beeafb047 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 15:01:21 -0600 Subject: [PATCH 25/26] Take 59 --- sdk/python/tests/unit/infra/test_streaming_ingestion.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py b/sdk/python/tests/unit/infra/test_streaming_ingestion.py index bdd745c90c..259e16a3f5 100644 --- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -30,6 +30,10 @@ from tests.integration.feature_repos.universal.online_store.redis import ( RedisOnlineStoreCreator, ) +from feast.data_source import KafkaSource +from feast.data_format import AvroFormat, ConfluentAvroFormat +from feast import FileSource +from feast.stream_feature_view import StreamFeatureView def test_streaming_ingestion(): @@ -51,7 +55,7 @@ def test_streaming_ingestion(): name="kafka", timestamp_field="event_timestamp", kafka_bootstrap_servers="", - message_format=AvroFormat(""), + message_format=ConfluentAvroFormat(""), topic="topic", batch_source=FileSource(path="some path"), ) From 35930f902d2ec24637e807430430bc3032643a60 Mon Sep 17 00:00:00 2001 From: Matt Carlin Date: Wed, 14 Feb 2024 15:17:21 -0600 Subject: [PATCH 26/26] Take 60 --- sdk/python/tests/unit/infra/test_streaming_ingestion.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/python/tests/unit/infra/test_streaming_ingestion.py 
b/sdk/python/tests/unit/infra/test_streaming_ingestion.py index 259e16a3f5..74a39cefe0 100644 --- a/sdk/python/tests/unit/infra/test_streaming_ingestion.py +++ b/sdk/python/tests/unit/infra/test_streaming_ingestion.py @@ -14,6 +14,8 @@ import assertpy import pytest +from datetime import timedelta + from feast.entity import Entity from feast.value_type import ValueType from feast.infra.contrib.spark_kafka_processor import SparkKafkaProcessor
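
Usage sketch (not part of the patch series): a minimal, hypothetical wiring of the pieces introduced above — ConfluentAvroFormat (patches 01/02), the SchemaRegistry wrapper (patches 15/16), and the schema_registry_client field on SparkProcessorConfig (patch 14). All topic names, bootstrap servers, discovery credentials, URN, environment and file paths below are placeholders; the mode/source fields are assumed to come from the pre-existing ProcessorConfig base class rather than from these diffs; and since the confluent_avro ingestion path is still stubbed out in this series, the snippet only illustrates the intended call flow.

from datetime import timedelta

from pyspark.sql import SparkSession

from feast import FeatureStore, FileSource
from feast.data_format import ConfluentAvroFormat
from feast.data_source import KafkaSource
from feast.expediagroup.schema_registry.schema_registry import SchemaRegistry
from feast.infra.contrib.spark_kafka_processor import (
    SparkKafkaProcessor,
    SparkProcessorConfig,
)
from feast.stream_feature_view import StreamFeatureView

store = FeatureStore(repo_path=".")
spark = SparkSession.builder.getOrCreate()

# Discover the Schema Registry and build the underlying SchemaRegistryClient
# (credentials, URN, environment and cert path are placeholders).
registry = SchemaRegistry()
registry.initialize_client(
    user="<discovery-user>",
    password="<discovery-password>",
    urn="<stream-urn>",
    environment="test",
    cert_path="/path/to/ca-bundle.pem",
)

# Kafka source using the new ConfluentAvro message format; the concrete schema
# is resolved via the registry at ingestion time.
stream_source = KafkaSource(
    name="driver_stats_stream",
    timestamp_field="event_timestamp",
    kafka_bootstrap_servers="<broker:9092>",
    topic="<topic>",
    message_format=ConfluentAvroFormat(""),
    batch_source=FileSource(path="<some path>"),
)

sfv = StreamFeatureView(
    name="driver_stats_sfv",
    entities=[],
    ttl=timedelta(days=30),
    source=stream_source,
    aggregations=[],
)

config = SparkProcessorConfig(
    mode="spark",    # assumed: fields inherited from ProcessorConfig
    source="kafka",
    spark_session=spark,
    processing_time="30 seconds",
    query_timeout=15,
    schema_registry_client=registry,
)

processor = SparkKafkaProcessor(fs=store, sfv=sfv, config=config)
processor.ingest_stream_feature_view()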