diff --git a/screwdriver.yaml b/screwdriver.yaml index c6a89389..a8768b79 100644 --- a/screwdriver.yaml +++ b/screwdriver.yaml @@ -155,6 +155,30 @@ jobs: - run-integration-cloud: | pytest vespa/test_integration_vespa_cloud_token.py -s -v + integration-cloud-vector-search: + requires: [~commit] + annotations: + screwdriver.cd/timeout: 120 + screwdriver.cd/cpu: HIGH + screwdriver.cd/ram: HIGH + screwdriver.cd/buildPeriodically: H 11 * * * + secrets: + - VESPA_TEAM_API_KEY + - VESPA_CLOUD_SECRET_TOKEN + environment: + SD_ZIP_ARTIFACTS: true + steps: + - setup: | + export WORK_DIR=$SD_DIND_SHARE_PATH + export RESOURCES_DIR=$(pwd)/vespa/resources + - install-python: | + dnf install -y python38-pip + python3 -m pip install --upgrade pip + python3 -m pip install pytest datasets + python3 -m pip install -e .[full] + - run-integration-cloud: | + pytest vespa/test_integration_vespa_cloud_vector_search.py -s -v + notebooks-cloud: requires: [integration-cloud] annotations: diff --git a/vespa/application.py b/vespa/application.py index 7218a8b1..26a75514 100644 --- a/vespa/application.py +++ b/vespa/application.py @@ -167,6 +167,20 @@ app=self, connections=connections, total_timeout=total_timeout ) + def syncio( + self, connections: Optional[int] = 100, total_timeout: int = 10 + ) -> "VespaSync": + """ + Access Vespa synchronous connection layer + + :param connections: Number of allowed concurrent connections + :param total_timeout: Total timeout in secs. + :return: Instance of Vespa synchronous layer. 
+ """ + return VespaSync( + app=self, pool_connections=connections, pool_maxsize=connections + ) + @staticmethod def _run_coroutine_new_event_loop(loop, coro): asyncio.set_event_loop(loop) diff --git a/vespa/deployment.py b/vespa/deployment.py index 032df9b6..5dbe3e74 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -18,9 +18,8 @@ from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import ec -from vespa.application import Vespa -from vespa.application import VESPA_CLOUD_SECRET_TOKEN -from vespa.package import ApplicationPackage +from vespa.application import Vespa, VESPA_CLOUD_SECRET_TOKEN +from vespa.package import ApplicationPackage, AuthClient, Parameter CFG_SERVER_START_TIMEOUT = 300 APP_INIT_TIMEOUT = 300 @@ -370,6 +369,7 @@ def __init__( application_package: ApplicationPackage, key_location: Optional[str] = None, key_content: Optional[str] = None, + auth_client_token_id: Optional[str] = None, output_file: IO = sys.stdout, ) -> None: """ @@ -381,6 +381,8 @@ def __init__( :param key_location: Location of the private key used for signing HTTP requests to the Vespa Cloud. :param key_content: Content of the private key used for signing HTTP requests to the Vespa Cloud. Use only when key file is not available. + :param auth_client_token_id: Use token based data plane authentication. This is the token name configured in the Vespa Cloud Console. + This is used to configure Vespa services.xml. The token is given read and write permissions. :param output_file: Output file to write output messages. 
Default is sys.stdout """ self.tenant = tenant @@ -401,6 +403,23 @@ "api.vespa-external.aws.oath.cloud", 4443 ) self.output = output_file + self.auth_client_token_id = auth_client_token_id + if auth_client_token_id is not None: + application_package.auth_clients = [ + AuthClient(id="mtls", + permissions=["read", "write"], + parameters=[ + Parameter("certificate", {"file": "security/clients.pem"}) + ] + ), + AuthClient(id=auth_client_token_id, + permissions=["read", "write"], + parameters=[ + Parameter("token", {"id": "accessToken"} + ) + ]) + ] + + def __enter__(self) -> "VespaCloud": return self @@ -468,7 +487,7 @@ def deploy_from_disk(self, instance: str, application_root: Path) -> Vespa: job = "dev-" + region run = self._start_deployment(instance, job, disk_folder, application_zip_bytes=data) self._follow_deployment(instance, job, run) - if os.environ.get("VESPA_CLOUD_SECRET_TOKEN") is None: + if os.environ.get(VESPA_CLOUD_SECRET_TOKEN) is None: endpoint_url = self._get_mtls_endpoint(instance=instance, region=region) else: endpoint_url = self._get_token_endpoint(instance=instance, region=region) diff --git a/vespa/test_integration_vespa_cloud.py b/vespa/test_integration_vespa_cloud.py index 4ecb757f..f7a04856 100644 --- a/vespa/test_integration_vespa_cloud.py +++ b/vespa/test_integration_vespa_cloud.py @@ -6,7 +6,7 @@ import unittest from cryptography.hazmat.primitives import serialization from vespa.application import Vespa -from vespa.package import AuthClient, Parameter +from vespa.package import Parameter from vespa.deployment import VespaCloud from vespa.test_integration_docker import ( TestApplicationCommon, @@ -18,21 +18,7 @@ class TestVespaKeyAndCertificate(unittest.TestCase): def setUp(self) -> None: - """ - self.clients = [ - AuthClient(id="mtls", - permissions=["read", "write"], - parameters=[ - Parameter("certificate", {"file": "security/clients.pem"}) - ]), - AuthClient(id="token", - permissions=["read"], - parameters=[ - Parameter("token", {"id": 
"pyvespa_integration_msmarco"}) - ]) - ]""" self.app_package = create_msmarco_application_package() - self.vespa_cloud = VespaCloud( tenant="vespa-team", application="pyvespa-integration", @@ -88,20 +74,6 @@ def tearDown(self) -> None: class TestMsmarcoApplication(TestApplicationCommon): def setUp(self) -> None: - """ - self.clients = [ - AuthClient(id="mtls", - permissions=["read", "write"], - parameters=[ - Parameter("certificate", {"file": "security/clients.pem"}) - ]), - AuthClient(id="token", - permissions=["read"], - parameters=[ - Parameter("token", {"id": "pyvespa_integration_msmarco"}) - ]) - ] - """ self.app_package = create_msmarco_application_package() self.vespa_cloud = VespaCloud( tenant="vespa-team", diff --git a/vespa/test_integration_vespa_cloud_vector_search.py b/vespa/test_integration_vespa_cloud_vector_search.py new file mode 100644 index 00000000..612daf0b --- /dev/null +++ b/vespa/test_integration_vespa_cloud_vector_search.py @@ -0,0 +1,130 @@ +# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +import os +import shutil +import unittest +from vespa.application import Vespa, ApplicationPackage +from vespa.package import Schema, Document, Field, HNSW, RankProfile +from vespa.deployment import VespaCloud +from vespa.io import VespaResponse + +APP_INIT_TIMEOUT = 900 + +def create_vector_ada_application_package() -> ApplicationPackage: + return ApplicationPackage( + name="sample_application", + schema=Schema( + name="vector", + document=Document( + name="vector", + fields=[ + Field(name="id", type="string", indexing=["attribute", "summary"]), + Field( + name="embedding", + type="tensor(x[1536])", + indexing=["attribute", "summary", "index"], + ann=HNSW( + distance_metric="innerproduct", + max_links_per_node=16, + neighbors_to_explore_at_insert=128, + ), + ) + ], + rank_profile=RankProfile( + name="default", + inputs=[("query(q)", "tensor(x[1536])")], + first_phase="closeness(field, embedding)") + ) + ) +) + +class TestVectorSearch(unittest.TestCase): + def setUp(self) -> None: + self.app_package = create_vector_ada_application_package() + self.vespa_cloud = VespaCloud( + tenant="vespa-team", + application="pyvespa-integration-vector-search", + key_content=os.getenv("VESPA_TEAM_API_KEY").replace(r"\n", "\n"), + application_package=self.app_package, + auth_client_token_id="pyvespa_integration_msmarco" + ) + self.disk_folder = os.path.join(os.getenv("WORK_DIR"), "sample_application") + self.instance_name = "default" + self.app: Vespa = self.vespa_cloud.deploy( + instance=self.instance_name, disk_folder=self.disk_folder + ) + print("Endpoint used " + self.app.url) + + def test_right_endpoint_used_with_token(self): + # The secret token is set in env variable. 
+ print("Endpoint used " + self.app.url) + self.app.wait_for_application_up(max_wait=APP_INIT_TIMEOUT) + self.assertEqual(200, self.app.get_application_status().status_code) + + def test_vector_indexing_and_query(self): + from datasets import load_dataset + print("Endpoint used " + self.app.url) + sample_size = 2000 + + dataset = load_dataset("KShivendu/dbpedia-entities-openai-1M", split="train", streaming=True).take(sample_size) + docs = list(dataset) + ok = 0 + with self.app.syncio() as sync_session: + for doc in docs: + response:VespaResponse = sync_session.feed_data_point( + schema="vector", + data_id=doc["_id"], + fields={ + "id": doc["_id"], + "embedding": doc["openai"] + } + ) + self.assertEqual(response.get_status_code(), 200) + ok +=1 + + self.assertEqual(ok, sample_size) + ok = 0 + + with self.app.asyncio() as async_session: + for doc in docs: + response:VespaResponse = async_session.feed_data_point( + schema="vector", + data_id=doc["_id"], + fields={ + "id": doc["_id"], + "embedding": doc["openai"] + } + ) + self.assertEqual(response.get_status_code(), 200) + ok +=1 + self.assertEqual(ok, sample_size) + + with self.app.syncio() as sync_session: + response:VespaResponse = sync_session.query( + { + "yql": "select id from sources * where {targetHits:10}nearestNeighbor(embedding,q)", + "input.query(q)": docs[0]["openai"], + 'hits' :10 + } + ) + self.assertEqual(response.get_status_code(), 200) + self.assertEqual(len(response.hits), 10) + + with self.app.asyncio() as async_session: + response:VespaResponse = async_session.query( + { + "yql": "select id from sources * where {targetHits:10}nearestNeighbor(embedding,q)", + "input.query(q)": docs[0]["openai"], + 'hits' :10 + } + ) + self.assertEqual(response.get_status_code(), 200) + self.assertEqual(len(response.hits), 10) + + def tearDown(self) -> None: + self.app.delete_all_docs( + content_cluster_name="vector_content", schema="vector" + ) + shutil.rmtree(self.disk_folder, ignore_errors=True) + +