From 21937106b527088127662d27274dc4c41c5e8169 Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Fri, 15 Nov 2024 10:19:38 +0100 Subject: [PATCH 1/7] maybe remove max_wait --- vespa/deployment.py | 60 ++++++++++++++++++++++++++++++++++++++------- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index ea1e130e..0ddeb207 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -609,6 +609,7 @@ def deploy( self, instance: Optional[str] = "default", disk_folder: Optional[str] = None, + version: Optional[str] = None, max_wait: int = 300, ) -> Vespa: """ @@ -617,6 +618,7 @@ def deploy( :param instance: Name of this instance of the application, in the Vespa Cloud. :param disk_folder: Disk folder to save the required Vespa config files. Default to application name folder within user's current working directory. + :param version: Vespa version to use for deployment. Default is None, which means the latest version. Must be a valid Vespa version, e.g. "8.435.13". :param max_wait: Seconds to wait for the deployment. :return: a Vespa connection instance. Returns a connection to the mtls endpoint. To connect to the token endpoint, use :func:`VespaCloud.get_application(endpoint_type="token")`. @@ -627,7 +629,13 @@ def deploy( region = self.get_dev_region() job = "dev-" + region - run = self._start_deployment(instance, job, disk_folder, None) + run = self._start_deployment( + instance=instance, + job=job, + disk_folder=disk_folder, + application_zip_bytes=None, + version=version, + ) self._follow_deployment(instance, job, run) app: Vespa = self.get_application( instance=instance, environment="dev", endpoint_type="mtls" @@ -854,7 +862,11 @@ def wait_for_prod_deployment( raise TimeoutError(f"Deployment did not finish within {max_wait} seconds. ") def deploy_from_disk( - self, instance: str, application_root: Path, max_wait: int = 300 + self, + instance: str, + application_root: Path, + max_wait: int = 300, + version: Optional[str] = None, ) -> Vespa: """ Deploy to dev from a directory tree. @@ -864,6 +876,7 @@ def deploy_from_disk( :param instance: Name of the instance where the application is to be run :param application_root: Application package directory root :param max_wait: Seconds to wait for the deployment. + :param version: Vespa version to use for deployment. Default is None, which means the latest version. Must be a valid Vespa version, e.g. "8.435.13". :return: a Vespa connection instance. Returns a connection to the mtls endpoint. To connect to the token endpoint, use :func:`VespaCloud.get_application(endpoint_type="token")`. """ data = BytesIO(self.read_app_package_from_disk(application_root)) @@ -873,11 +886,13 @@ def deploy_from_disk( region = self.get_dev_region() job = "dev-" + region run = self._start_deployment( - instance, job, disk_folder, application_zip_bytes=data + instance=instance, + job=job, + disk_folder=disk_folder, + application_zip_bytes=data, + version=version, ) self._follow_deployment(instance, job, run) - run = self._start_deployment(instance, job, disk_folder, None) - self._follow_deployment(instance, job, run) app: Vespa = self.get_application( instance=instance, environment="dev", endpoint_type="mtls" ) @@ -1538,6 +1553,7 @@ def _start_deployment( job: str, disk_folder: str, application_zip_bytes: Optional[BytesIO] = None, + version: Optional[str] = None, ) -> int: deploy_path = ( "/application/v4/tenant/{}/application/{}/instance/{}/deploy/{}".format( @@ -1551,11 +1567,33 @@ def _start_deployment( if not application_zip_bytes: application_zip_bytes = self._to_application_zip(disk_folder=disk_folder) + if version is not None: + # Create multipart form data + form_data = { + "applicationZip": ( + "application.zip", + application_zip_bytes, + "application/zip", + ), + "deployOptions": ( + "", + json.dumps({"vespaVersion": version}), + "application/json", + ), + } + multipart = MultipartEncoder(fields=form_data) + headers = {"Content-Type": multipart.content_type} + payload = multipart + else: + # Use existing direct zip upload + headers = {"Content-Type": "application/zip"} + payload = application_zip_bytes + response = self._request( "POST", deploy_path, - application_zip_bytes, - {"Content-Type": "application/zip"}, + payload, + headers, ) message = response.get("message", "No message provided") print(message, file=self.output) @@ -1616,9 +1654,12 @@ def _to_application_zip(self, disk_folder: str) -> BytesIO: return buffer - def _follow_deployment(self, instance: str, job: str, run: int) -> None: + def _follow_deployment( + self, instance: str, job: str, run: int, max_wait: int = 600 + ) -> None: last = -1 - while True: + start = time.time() + while time.time() - start < max_wait: try: status, last = self._get_deployment_status(instance, job, run, last) except RuntimeError: @@ -1630,6 +1671,7 @@ def _follow_deployment(self, instance: str, job: str, run: int) -> None: return else: raise RuntimeError("Unexpected status: {}".format(status)) + raise TimeoutError(f"Deployment did not finish within {max_wait} seconds.") def _get_deployment_status( self, instance: str, job: str, run: int, last: int From a1cd46af2ab7fc9420a688e0ff6f3cdba70b844c Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 12:41:12 +0100 Subject: [PATCH 2/7] update to use multipartencoder if version arg is passed --- vespa/deployment.py | 119 ++++++++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 44 deletions(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index 0ddeb207..b4104d74 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -1223,28 +1223,21 @@ def _try_get_access_token(self) -> str: return auth["providers"]["auth0"]["systems"]["public"]["access_token"] - def _request_with_access_token( + def _handle_response( self, - method: str, - path: str, - body: BytesIO = BytesIO(), - headers={}, - return_raw_response=False, + response: httpx.Response, + return_raw_response: bool = False, + path: str = "", ) -> Union[dict, httpx.Response]: - if not self.control_plane_access_token: - raise ValueError("Access token not set.") - body.seek(0) - headers = { - "Authorization": "Bearer " + self.control_plane_access_token, - **headers, - } - response = self.get_connection_response_with_retry(method, path, body, headers) + """Common response handling logic""" if return_raw_response: return response + try: parsed = json.load(response) except json.JSONDecodeError: parsed = response.read() + if response.status_code != 200: print(parsed) raise HTTPError( @@ -1252,8 +1245,41 @@ def _request_with_access_token( ) return parsed + def _get_auth_headers(self, additional_headers: dict = {}) -> dict: + """Create authorization headers""" + if not self.control_plane_access_token: + raise ValueError("Access token not set.") + + return { + "Authorization": f"Bearer {self.control_plane_access_token}", + **additional_headers, + } + + def _request_with_access_token( + self, + method: str, + path: str, + body: Union[BytesIO, MultipartEncoder] = BytesIO(), + headers: dict = {}, + return_raw_response: bool = False, + ) -> Union[dict, httpx.Response]: + """Make authenticated request with access token""" + if hasattr(body, "seek"): + body.seek(0) + + auth_headers = self._get_auth_headers(headers) + response = self.get_connection_response_with_retry( + method, path, body, auth_headers + ) + + return self._handle_response(response, return_raw_response, path) + def _request( - self, method: str, path: str, body: BytesIO = BytesIO(), headers={} + self, + method: str, + path: str, + body: Union[BytesIO, MultipartEncoder] = BytesIO(), + headers: dict = {}, ) -> Union[dict, httpx.Response]: if self.control_plane_auth_method == "access_token": return self._request_with_access_token(method, path, body, headers) @@ -1268,47 +1294,55 @@ def _request_with_api_key( self, method: str, path: str, - body: BytesIO = BytesIO(), - headers={}, - return_raw_response=False, + body: Union[BytesIO, MultipartEncoder] = BytesIO(), + headers: dict = {}, + return_raw_response: bool = False, ) -> Union[dict, httpx.Response]: digest = hashes.Hash(hashes.SHA256(), default_backend()) - body.seek(0) - digest.update(body.read()) + + # Handle different body types + if isinstance(body, MultipartEncoder): + # Use the encoded data for hash computation + digest = hashes.Hash(hashes.SHA256(), default_backend()) + digest.update(body.to_string()) # This moves the buffer position to the end + body._buffer.seek(0) # Needs to be reset. Otherwise, no data will be sent + # Update the headers to include the Content-Type + headers.update({"Content-Type": body.content_type}) + # Read the content of multipart_data into a bytes object + multipart_data_bytes: bytes = body.to_string() + headers.update({"Content-Length": str(len(multipart_data_bytes))}) + # Convert multipart_data_bytes to type BytesIO + body_data: BytesIO = BytesIO(multipart_data_bytes) + else: + if hasattr(body, "seek"): + body.seek(0) + content = body.read() + digest.update(content) + body_data = content + # Create signature content_hash = standard_b64encode(digest.finalize()).decode("UTF-8") - timestamp = ( - datetime.utcnow().isoformat() + "Z" - ) # Java's Instant.parse requires the neutral time zone appended + timestamp = datetime.utcnow().isoformat() + "Z" url = self.base_url + path canonical_message = method + "\n" + url + "\n" + timestamp + "\n" + content_hash signature = self.api_key.sign( canonical_message.encode("UTF-8"), ec.ECDSA(hashes.SHA256()) ) + signature_b64 = standard_b64encode(signature).decode("UTF-8") headers = { "X-Timestamp": timestamp, "X-Content-Hash": content_hash, - "X-Key-Id": self.tenant + ":" + self.application + ":" + "default", + "X-Key-Id": f"{self.tenant}:{self.application}:default", "X-Key": self.api_public_key_bytes, - "X-Authorization": standard_b64encode(signature), + "X-Authorization": signature_b64, **headers, } - body.seek(0) - response = self.get_connection_response_with_retry(method, path, body, headers) - if return_raw_response: - return response - try: - parsed = json.load(response) - except json.JSONDecodeError: - parsed = response.read() - if response.status_code != 200: - print(parsed) - raise HTTPError( - f"HTTP {response.status_code} error: {response.reason_phrase} for {url}" - ) - return parsed + response = self.get_connection_response_with_retry( + method, path, body_data, headers + ) + return self._handle_response(response, return_raw_response, path) def get_all_endpoints( self, @@ -1590,10 +1624,7 @@ def _start_deployment( payload = application_zip_bytes response = self._request( - "POST", - deploy_path, - payload, - headers, + method="POST", path=deploy_path, body=payload, headers=headers ) message = response.get("message", "No message provided") print(message, file=self.output) @@ -1655,7 +1686,7 @@ def _to_application_zip(self, disk_folder: str) -> BytesIO: return buffer def _follow_deployment( - self, instance: str, job: str, run: int, max_wait: int = 600 + self, instance: str, job: str, run: int, max_wait: int = 1800 ) -> None: last = -1 start = time.time() From 640d95fcc2bf997283ebea8135cefdd4abc227e4 Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 12:54:48 +0100 Subject: [PATCH 3/7] pass through max_wait param --- vespa/deployment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index b4104d74..d4317840 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -636,7 +636,7 @@ def deploy( application_zip_bytes=None, version=version, ) - self._follow_deployment(instance, job, run) + self._follow_deployment(instance, job, run, max_wait) app: Vespa = self.get_application( instance=instance, environment="dev", endpoint_type="mtls" ) From 5301a652bb357aa31aad4ed405b866c5a45240b6 Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 12:59:51 +0100 Subject: [PATCH 4/7] better docstring --- vespa/deployment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index d4317840..45c6fd24 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -618,7 +618,7 @@ def deploy( :param instance: Name of this instance of the application, in the Vespa Cloud. :param disk_folder: Disk folder to save the required Vespa config files. Default to application name folder within user's current working directory. - :param version: Vespa version to use for deployment. Default is None, which means the latest version. Must be a valid Vespa version, e.g. "8.435.13". + :param version: Vespa version to use for deployment. Default is None, which means the latest version. Should only be set on instructions from Vespa team. Must be a valid Vespa version, e.g. "8.435.13". :param max_wait: Seconds to wait for the deployment. :return: a Vespa connection instance. Returns a connection to the mtls endpoint. To connect to the token endpoint, use :func:`VespaCloud.get_application(endpoint_type="token")`. From 03410b0e365b634e9724ba90ed59e1c83f6e0804 Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 13:15:25 +0100 Subject: [PATCH 5/7] handle bytesIO correctly --- vespa/deployment.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index 45c6fd24..363d79d8 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -1316,9 +1316,8 @@ def _request_with_api_key( else: if hasattr(body, "seek"): body.seek(0) - content = body.read() - digest.update(content) - body_data = content + digest.update(body.read()) + body_data = body # Create signature content_hash = standard_b64encode(digest.finalize()).decode("UTF-8") timestamp = datetime.utcnow().isoformat() + "Z" From d80752e65294d52da9c34b18a62b30ea3873d8b0 Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 13:25:14 +0100 Subject: [PATCH 6/7] update link to pdf --- .../source/examples/mother-of-all-embedding-models-cloud.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sphinx/source/examples/mother-of-all-embedding-models-cloud.ipynb b/docs/sphinx/source/examples/mother-of-all-embedding-models-cloud.ipynb index d820f580..3d9de136 100644 --- a/docs/sphinx/source/examples/mother-of-all-embedding-models-cloud.ipynb +++ b/docs/sphinx/source/examples/mother-of-all-embedding-models-cloud.ipynb @@ -624,7 +624,7 @@ "### That is it!\n", "\n", "That is how easy it is to represent the brand new M3 FlagEmbedding representations in Vespa! Read more in the\n", - "[M3 technical report](https://github.com/FlagOpen/FlagEmbedding/blob/master/FlagEmbedding/BGE_M3/BGE_M3.pdf).\n", + "[M3 technical report](https://github.com/FlagOpen/FlagEmbedding/blob/master/research/BGE_M3/BGE_M3.pdf).\n", "\n", "We can go ahead and delete the Vespa cloud instance we deployed by:\n" ] From 8b443ce72a6045954b53f4231312192f1c56eddb Mon Sep 17 00:00:00 2001 From: thomasht86 Date: Thu, 21 Nov 2024 14:08:03 +0100 Subject: [PATCH 7/7] increase max_wait --- vespa/deployment.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vespa/deployment.py b/vespa/deployment.py index 363d79d8..32f4f3ee 100644 --- a/vespa/deployment.py +++ b/vespa/deployment.py @@ -610,7 +610,7 @@ def deploy( instance: Optional[str] = "default", disk_folder: Optional[str] = None, version: Optional[str] = None, - max_wait: int = 300, + max_wait: int = 1800, ) -> Vespa: """ Deploy the given application package as the given instance in the Vespa Cloud dev environment.